minipool 0.1.2

Lightweight, generic tokio-based pool implementation for Rust.
Documentation
#![doc = include_str!("../README.md")]

use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use tokio::sync::{mpsc, Mutex};

// Unit tests live in the separate `tests` module, which is only compiled for test builds.
#[cfg(test)]
mod tests;

/// Lightweight, generic tokio-based pool implementation for Rust.
///
/// Idle entries are queued in an unbounded tokio `mpsc` channel, while `len`
/// counts every entry that belongs to the pool, whether it is idle or currently
/// handed out through a [`PoolGuard`].
///
/// This pool is safe to be put into an [`Arc`].
pub struct Pool<T: PoolEntry> {
  /// Number of entries owned by the pool. Shared with every [`PoolGuard`] so a
  /// guard can decrement it when its entry turns out to be closed.
  len: Arc<AtomicUsize>,
  /// Sending half of the idle queue; cloned into each [`PoolGuard`] so the
  /// entry can be handed back when the guard is dropped.
  tx: mpsc::UnboundedSender<T>,
  /// Receiving half of the idle queue, wrapped in an async mutex so that
  /// `acquire` can take entries through a shared `&self`.
  rx: Mutex<mpsc::UnboundedReceiver<T>>,
}

/// Represents an entity that can be a member of a [`Pool`].
pub trait PoolEntry {
  /// Reports whether the entry is closed and therefore must not be reused.
  ///
  /// The guard handed out by the pool checks this when it is dropped: an entry that
  /// reports `true` is discarded, while an entry that reports `false` is reinserted
  /// into the pool and thereby is able to be reused in future invocations.
  ///
  /// # Examples
  /// - A closed TCP connection should probably not be reinserted, so it returns `true`.
  /// - A simple buffer can always be reinserted, so it always returns `false`.
  fn is_closed(&self) -> bool;
}

/// A wrapper around an occupied entry of a [`Pool`].
///
/// When this structure is dropped (falls out of scope) and the entry is not yet
/// closed ([`is_closed`]), the entry will be reinserted into the pool. A closed
/// entry is discarded instead and is no longer counted by the pool.
///
/// The entry held by this guard can be accessed via its
/// [`Deref`] and [`DerefMut`] implementations.
///
/// This structure is created by the [`acquire`] method on [`Pool`].
///
/// [`is_closed`]: PoolEntry::is_closed
/// [`acquire`]: Pool::acquire
#[must_use = "if unused the PoolEntry will immediately be reinserted"]
pub struct PoolGuard<T: PoolEntry> {
  /// The guarded entry. It is `Some` for the whole usable lifetime of the guard
  /// and is only taken out while the guard is being dropped.
  inner: Option<T>,
  /// Sender used to hand the entry back to the pool's idle queue on drop.
  back: mpsc::UnboundedSender<T>,
  /// The pool's shared entry counter, decremented when a closed entry is discarded.
  pool_len: Arc<AtomicUsize>,
}

// NOTE(review): `PoolGuard` consists of an `Option<T>`, an `mpsc::UnboundedSender<T>` and an
// `Arc<AtomicUsize>`, so the compiler would presumably implement `Send`/`Sync` for it
// automatically wherever that is sound. Confirm whether these manual impls are still
// needed; if they are, each one should carry a `SAFETY:` justification.
unsafe impl<T: Send + PoolEntry> Send for PoolGuard<T> {}
// NOTE(review): this impl only asks for `T: Sync`. The sender field may require `T: Send`
// as well for the automatic impl, so verify this bound is not wider than intended.
unsafe impl<T: Sync + PoolEntry> Sync for PoolGuard<T> {}

impl<T: PoolEntry> Drop for PoolGuard<T> {
  /// Returns the entry to the pool, or discards it when it reports itself as closed.
  fn drop(&mut self) {
    // Take the entry out of the guard; if it is already gone there is nothing to do.
    let entry = match self.inner.take() {
      Some(entry) => entry,
      None => return,
    };

    if !entry.is_closed() {
      // A failed send means the channel is closed, probably because the program is
      // shutting down, so it is fine to simply let the entry drop here.
      let _ = self.back.send(entry);
      return;
    }

    // A closed entry is not reusable: remove it from the pool's entry count and
    // let it drop at the end of this function.
    self.pool_len.fetch_sub(1, Ordering::Relaxed);
  }
}

impl<T: PoolEntry> Deref for PoolGuard<T> {
  type Target = T;

  /// Borrows the guarded entry.
  ///
  /// # Panics
  /// Panics if the guard no longer holds an entry. Within this file the entry is only
  /// taken out while the guard is being dropped, so this is treated as unreachable.
  fn deref(&self) -> &Self::Target {
    self
      .inner
      .as_ref()
      .unwrap_or_else(|| unreachable!("PoolGuard can only be None after itself got dropped."))
  }
}

impl<T: PoolEntry> DerefMut for PoolGuard<T> {
  /// Mutably borrows the guarded entry.
  ///
  /// # Panics
  /// Panics if the guard no longer holds an entry. Within this file the entry is only
  /// taken out while the guard is being dropped, so this is treated as unreachable.
  fn deref_mut(&mut self) -> &mut Self::Target {
    self
      .inner
      .as_mut()
      .unwrap_or_else(|| unreachable!("PoolGuard can only be None after itself got dropped."))
  }
}

impl<T: PoolEntry> Default for Pool<T> {
  /// Creates an empty pool: a fresh unbounded channel as the idle queue and an
  /// entry count of zero.
  fn default() -> Self {
    let (sender, receiver) = mpsc::unbounded_channel();
    let len = Arc::new(AtomicUsize::new(0));

    Self {
      len,
      tx: sender,
      rx: Mutex::new(receiver),
    }
  }
}

impl<T: PoolEntry> Pool<T> {
  /// Tries to acquire an entry from the pool. May blocks untail an entry becomes
  /// available again. Entries are handed out on a first-come, first-served basis.
  ///
  /// Returns [`None`] if the pool is empty.
  pub async fn acquire(&self) -> Option<PoolGuard<T>> {
    if self.is_empty() {
      return None;
    }

    let mut rx = self.rx.lock().await;

    let inner = match rx.recv().await {
      Some(inner) => inner,
      None => unreachable!("Channel is closed"),
    };

    Some(PoolGuard {
      inner: Some(inner),
      back: self.tx.clone(),
      pool_len: self.len.clone(),
    })
  }
}

impl<T: PoolEntry> Pool<T> {
  /// Inserts a new [`PoolEntry`] into the pool.
  ///
  /// The entry is counted first and then queued as idle, so it is already included
  /// in [`len`](Pool::len) by the time it can be received.
  pub fn push(&self, inner: T) {
    self.len.fetch_add(1, Ordering::Relaxed);
    if self.tx.send(inner).is_err() {
      // The receiving half is a field of `self`, so the channel should still be open
      // for as long as `&self` is alive.
      unreachable!("Channel is closed");
    }
  }

  /// Returns the number of entries in the pool.
  ///
  /// This counts idle entries as well as entries currently handed out through a
  /// [`PoolGuard`]; an entry stops being counted once its guard is dropped while
  /// the entry reports itself as closed.
  pub fn len(&self) -> usize {
    self.len.load(Ordering::SeqCst)
  }

  /// Returns [`true`] if the pool has a length of 0.
  pub fn is_empty(&self) -> bool {
    self.len() == 0
  }
}