//! A pool of reusable values that are handed out as exclusive leases.
//!
//! [`Pool`] keeps its items in a `lockfree::set::Set` and hands them out as [`Lease`]s.
//! Dropping a [`Lease`] unlocks its item again so that it can be handed out to someone else.

// Outside of test builds, every compiler warning and every lint of the listed clippy groups is an error.
#![cfg_attr(not(test), deny(warnings, clippy::all, clippy::pedantic, clippy::cargo))]
// Several `get_or_*` methods deliberately use a two-armed `match` on the result of a lookup.
#![allow(clippy::single_match_else)]
// `unsafe` has to be re-allowed block by block, and every public item must carry documentation.
#![deny(unsafe_code, missing_docs)]
use self::wrapper::Wrapper;
use core::future::Future;
use core::iter::FromIterator;
use core::ops::{Deref, DerefMut};
use parking_lot::Mutex;
use std::sync::Arc;
#[cfg(feature = "async")]
pub use async_::{AsyncLease, PoolStream};
pub use init::InitPool;
#[cfg(feature = "async")]
mod async_;
pub mod init;
mod wrapper;
/// A shareable pool of `T` values that are handed out as [`Lease`]s.
///
/// The pool is a handle around an [`Arc`]: cloning it only clones that handle, so every clone refers to the
/// same set of items.
#[must_use]
pub struct Pool<T> {
/// State shared by every clone of this pool.
inner: Arc<PoolInner<T>>,
}
/// The state shared between all clones of a [`Pool`].
struct PoolInner<T> {
/// Every item currently associated with the pool, leased or not.
buffer: lockfree::set::Set<Wrapper<T>>,
/// Bookkeeping for asynchronous waiters; `Pool::notify` calls `wake_next` on it.
#[cfg(feature = "async")]
waiting_futures: async_::WaitingFutures,
}
impl<T> Default for PoolInner<T> {
    /// Builds the shared state of an empty pool.
    #[inline]
    fn default() -> Self {
        let buffer = lockfree::set::Set::default();
        #[cfg(feature = "async")]
        let waiting_futures = async_::WaitingFutures::default();
        Self {
            buffer,
            #[cfg(feature = "async")]
            waiting_futures,
        }
    }
}
impl<T> Pool<T> {
#[inline]
pub fn new() -> Self {
Self::default()
}
#[inline]
pub fn with_initial_size(pool_size: usize, mut init: impl FnMut() -> T) -> Self {
(0..pool_size).map(|_| init()).collect()
}
/// Creates a pool holding `pool_size` items, each produced by one call to the fallible `init`.
///
/// # Errors
///
/// Returns the first error produced by `init`; `init` is not called again after that and no pool is returned.
#[inline]
pub fn try_with_initial_size<E>(pool_size: usize, mut init: impl FnMut() -> Result<T, E>) -> Result<Self, E> {
    let buffer = lockfree::set::Set::new();
    (0..pool_size).try_for_each(|_| -> Result<(), E> {
        let wrapper = Wrapper::new(init()?);
        buffer
            .insert(wrapper)
            .unwrap_or_else(|_| unreachable!("Each new wrapper should be unique"));
        Ok(())
    })?;
    let inner = PoolInner {
        buffer,
        #[cfg(feature = "async")]
        waiting_futures: async_::WaitingFutures::new(),
    };
    Ok(Self { inner: Arc::new(inner) })
}
/// Tries to lease an item that is already in the pool.
///
/// Walks the pool's items and returns a [`Lease`] on the first one whose mutex could be locked.
/// Returns [`None`] when no item could be locked, which includes the case of an empty pool.
#[must_use]
#[inline]
pub fn get(&self) -> Option<Lease<T>> {
    let mut candidates = self.inner.buffer.iter();
    candidates.find_map(|guard| Lease::from_arc_mutex(&guard, self))
}
/// Like [`Pool::get`], but on failure reports how many items were looked at.
///
/// Returns `Ok(lease)` for the first item that could be locked, otherwise `Err(n)` where `n` is the number
/// of items visited during the scan.
#[inline]
fn get_or_len(&self) -> Result<Lease<T>, usize> {
    let mut visited = 0;
    let found = self.inner.buffer.iter().find_map(|guard| {
        // Count the item before trying to lock it, so a failed scan reports every item it saw.
        visited += 1;
        Lease::from_arc_mutex(&guard, self)
    });
    found.ok_or(visited)
}
/// Returns an [`AsyncLease`] created from this pool.
///
/// Awaiting it yields a [`Lease`]; the `*_with_cap_async` methods of this type await it when the pool is at
/// capacity.
// NOTE(review): when the future resolves is decided by `AsyncLease` in the `async_` module, which is not
// visible here — presumably once an item can be leased; confirm there.
#[cfg(feature = "async")]
#[inline]
pub fn get_async(&self) -> AsyncLease<T> {
AsyncLease::new(self)
}
/// Returns a [`PoolStream`] created from this pool.
// NOTE(review): the items and termination behavior of the stream are defined by `PoolStream` in the `async_`
// module, which is not visible here — presumably it yields leases as they become available; confirm there.
#[cfg(feature = "async")]
#[inline]
pub fn stream(&self) -> PoolStream<T> {
PoolStream::new(self)
}
/// Leases a free item, or adds a new one produced by `init` when none is free.
///
/// `init` is only called when [`Pool::get`] finds nothing; the new item is added to the pool already leased.
#[inline]
pub fn get_or_new(&self, init: impl FnOnce() -> T) -> Lease<T> {
    self.get().unwrap_or_else(|| self.insert_with_lease(init()))
}
/// Asynchronous variant of [`Pool::get_or_new`].
///
/// Leases a free item if there is one; otherwise awaits the future returned by `init` and adds its output
/// to the pool as an already leased item. `init` is not called when a free item is found.
#[inline]
pub async fn get_or_new_async<FUT: Future<Output = T>, FN: FnOnce() -> FUT>(&self, init: FN) -> Lease<T> {
    if let Some(lease) = self.get() {
        return lease;
    }
    let item = init().await;
    self.insert_with_lease(item)
}
/// Leases a free item, or adds a new one produced by the fallible `init` when none is free.
///
/// # Errors
///
/// Returns the error of `init` if it was called and failed; nothing is added to the pool in that case.
#[inline]
pub fn get_or_try_new<E>(&self, init: impl FnOnce() -> Result<T, E>) -> Result<Lease<T>, E> {
    if let Some(lease) = self.get() {
        return Ok(lease);
    }
    init().map(|item| self.insert_with_lease(item))
}
/// Asynchronous variant of [`Pool::get_or_try_new`].
///
/// # Errors
///
/// Returns the error produced by the future of `init` if it was needed and failed; nothing is added to the
/// pool in that case.
#[inline]
pub async fn get_or_try_new_async<E, FUT: Future<Output = Result<T, E>>, FN: FnOnce() -> FUT>(&self, init: FN) -> Result<Lease<T>, E> {
    if let Some(lease) = self.get() {
        return Ok(lease);
    }
    let item = init().await?;
    Ok(self.insert_with_lease(item))
}
/// Leases a free item, or adds a new one as long as the pool holds fewer than `cap` items.
///
/// Returns [`None`] when no item is free and the scan of the pool saw `cap` or more items.
/// `init` is only called when a new item is actually added.
#[inline]
pub fn get_or_new_with_cap(&self, cap: usize, init: impl FnOnce() -> T) -> Option<Lease<T>> {
    match self.get_or_len() {
        Ok(lease) => Some(lease),
        Err(len) if len < cap => Some(self.insert_with_lease(init())),
        Err(_) => None,
    }
}
/// Asynchronous variant of [`Pool::get_or_new_with_cap`] that waits instead of giving up.
///
/// Leases a free item if there is one. Otherwise, if the scan of the pool saw fewer than `cap` items, awaits
/// the future of `init` and adds its output as an already leased item. If the pool is at capacity, awaits
/// [`Pool::get_async`] instead.
#[cfg(feature = "async")]
#[inline]
pub async fn get_or_new_with_cap_async<FUT: Future<Output = T>, FN: FnOnce() -> FUT>(&self, cap: usize, init: FN) -> Lease<T> {
    match self.get_or_len() {
        Ok(lease) => lease,
        Err(len) if len < cap => self.insert_with_lease(init().await),
        Err(_) => self.get_async().await,
    }
}
/// Fallible variant of [`Pool::get_or_new_with_cap`].
///
/// Returns `Ok(None)` when no item is free and the scan of the pool saw `cap` or more items.
///
/// # Errors
///
/// Returns the error of `init` if it was called and failed; nothing is added to the pool in that case.
#[inline]
pub fn get_or_try_new_with_cap<E>(&self, cap: usize, init: impl FnOnce() -> Result<T, E>) -> Result<Option<Lease<T>>, E> {
    match self.get_or_len() {
        Ok(lease) => Ok(Some(lease)),
        Err(len) if len < cap => init().map(|item| Some(self.insert_with_lease(item))),
        Err(_) => Ok(None),
    }
}
/// Fallible variant of [`Pool::get_or_new_with_cap_async`].
///
/// Leases a free item if there is one. Otherwise, if the scan of the pool saw fewer than `cap` items, awaits
/// the future of `init` and adds its successful output as an already leased item. If the pool is at
/// capacity, awaits [`Pool::get_async`] instead.
///
/// # Errors
///
/// Returns the error produced by the future of `init` if it was needed and failed.
#[cfg(feature = "async")]
#[inline]
pub async fn get_or_try_new_with_cap_async<E, FUT: Future<Output = Result<T, E>>, FN: FnOnce() -> FUT>(
    &self,
    cap: usize,
    init: FN,
) -> Result<Lease<T>, E> {
    let lease = match self.get_or_len() {
        Ok(lease) => lease,
        Err(len) if len < cap => self.insert_with_lease(init().await?),
        Err(_) => self.get_async().await,
    };
    Ok(lease)
}
/// Returns the number of items in the pool, leased or not.
///
/// The value is obtained by iterating over the whole pool and counting.
#[inline]
#[must_use]
pub fn len(&self) -> usize {
    let items = self.inner.buffer.iter();
    items.count()
}
#[inline]
pub fn clear(&self) {
self.inner.buffer.iter().for_each(|g| {
let wrapper: &Wrapper<_> = &g;
self.inner.buffer.remove(wrapper);
});
}
/// Resizes the pool to `pool_size` items.
///
/// Shrinking: every item after the first `pool_size` items (in the set's iteration order) is removed from
/// the pool. The lock state of an item is not inspected, so leased items can be removed too.
///
/// Growing: `init` is called once for every missing item. New items are added through [`Pool::insert`], so
/// the pool is notified about each new free item exactly as it is for a single `insert`; adding them
/// straight to the underlying set would skip that notification.
#[inline]
pub fn resize(&self, pool_size: usize, mut init: impl FnMut() -> T) {
    let set = &self.inner.buffer;
    set.iter().skip(pool_size).for_each(|guard| {
        set.remove(&*guard);
    });
    // The current length is read once, after shrinking, to decide how many items are missing.
    for _ in self.len()..pool_size {
        self.insert(init());
    }
}
/// Resizes the pool to `pool_size` items, using a fallible `init` to create missing items.
///
/// Shrinking: every item after the first `pool_size` items (in the set's iteration order) is removed from
/// the pool. The lock state of an item is not inspected, so leased items can be removed too.
///
/// Growing: `init` is called once for every missing item. New items are added through [`Pool::insert`], so
/// the pool is notified about each new free item exactly as it is for a single `insert`; adding them
/// straight to the underlying set would skip that notification.
///
/// # Errors
///
/// Returns the first error produced by `init`. Items removed or added before the failure are not rolled back.
#[inline]
pub fn try_resize<E>(&self, pool_size: usize, mut init: impl FnMut() -> Result<T, E>) -> Result<(), E> {
    let set = &self.inner.buffer;
    set.iter().skip(pool_size).for_each(|guard| {
        set.remove(&*guard);
    });
    // The current length is read once, after shrinking, to decide how many items are missing.
    for _ in self.len()..pool_size {
        self.insert(init()?);
    }
    Ok(())
}
#[inline]
pub fn extend<I: IntoIterator<Item = T>>(&self, iter: I) {
self.inner.buffer.extend(iter.into_iter().map(Wrapper::new));
}
/// Adds `t` to the pool as a free item and then calls `notify`.
///
/// # Panics
///
/// Panics if the underlying set rejects the freshly created wrapper, which the code treats as unreachable.
#[inline]
pub fn insert(&self, t: T) {
    let rejected = self.inner.buffer.insert(Wrapper::new(t)).is_err();
    if rejected {
        unreachable!("Each new wrapper should be unique");
    }
    self.notify();
}
/// Adds `t` to the pool and returns a [`Lease`] on it.
///
/// The lease is taken before the item is put into the pool, so the item is already locked by the time
/// other users of the pool can find it.
///
/// # Panics
///
/// Panics if the new item could not be locked or if the underlying set rejects it; the code treats both
/// as unreachable.
#[inline]
pub fn insert_with_lease(&self, t: T) -> Lease<T> {
    let wrapper = Wrapper::new(t);
    let lease = match Lease::from_arc_mutex(&wrapper, self) {
        Some(lease) => lease,
        None => unreachable!("Wrapper is unlocked when new"),
    };
    if self.inner.buffer.insert(wrapper).is_err() {
        unreachable!("Each new wrapper should be unique");
    }
    lease
}
/// Returns how many items were not locked at the moment they were visited.
///
/// The value is obtained by iterating over the whole pool.
#[must_use]
#[inline]
pub fn available(&self) -> usize {
    self
        .inner
        .buffer
        .iter()
        .map(|guard| guard.is_locked())
        .filter(|locked| !locked)
        .count()
}
/// Returns `true` when the pool holds no items at all, leased or not.
#[must_use]
#[inline]
pub fn is_empty(&self) -> bool {
    let first = self.inner.buffer.iter().next();
    first.is_none()
}
#[inline]
pub fn disassociate(&self, lease: &Lease<T>) {
self.inner.buffer.remove(&Wrapper(lease.mutex.clone()));
}
/// Calls `wake_next` on the pool's waiting futures.
///
/// Does nothing when the `async` feature is disabled.
#[cfg_attr(not(feature = "async"), allow(clippy::unused_self))]
#[inline]
fn notify(&self) {
    #[cfg(feature = "async")]
    {
        self.inner.waiting_futures.wake_next();
    }
}
}
impl<T: Default> Pool<T> {
    /// Same as [`Pool::get_or_new`], creating missing items with [`Default::default`].
    #[inline]
    pub fn get_or_default(&self) -> Lease<T> {
        self.get_or_new(<T as Default>::default)
    }

    /// Same as [`Pool::get_or_new_with_cap`], creating missing items with [`Default::default`].
    #[must_use]
    #[inline]
    pub fn get_or_default_with_cap(&self, cap: usize) -> Option<Lease<T>> {
        self.get_or_new_with_cap(cap, <T as Default>::default)
    }

    /// Same as [`Pool::resize`], creating missing items with [`Default::default`].
    #[inline]
    pub fn resize_default(&self, pool_size: usize) {
        self.resize(pool_size, <T as Default>::default);
    }
}
impl<T> Default for Pool<T> {
    /// Creates an empty pool.
    fn default() -> Self {
        let inner = Arc::new(PoolInner::default());
        Self { inner }
    }
}
impl<T> core::fmt::Debug for Pool<T> {
    /// Formats the pool as a struct with its length, its number of free items and one availability flag per
    /// item. The items themselves are not printed, so `T` does not need to implement `Debug`.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        /// Prints `true` for every item that is not locked and `false` for every locked one.
        struct Availabilities<'a, T>(&'a lockfree::set::Set<Wrapper<T>>);

        impl<T> core::fmt::Debug for Availabilities<'_, T> {
            fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
                let flags = self.0.iter().map(|item| !item.is_locked());
                f.debug_list().entries(flags).finish()
            }
        }

        f.debug_struct("Pool")
            .field("len", &self.len())
            .field("available", &self.available())
            .field("availabilities", &Availabilities(&self.inner.buffer))
            .finish()
    }
}
impl<T> Clone for Pool<T> {
fn clone(&self) -> Self {
Self { inner: self.inner.clone() }
}
}
impl<T> FromIterator<T> for Pool<T> {
    /// Creates a pool that holds every item yielded by `iter`.
    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
        let buffer = iter.into_iter().map(Wrapper::new).collect();
        let inner = PoolInner {
            buffer,
            #[cfg(feature = "async")]
            waiting_futures: async_::WaitingFutures::new(),
        };
        Self { inner: Arc::new(inner) }
    }
}
/// Compile-time check that `Lease<()>` is `'static`. Never called; it only has to type-check.
#[allow(unused)]
fn assert_lease_is_static() {
    fn require_static<T: 'static>() {}
    require_static::<Lease<()>>();
}
// NOTE(review): no manual `Send`/`Sync` impls are visible in this file, so the auto traits come from the
// fields. `Arc<Mutex<T>>` is `Sync` for any `T: Send`, while `Deref` hands out `&T` from `&Lease<T>` — it
// looks like a `T: Sync` requirement may be missing for `Lease<T>: Sync`; confirm.
/// Exclusive access to one item of a [`Pool`].
///
/// The item is reached through [`Deref`] and [`DerefMut`]. Dropping the lease unlocks the item again.
#[must_use]
pub struct Lease<T> {
/// The leased item; its mutex is locked in `Lease::from_arc_mutex` and unlocked again in `Drop`.
mutex: Arc<Mutex<T>>,
/// The pool that is notified when this lease is dropped.
#[cfg(feature = "async")]
pool: Pool<T>,
}
impl<T> Drop for Lease<T> {
/// Unlocks the leased item and, with the `async` feature enabled, notifies the pool afterwards.
fn drop(&mut self) {
// SAFETY: `Lease::from_arc_mutex` locks the mutex and forgets the guard, so a lease built that way holds
// the lock without a live guard; releasing it by force is the counterpart of that forgotten guard.
#[allow(unsafe_code)]
unsafe {
self.mutex.force_unlock();
}
// Notify only after the unlock above, so the item is already free when the pool gets notified.
#[cfg(feature = "async")]
self.pool.notify();
}
}
impl<T> Lease<T> {
    /// Tries to lock `arc` and, on success, wraps it in a lease.
    ///
    /// Returns [`None`] when the mutex is already locked. `pool` is only stored when the `async` feature is
    /// enabled.
    #[inline]
    fn from_arc_mutex(arc: &Arc<Mutex<T>>, #[allow(unused)] pool: &Pool<T>) -> Option<Self> {
        let guard = arc.try_lock()?;
        // Forgetting the guard keeps the mutex locked; `Drop for Lease` releases it with `force_unlock`.
        std::mem::forget(guard);
        Some(Self {
            mutex: Arc::clone(arc),
            #[cfg(feature = "async")]
            pool: pool.clone(),
        })
    }
}
impl<T: core::fmt::Debug> core::fmt::Debug for Lease<T> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
self.deref().fmt(f)
}
}
impl<T> Deref for Lease<T> {
    type Target = T;

    /// Returns a shared reference to the leased item.
    fn deref(&self) -> &Self::Target {
        debug_assert!(self.mutex.is_locked());
        let item = self.mutex.data_ptr();
        // SAFETY: `Lease::from_arc_mutex` locked the mutex and forgot the guard, and the lock is only released
        // in `Drop for Lease`, so the data behind the mutex is reserved for this lease while it is alive.
        #[allow(unsafe_code)]
        unsafe {
            &*item
        }
    }
}
impl<T> DerefMut for Lease<T> {
    /// Returns an exclusive reference to the leased item.
    fn deref_mut(&mut self) -> &mut Self::Target {
        debug_assert!(self.mutex.is_locked());
        let item = self.mutex.data_ptr();
        // SAFETY: `Lease::from_arc_mutex` locked the mutex and forgot the guard, and the lock is only released
        // in `Drop for Lease`, so the data behind the mutex is reserved for this lease while it is alive;
        // `&mut self` rules out other references handed out through this lease.
        #[allow(unsafe_code)]
        unsafe {
            &mut *item
        }
    }
}
impl<T, U: ?Sized> AsRef<U> for Lease<T>
where
T: AsRef<U>,
{
fn as_ref(&self) -> &U {
self.deref().as_ref()
}
}
impl<T: AsMut<U>, U: ?Sized> AsMut<U> for Lease<T> {
    /// Forwards to the [`AsMut`] implementation of the leased item.
    fn as_mut(&mut self) -> &mut U {
        let item: &mut T = self;
        <T as AsMut<U>>::as_mut(item)
    }
}
/// Compile-time checks of trait implementations. Never called; it only has to type-check.
#[allow(unused)]
fn asserts() {
    fn is_bytes<B: AsRef<[u8]>>() {}
    fn is_shareable<F: Send + Sync + 'static + Clone>() {}

    // A lease on a byte vector can be used wherever bytes are expected.
    is_bytes::<Lease<Vec<u8>>>();
    // Both pool flavors can be cloned and moved or shared across threads.
    is_shareable::<Pool<u8>>();
    is_shareable::<init::InitPool<u8, init::InitFn<u8>>>();
}