use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use concurrent_queue::ConcurrentQueue;
use super::resource::{ResourceGuard, ResourceInfo, ResourceLock};
use super::util::thread_waker;
/// Callback consulted when a resource is handed back to the pool.
///
/// It receives mutable access to the resource together with a copy of its
/// `ResourceInfo`. Returning `true` lets the resource be queued as idle;
/// returning `false` causes it to be disposed instead.
pub type ReleaseFn<T> = Box<dyn Fn(&mut T, ResourceInfo) -> bool + Send + Sync>;
/// Callback invoked with ownership of a resource that is being disposed,
/// together with a copy of its `ResourceInfo`.
pub type DisposeFn<T> = Box<dyn Fn(T, ResourceInfo) + Send + Sync>;
/// Events passed through the shared event queue (see `Shared::push_event`
/// and `Shared::pop_event`).
pub enum SharedEvent<T> {
/// Carries the lock of a resource.
/// NOTE(review): presumably announces a newly created resource; this variant
/// is not constructed anywhere in this file — confirm against the producer.
Created(ResourceLock<T>),
/// Queued by `Shared::release` when an idle timeout is configured: the
/// instant is the release time plus the idle timeout, and the lock is a
/// clone of the lock that was placed on the idle queue.
Verify(Instant, ResourceLock<T>),
}
/// State shared between the users of a resource pool: counters, the idle and
/// event queues, configured limits, optional callbacks, and a waker.
pub struct Shared<T> {
/// Flag written by `set_busy` and read by `is_busy` / `check_reuse`.
busy: AtomicBool,
/// Resource counter; decremented by `dispose` and updated through
/// `try_update_count`.
count: AtomicUsize,
/// Number of times `dispose` has run.
dispose_count: AtomicUsize,
/// Unbounded queue of pending `SharedEvent`s.
event_queue: ConcurrentQueue<SharedEvent<T>>,
/// Unbounded queue of locks for resources that were released for reuse.
idle_queue: ConcurrentQueue<ResourceLock<T>>,
/// When set, `release` schedules a `SharedEvent::Verify` this long after
/// the resource became idle.
idle_timeout: Option<Duration>,
/// Configured upper limit, exposed through `max_count()`; not enforced in
/// this file.
max_count: usize,
/// Configured lower limit, exposed through `min_count()`; not enforced in
/// this file.
min_count: usize,
/// Optional callback run by `dispose` on the resource being discarded.
on_dispose: Option<DisposeFn<T>>,
/// Optional callback run on release to decide whether a resource is reused.
on_release: Option<ReleaseFn<T>>,
/// Waker triggered by `notify`; its paired waiter is returned from `new`.
waker: thread_waker::Waker,
}
impl<T> Shared<T> {
    /// Builds the shared state together with the waiter paired with its waker.
    ///
    /// Both queues start empty and unbounded, the counters start at zero and
    /// the busy flag starts cleared. The returned `Waiter` is the receiving
    /// half of the waker that `notify` triggers.
    pub fn new(
        on_release: Option<ReleaseFn<T>>,
        on_dispose: Option<DisposeFn<T>>,
        min_count: usize,
        max_count: usize,
        idle_timeout: Option<Duration>,
    ) -> (Self, thread_waker::Waiter) {
        let (waker, waiter) = thread_waker::pair();
        let shared = Self {
            busy: AtomicBool::new(false),
            count: AtomicUsize::new(0),
            dispose_count: AtomicUsize::new(0),
            event_queue: ConcurrentQueue::unbounded(),
            idle_queue: ConcurrentQueue::unbounded(),
            idle_timeout,
            max_count,
            min_count,
            on_dispose,
            on_release,
            waker,
        };
        (shared, waiter)
    }

    /// Discards the resource held by `guard`.
    ///
    /// Decrements the resource count, hands the resource (if the guard still
    /// holds one) to the `on_dispose` callback when one is configured,
    /// increments the dispose counter and finally wakes the waiter.
    pub fn dispose(&self, mut guard: ResourceGuard<T>) {
        self.count.fetch_sub(1, Ordering::AcqRel);
        if let Some(res) = guard.take() {
            // Without a callback the resource is simply dropped here.
            if let Some(on_dispose) = self.on_dispose.as_ref() {
                on_dispose(res, *guard.info());
            }
        }
        self.dispose_count.fetch_add(1, Ordering::AcqRel);
        self.notify();
    }

    /// Returns `true` when an idle timeout has been configured.
    pub fn can_reuse(&self) -> bool {
        self.idle_timeout.is_some()
    }

    /// Decides whether a released guard goes back onto the idle queue.
    ///
    /// A non-empty guard that is either flagged `reusable` or released while
    /// the pool is busy is submitted to the `on_release` callback; without a
    /// callback it is accepted. In every other case the current busy flag is
    /// returned.
    ///
    /// NOTE(review): as a consequence an *empty* guard released while the pool
    /// is busy is reported as reusable, so `release` queues the empty lock
    /// instead of calling `dispose` (and the count is not decremented) —
    /// confirm this is intended.
    #[inline]
    fn check_reuse(&self, guard: &mut ResourceGuard<T>) -> bool {
        if guard.is_some() && (guard.info().reusable || self.is_busy()) {
            match self.on_release.as_ref() {
                Some(check) => {
                    // Copy the info first: the callback needs it by value while
                    // the guard is mutably borrowed for the resource.
                    let info = *guard.info();
                    let res = guard
                        .as_mut()
                        .expect("guard was checked to be non-empty above");
                    check(res, info)
                }
                None => true,
            }
        } else {
            self.is_busy()
        }
    }

    /// Current value of the resource counter.
    pub fn count(&self) -> usize {
        self.count.load(Ordering::Acquire)
    }

    /// Number of `dispose` calls completed so far.
    pub fn dispose_count(&self) -> usize {
        self.dispose_count.load(Ordering::Acquire)
    }

    /// Returns `true` when the idle queue is not empty.
    ///
    /// Queued locks may turn out to be unusable (see `try_acquire_idle`), so
    /// this is only a hint.
    pub fn have_idle(&self) -> bool {
        !self.idle_queue.is_empty()
    }

    /// Current value of the busy flag.
    pub fn is_busy(&self) -> bool {
        self.busy.load(Ordering::Acquire)
    }

    /// The configured maximum count.
    pub fn max_count(&self) -> usize {
        self.max_count
    }

    /// The configured minimum count.
    pub fn min_count(&self) -> usize {
        self.min_count
    }

    /// Wakes the waiter that was returned from `new`.
    pub fn notify(&self) {
        self.waker.wake();
    }

    /// Removes and returns the next queued event, or `None` if the queue is
    /// empty (or closed).
    pub fn pop_event(&self) -> Option<SharedEvent<T>> {
        self.event_queue.pop().ok()
    }

    /// Queues an event. Does not wake the waiter; call `notify` for that.
    pub fn push_event(&self, event: SharedEvent<T>) {
        // Best effort: an unbounded queue only rejects a push once it has been
        // closed, in which case the event is dropped.
        let _ = self.event_queue.push(event);
    }

    /// Returns a resource to the pool.
    ///
    /// Records the idle time on the guard, then either disposes the resource
    /// (when `check_reuse` rejects it) or unlocks it and places its lock on the
    /// idle queue. When an idle timeout is configured, a `SharedEvent::Verify`
    /// due at `now + idle_timeout` is queued as well. The waiter is woken in
    /// both cases.
    pub fn release(&self, mut guard: ResourceGuard<T>) {
        let now = Instant::now();
        guard.info_mut().last_idle.replace(now);

        if !self.check_reuse(&mut guard) {
            // `dispose` performs its own notification.
            self.dispose(guard);
            return;
        }

        let verify_at = self.idle_timeout.map(|timeout| now + timeout);
        guard.info_mut().verify_at = verify_at;
        let lock = guard.unlock();

        // Pushes are best effort: an unbounded queue only fails when closed.
        if let Some(at) = verify_at {
            let _ = self
                .event_queue
                .push(SharedEvent::Verify(at, lock.clone()));
        }
        let _ = self.idle_queue.push(lock);
        self.notify();
    }

    /// Stores a new busy flag and returns the previous value.
    pub fn set_busy(&self, busy: bool) -> bool {
        self.busy.swap(busy, Ordering::Release)
    }

    /// Pops idle locks until one can be locked and still holds a resource.
    ///
    /// On success the guard's `last_acquire` timestamp and `acquire_count` are
    /// updated before it is returned. Locks that cannot be locked right now, or
    /// whose resource is gone, are dropped from the queue. Returns `None` once
    /// the idle queue is exhausted.
    pub fn try_acquire_idle(&self) -> Option<ResourceGuard<T>> {
        while let Ok(lock) = self.idle_queue.pop() {
            if let Some(mut guard) = lock.try_lock() {
                if guard.is_some() {
                    guard.info_mut().last_acquire.replace(Instant::now());
                    guard.info_mut().acquire_count += 1;
                    return Some(guard);
                }
            }
        }
        None
    }

    /// Attempts to change the resource counter from `prev_count` to `count`.
    ///
    /// Returns `Ok(previous)` on success and `Err(actual)` otherwise. This uses
    /// the weak compare-exchange, which may fail spuriously even when the
    /// counter equals `prev_count`, so callers are expected to retry.
    pub fn try_update_count(&self, prev_count: usize, count: usize) -> Result<usize, usize> {
        self.count
            .compare_exchange_weak(prev_count, count, Ordering::SeqCst, Ordering::Acquire)
    }
}