#![cfg_attr(not(test), deny(warnings, clippy::all, clippy::pedantic, clippy::cargo))]
#![allow(clippy::single_match_else)]
#![deny(missing_docs)]
#![forbid(unsafe_code)]
use self::wrapper::Wrapper;
use core::future::Future;
use core::iter::FromIterator;
use core::ops::{Deref, DerefMut};
use std::sync::Arc;
#[cfg(feature = "async")]
pub use async_::{AsyncLease, PoolStream};
pub use init::InitPool;
use parking_lot::lock_api::ArcMutexGuard;
use parking_lot::RawMutex;
#[cfg(feature = "async")]
mod async_;
pub mod init;
mod wrapper;
#[must_use]
pub struct Pool<T> {
inner: Arc<PoolInner<T>>,
}
struct PoolInner<T> {
buffer: lockfree::set::Set<Wrapper<T>>,
#[cfg(feature = "async")]
waiting_futures: Arc<async_::WaitingFutures<T>>,
}
impl<T> Default for PoolInner<T> {
#[inline]
fn default() -> Self {
Self {
buffer: lockfree::set::Set::default(),
#[cfg(feature = "async")]
waiting_futures: Arc::default(),
}
}
}
impl<T: Send + Sync + 'static> Pool<T> {
pub fn into_init_pool<I: init::Init>(self, init: I) -> InitPool<T, I> {
InitPool::new_from_pool(self, init)
}
}
impl<T> Pool<T> {
pub fn try_into_locked_pool(mut self) -> Result<LockedPool<T>, (Pool<T>, PoolConversionError)> {
let len = self.len();
if len == 0 {
return Err((self, PoolConversionError::EmptyPool));
}
let Some(_) = Arc::get_mut(&mut self.inner) else {
let count = Arc::strong_count(&self.inner) - 1;
return Err((self, PoolConversionError::OtherCopies { count }));
};
Ok(LockedPool { pool: self, len })
}
#[inline]
pub fn new() -> Self {
Self::default()
}
#[inline]
pub fn with_initial_size(pool_size: usize, mut init: impl FnMut() -> T) -> Self {
(0..pool_size).map(|_| init()).collect()
}
#[inline]
pub fn try_with_initial_size<E>(pool_size: usize, mut init: impl FnMut() -> Result<T, E>) -> Result<Self, E> {
let buffer = lockfree::set::Set::new();
for _ in 0..pool_size {
buffer
.insert(Wrapper::new(init()?))
.unwrap_or_else(|_| unreachable!("Each new wrapper should be unique"));
}
Ok(Self {
inner: Arc::new(PoolInner {
buffer,
#[cfg(feature = "async")]
waiting_futures: Arc::default(),
}),
})
}
#[must_use]
#[inline]
pub fn try_get(&self) -> Option<Lease<T>> {
self.inner.buffer.iter().find_map(|wrapper| Lease::from_arc_mutex(&wrapper, self))
}
#[inline]
fn try_get_or_len(&self) -> Result<Lease<T>, usize> {
let mut count = 0;
let lease = self
.inner
.buffer
.iter()
.inspect(|_| count += 1)
.find_map(|wrapper| Lease::from_arc_mutex(&wrapper, self));
lease.ok_or(count)
}
#[cfg(feature = "async")]
#[inline]
pub async fn get(&self) -> Lease<T> {
self.get_async().await
}
#[cfg(feature = "async")]
#[inline]
fn get_async(&self) -> AsyncLease<T> {
let (sender, receiver) = futures_channel::oneshot::channel();
self.inner.waiting_futures.insert(sender);
if let Some(lease) = self.try_get() {
self.inner.waiting_futures.wake_next(lease);
}
AsyncLease::<T>::new(receiver)
}
#[cfg(feature = "async")]
#[inline]
pub fn stream(&self) -> impl futures_core::Stream<Item = Lease<T>> {
PoolStream::new(self)
}
#[inline]
pub fn try_get_or_new(&self, init: impl FnOnce() -> T) -> Lease<T> {
self.try_get().unwrap_or_else(|| self.insert_with_lease(init()))
}
#[inline]
pub async fn get_or_new<FUT: Future<Output = T>, FN: FnOnce() -> FUT>(&self, init: FN) -> Lease<T> {
match self.try_get() {
Some(lease) => lease,
None => self.insert_with_lease(init().await),
}
}
#[inline]
pub fn try_get_or_try_new<E>(&self, init: impl FnOnce() -> Result<T, E>) -> Result<Lease<T>, E> {
match self.try_get() {
Some(l) => Ok(l),
None => Ok(self.insert_with_lease(init()?)),
}
}
#[inline]
pub async fn get_or_try_new<E, FUT: Future<Output = Result<T, E>>, FN: FnOnce() -> FUT>(&self, init: FN) -> Result<Lease<T>, E> {
match self.try_get() {
None => Ok(self.insert_with_lease(init().await?)),
Some(l) => Ok(l),
}
}
#[inline]
pub fn try_get_or_new_with_cap(&self, cap: usize, init: impl FnOnce() -> T) -> Option<Lease<T>> {
match self.try_get_or_len() {
Ok(t) => Some(t),
Err(len) => (len < cap).then(|| self.insert_with_lease(init())),
}
}
#[cfg(feature = "async")]
#[inline]
pub async fn get_or_new_with_cap<FUT: Future<Output = T>, FN: FnOnce() -> FUT>(&self, cap: usize, init: FN) -> Lease<T> {
match self.try_get_or_len() {
Ok(t) => t,
Err(len) => {
if len >= cap {
return self.get().await;
}
self.insert_with_lease(init().await)
}
}
}
#[inline]
pub fn try_get_or_try_new_with_cap<E>(&self, cap: usize, init: impl FnOnce() -> Result<T, E>) -> Result<Option<Lease<T>>, E> {
match self.try_get_or_len() {
Ok(t) => Ok(Some(t)),
Err(len) => {
if len >= cap {
return Ok(None);
}
Ok(Some(self.insert_with_lease(init()?)))
}
}
}
#[cfg(feature = "async")]
#[inline]
pub async fn get_or_try_new_with_cap<E, FUT: Future<Output = Result<T, E>>, FN: FnOnce() -> FUT>(
&self,
cap: usize,
init: FN,
) -> Result<Lease<T>, E> {
match self.try_get_or_len() {
Ok(t) => Ok(t),
Err(len) => {
if len >= cap {
return Ok(self.get().await);
}
Ok(self.insert_with_lease(init().await?))
}
}
}
#[inline]
#[must_use]
pub fn len(&self) -> usize {
self.inner.buffer.iter().count()
}
#[inline]
pub fn clear(&self) {
self.inner.buffer.iter().for_each(|g| {
let wrapper: &Wrapper<_> = &g;
self.inner.buffer.remove(wrapper);
});
}
#[inline]
pub fn resize(&self, pool_size: usize, mut init: impl FnMut() -> T) {
let set = &self.inner.buffer;
self.inner.buffer.iter().skip(pool_size).for_each(|g| {
self.inner.buffer.remove(&*g);
});
set.extend((self.len()..pool_size).map(|_| Wrapper::new(init())));
}
#[inline]
pub fn try_resize<E>(&self, pool_size: usize, mut init: impl FnMut() -> Result<T, E>) -> Result<(), E> {
let set = &self.inner.buffer;
set.iter().skip(pool_size).for_each(|g| {
set.remove(&*g);
});
for _ in self.len()..pool_size {
set
.insert(Wrapper::new(init()?))
.unwrap_or_else(|_| unreachable!("Each new wrapper should be unique"));
}
Ok(())
}
#[inline]
pub fn extend<I: IntoIterator<Item = T>>(&self, iter: I) {
self.inner.buffer.extend(iter.into_iter().map(Wrapper::new));
}
#[inline]
pub fn insert(&self, t: T) {
let lease = self.insert_with_lease(t);
self.notify(lease);
}
#[inline]
pub fn insert_with_lease(&self, t: T) -> Lease<T> {
let wrapper = Wrapper::new(t);
let lease = Lease::from_arc_mutex(&wrapper, self).unwrap_or_else(|| unreachable!("Wrapper is unlocked when new"));
self
.inner
.buffer
.insert(wrapper)
.unwrap_or_else(|_| unreachable!("Each new wrapper should be unique"));
lease
}
#[must_use]
#[inline]
pub fn available(&self) -> usize {
self.inner.buffer.iter().filter(|b| !b.is_locked()).count()
}
#[must_use]
#[inline]
pub fn is_empty(&self) -> bool {
self.inner.buffer.iter().next().is_none()
}
#[inline]
pub fn disassociate(&self, lease: &Lease<T>) {
self.inner.buffer.remove(&Wrapper(ArcMutexGuard::mutex(lease.guard()).clone()));
}
#[cfg_attr(not(feature = "async"), allow(clippy::unused_self))]
#[inline]
fn notify(&self, lease: Lease<T>) {
#[cfg(feature = "async")]
self.inner.waiting_futures.wake_next(lease);
#[cfg(not(feature = "async"))]
drop(lease);
}
}
impl<T: Default> Pool<T> {
#[inline]
pub fn get_or_default(&self) -> Lease<T> {
self.try_get_or_new(T::default)
}
#[must_use]
#[inline]
pub fn get_or_default_with_cap(&self, cap: usize) -> Option<Lease<T>> {
self.try_get_or_new_with_cap(cap, T::default)
}
#[inline]
pub fn resize_default(&self, pool_size: usize) {
self.resize(pool_size, T::default);
}
}
impl<T> Default for Pool<T> {
fn default() -> Self {
Self {
inner: Arc::new(PoolInner::default()),
}
}
}
impl<T> core::fmt::Debug for Pool<T> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
struct ListDebugger<'a, T> {
set: &'a lockfree::set::Set<Wrapper<T>>,
}
impl<T> core::fmt::Debug for ListDebugger<'_, T> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_list().entries(self.set.iter().map(|m| !m.is_locked())).finish()
}
}
let mut s = f.debug_struct("Pool");
s.field("len", &self.len())
.field("available", &self.available())
.field("availabilities", &ListDebugger { set: &self.inner.buffer })
.finish()
}
}
impl<T> Clone for Pool<T> {
fn clone(&self) -> Self {
Self { inner: self.inner.clone() }
}
}
impl<T> FromIterator<T> for Pool<T> {
fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
Self {
inner: Arc::new(PoolInner {
buffer: iter.into_iter().map(Wrapper::new).collect(),
#[cfg(feature = "async")]
waiting_futures: Arc::default(),
}),
}
}
}
#[derive(Default, Clone)]
pub struct LockedPool<T> {
pool: Pool<T>,
len: usize,
}
impl<T> LockedPool<T> {
pub fn try_into_pool(mut self) -> Result<Pool<T>, (LockedPool<T>, PoolConversionError)> {
let Some(_) = Arc::get_mut(&mut self.pool.inner) else {
let count = Arc::strong_count(&self.pool.inner) - 1;
return Err((self, PoolConversionError::OtherCopies { count }));
};
Ok(self.pool)
}
#[must_use]
#[inline]
pub fn try_get(&self) -> Option<Lease<T>> {
self.pool.try_get()
}
#[cfg(feature = "async")]
#[inline]
pub async fn get(&self) -> Lease<T> {
self.pool.get().await
}
#[cfg(feature = "async")]
#[inline]
pub fn stream(&self) -> impl futures_core::Stream<Item = Lease<T>> {
self.pool.stream()
}
#[inline]
#[must_use]
pub fn len(&self) -> usize {
self.len
}
#[must_use]
#[inline]
pub fn is_empty(&self) -> bool {
self.len == 0
}
#[must_use]
#[inline]
pub fn available(&self) -> usize {
self.pool.available()
}
}
impl<T> core::fmt::Debug for LockedPool<T> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
self.pool.fmt(f)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PoolConversionError {
EmptyPool,
OtherCopies {
count: usize,
},
}
impl core::fmt::Display for PoolConversionError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
Self::EmptyPool => f.write_str("Pool is empty"),
Self::OtherCopies { count } => write!(f, "Pool has {count} other copies"),
}
}
}
impl std::error::Error for PoolConversionError {}
#[must_use]
pub struct Lease<T> {
guard: Option<ArcMutexGuard<RawMutex, T>>,
#[cfg(feature = "async")]
waiting_futures: Arc<async_::WaitingFutures<T>>,
}
impl<T> Drop for Lease<T> {
fn drop(&mut self) {
#[cfg(feature = "async")]
{
if let Some(guard) = self.guard.take() {
let lease = Self {
guard: Some(guard),
waiting_futures: self.waiting_futures.clone(),
};
self.waiting_futures.wake_next(lease);
}
}
#[cfg(not(feature = "async"))]
{
self.guard.take();
}
}
}
impl<T> Lease<T> {
#[inline]
fn from_arc_mutex(arc: &Wrapper<T>, #[allow(unused)] pool: &Pool<T>) -> Option<Self> {
arc.0.try_lock_arc().map(|guard| Self {
guard: Some(guard),
#[cfg(feature = "async")]
waiting_futures: pool.inner.waiting_futures.clone(),
})
}
fn guard(&self) -> &ArcMutexGuard<RawMutex, T> {
self.guard.as_ref().unwrap()
}
fn guard_mut(&mut self) -> &mut ArcMutexGuard<RawMutex, T> {
self.guard.as_mut().unwrap()
}
#[cfg(feature = "async")]
fn drop_without_recursion(mut self) {
self.guard.take();
}
}
impl<T: core::fmt::Debug> core::fmt::Debug for Lease<T> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
self.deref().fmt(f)
}
}
impl<T> Deref for Lease<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
self.guard()
}
}
impl<T> DerefMut for Lease<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.guard_mut()
}
}
impl<T, U: ?Sized> AsRef<U> for Lease<T>
where
T: AsRef<U>,
{
fn as_ref(&self) -> &U {
self.deref().as_ref()
}
}
impl<T, U: ?Sized> AsMut<U> for Lease<T>
where
T: AsMut<U>,
{
fn as_mut(&mut self) -> &mut U {
self.deref_mut().as_mut()
}
}
#[allow(unused)]
fn asserts() {
fn bytes<B: AsRef<[u8]> + AsMut<[u8]>>() {}
fn send_sync_static_clone<F: Send + 'static + Clone>() {}
bytes::<Lease<Vec<u8>>>();
send_sync_static_clone::<Pool<u8>>();
send_sync_static_clone::<init::InitPool<u8, init::InitFn<u8>>>();
}