#![doc = include_str!("../README.md")]
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};
#[cfg(test)]
mod tests;
/// An asynchronous pool of reusable entries.
///
/// Idle entries wait in an unbounded channel. [`Pool::acquire`] takes one out
/// and wraps it in a [`PoolGuard`]; dropping the guard sends the entry back
/// unless it reports itself as closed.
pub struct Pool<T: PoolEntry> {
// Number of entries that belong to the pool: incremented by `push`,
// decremented when a closed entry is discarded. Shared with every guard.
len: Arc<AtomicUsize>,
// Sending half of the idle queue; cloned into each guard so the guard can
// return its entry.
tx: mpsc::UnboundedSender<T>,
// Receiving half of the idle queue, kept behind an async mutex so that
// `acquire` can receive through `&self`.
rx: Mutex<mpsc::UnboundedReceiver<T>>,
}
/// A value that can be stored in a [`Pool`].
pub trait PoolEntry {
/// Returns `true` if the entry must not be put back into the pool.
///
/// A [`PoolGuard`] checks this when it is dropped: a closed entry is
/// discarded and the pool's length is decremented instead of the entry being
/// re-queued.
fn is_closed(&self) -> bool;
}
/// Handle to an entry that has been taken out of a [`Pool`].
///
/// Dereferences to the entry. When the guard is dropped the entry is sent
/// back to the pool, unless [`PoolEntry::is_closed`] reports `true`, in which
/// case the entry is discarded and the pool's length is decremented.
#[must_use = "if unused the PoolEntry will be immediately reinserted"]
pub struct PoolGuard<T: PoolEntry> {
    /// The checked-out entry; becomes `None` only once `Drop` has taken it.
    inner: Option<T>,
    /// Sender used to hand the entry back to the pool's idle queue.
    back: mpsc::UnboundedSender<T>,
    /// The pool's shared entry counter.
    pool_len: Arc<AtomicUsize>,
}
// SAFETY: the guard owns an `Option<T>`, an `mpsc::UnboundedSender<T>` and an
// `Arc<AtomicUsize>`. With `T: Send` all of these are presumably `Send`
// already (per tokio's documented auto-trait impls for the sender), which
// would make this impl redundant with the auto trait — TODO confirm.
unsafe impl<T> Send for PoolGuard<T> where T: Send + PoolEntry {}

// SAFETY: a shared `&PoolGuard<T>` only exposes `&T` through `Deref`, which is
// fine for `T: Sync`.
// NOTE(review): this bound is broader than the auto trait would be, since the
// sender field is presumably `Sync` only when `T: Send`. It relies on `back`
// never being reachable through a shared reference — confirm before exposing
// the field or adding `&self` methods that use it.
unsafe impl<T> Sync for PoolGuard<T> where T: Sync + PoolEntry {}
impl<T: PoolEntry> Drop for PoolGuard<T> {
    /// Hands the entry back to the pool, or discards it if it is closed.
    fn drop(&mut self) {
        match self.inner.take() {
            // A closed entry is not re-queued, so the shared length shrinks.
            Some(entry) if entry.is_closed() => {
                self.pool_len.fetch_sub(1, Ordering::Relaxed);
            }
            // Return the entry to the idle queue. A send error is deliberately
            // ignored; the entry is then dropped together with the error.
            Some(entry) => {
                let _ = self.back.send(entry);
            }
            None => {}
        }
    }
}
impl<T: PoolEntry> Deref for PoolGuard<T> {
    type Target = T;

    /// Borrows the checked-out entry.
    fn deref(&self) -> &Self::Target {
        // `inner` is only emptied by `Drop`, so a live guard always holds an entry.
        self.inner.as_ref().unwrap_or_else(|| {
            unreachable!("PoolGuard can only be None after itself got dropped.")
        })
    }
}
impl<T: PoolEntry> DerefMut for PoolGuard<T> {
    /// Mutably borrows the checked-out entry.
    fn deref_mut(&mut self) -> &mut Self::Target {
        // `inner` is only emptied by `Drop`, so a live guard always holds an entry.
        self.inner.as_mut().unwrap_or_else(|| {
            unreachable!("PoolGuard can only be None after itself got dropped.")
        })
    }
}
impl<T: PoolEntry> Default for Pool<T> {
    /// Creates a pool that owns no entries.
    fn default() -> Self {
        // Both halves of the idle queue live in the pool itself.
        let (sender, receiver) = mpsc::unbounded_channel();
        let counter = AtomicUsize::new(0);
        Self {
            len: Arc::new(counter),
            tx: sender,
            rx: Mutex::new(receiver),
        }
    }
}
impl<T: PoolEntry> Pool<T> {
pub async fn acquire(&self) -> Option<PoolGuard<T>> {
if self.is_empty() {
return None;
}
let mut rx = self.rx.lock().await;
let inner = match rx.recv().await {
Some(inner) => inner,
None => unreachable!("Channel is closed"),
};
Some(PoolGuard {
inner: Some(inner),
back: self.tx.clone(),
pool_len: self.len.clone(),
})
}
}
impl<T: PoolEntry> Pool<T> {
    /// Adds `inner` to the pool as an idle entry.
    pub fn push(&self, inner: T) {
        // Count the entry before queueing it, so the length is already
        // non-zero by the time the entry can be received.
        self.len.fetch_add(1, Ordering::Relaxed);
        if self.tx.send(inner).is_err() {
            // The pool holds the receiving half itself, so sending cannot fail.
            unreachable!("Channel is closed");
        }
    }

    /// Returns the number of entries owned by the pool, idle or checked out.
    pub fn len(&self) -> usize {
        self.len.load(Ordering::SeqCst)
    }

    /// Returns `true` if the pool owns no entries.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}