mod config;
mod errors;
use std::{
convert::TryInto,
ops::{Deref, DerefMut},
sync::{
atomic::{AtomicIsize, AtomicUsize, Ordering},
Arc, Mutex, Weak,
},
time::Duration,
};
use tokio::sync::{Semaphore, TryAcquireError};
pub use crate::Status;
pub use self::{config::PoolConfig, errors::PoolError};
/// Wrapper around an object that has been checked out of a [`Pool`].
///
/// The wrapper dereferences to the inner `T`. When it is dropped, the object
/// is pushed back onto the owning pool's queue if that pool still exists.
/// [`Object::take`] moves the object out instead, so it is not returned.
#[derive(Debug)]
#[must_use]
pub struct Object<T> {
/// The wrapped object. It is `None` only once the object has been moved
/// out, which happens in [`Object::take`] and while dropping.
obj: Option<T>,
/// Weak reference to the originating pool's shared state, so a checked-out
/// object does not keep the pool alive.
pool: Weak<PoolInner<T>>,
}
impl<T> Object<T> {
#[must_use]
pub fn take(mut this: Self) -> T {
if let Some(pool) = this.pool.upgrade() {
let _ = pool.size.fetch_sub(1, Ordering::Relaxed);
pool.size_semaphore.add_permits(1);
}
this.obj.take().unwrap()
}
}
impl<T> Drop for Object<T> {
    /// Returns the wrapped object to its pool, if both still exist.
    ///
    /// Nothing happens when the object was already moved out. When the pool
    /// has been dropped in the meantime, the object is dropped as well.
    fn drop(&mut self) {
        let returned = match self.obj.take() {
            Some(returned) => returned,
            None => return,
        };
        let owner = match self.pool.upgrade() {
            Some(owner) => owner,
            None => return,
        };
        // The guard is a temporary, so the lock is released at the end of
        // this statement, before any permit is handed out.
        owner.queue.lock().unwrap().push(returned);
        let _ = owner.available.fetch_add(1, Ordering::Relaxed);
        owner.semaphore.add_permits(1);
        // A closed pool must not retain objects; discard what was just queued.
        owner.clean_up();
    }
}
impl<T> Deref for Object<T> {
    type Target = T;

    /// Borrows the wrapped object.
    ///
    /// # Panics
    ///
    /// Panics if the object has already been moved out of the wrapper.
    fn deref(&self) -> &Self::Target {
        let inner: Option<&T> = self.obj.as_ref();
        inner.unwrap()
    }
}
impl<T> DerefMut for Object<T> {
    /// Mutably borrows the wrapped object.
    ///
    /// # Panics
    ///
    /// Panics if the object has already been moved out of the wrapper.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let inner: Option<&mut T> = self.obj.as_mut();
        inner.unwrap()
    }
}
impl<T> AsRef<T> for Object<T> {
    /// Borrows the wrapped object by going through the `Deref` impl.
    fn as_ref(&self) -> &T {
        &**self
    }
}
impl<T> AsMut<T> for Object<T> {
    /// Mutably borrows the wrapped object by going through the `DerefMut` impl.
    fn as_mut(&mut self) -> &mut T {
        &mut **self
    }
}
/// Handle to a pool of objects of type `T`.
///
/// All state lives behind a shared `Arc`, so cloning the handle yields
/// another reference to the same pool rather than a copy of it.
#[derive(Debug)]
pub struct Pool<T> {
/// Shared pool state; checked-out [`Object`]s hold a `Weak` reference to it.
inner: Arc<PoolInner<T>>,
}
impl<T> Clone for Pool<T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl<T> Default for Pool<T> {
    /// Builds an empty pool from `PoolConfig::default()`.
    fn default() -> Self {
        let config = PoolConfig::default();
        Self::from_config(&config)
    }
}
impl<T> Pool<T> {
#[must_use]
pub fn new(max_size: usize) -> Self {
Self::from_config(&PoolConfig::new(max_size))
}
#[must_use]
pub fn from_config(config: &PoolConfig) -> Self {
Self {
inner: Arc::new(PoolInner {
config: *config,
queue: Mutex::new(Vec::with_capacity(config.max_size)),
size: AtomicUsize::new(0),
size_semaphore: Semaphore::new(config.max_size),
available: AtomicIsize::new(0),
semaphore: Semaphore::new(0),
}),
}
}
pub async fn get(&self) -> Result<Object<T>, PoolError> {
self.timeout_get(self.inner.config.timeout).await
}
pub fn try_get(&self) -> Result<Object<T>, PoolError> {
let inner = self.inner.as_ref();
let permit = inner.semaphore.try_acquire().map_err(|e| match e {
TryAcquireError::NoPermits => PoolError::Timeout,
TryAcquireError::Closed => PoolError::Closed,
})?;
let obj = {
let mut queue = inner.queue.lock().unwrap();
queue.pop().unwrap()
};
permit.forget();
let _ = inner.available.fetch_sub(1, Ordering::Relaxed);
Ok(Object {
pool: Arc::downgrade(&self.inner),
obj: Some(obj),
})
}
pub async fn timeout_get(&self, timeout: Option<Duration>) -> Result<Object<T>, PoolError> {
let inner = self.inner.as_ref();
let permit = match (timeout, inner.config.runtime) {
(None, _) => inner
.semaphore
.acquire()
.await
.map_err(|_| PoolError::Closed),
(Some(timeout), _) if timeout.as_nanos() == 0 => {
inner.semaphore.try_acquire().map_err(|e| match e {
TryAcquireError::NoPermits => PoolError::Timeout,
TryAcquireError::Closed => PoolError::Closed,
})
}
(Some(timeout), Some(runtime)) => runtime
.timeout(timeout, inner.semaphore.acquire())
.await
.ok_or(PoolError::Timeout)?
.map_err(|_| PoolError::Closed),
(Some(_), None) => Err(PoolError::NoRuntimeSpecified),
}?;
let obj = {
let mut queue = inner.queue.lock().unwrap();
queue.pop().unwrap()
};
permit.forget();
let _ = inner.available.fetch_sub(1, Ordering::Relaxed);
Ok(Object {
pool: Arc::downgrade(&self.inner),
obj: Some(obj),
})
}
pub async fn add(&self, object: T) -> Result<(), (T, PoolError)> {
match self.inner.size_semaphore.acquire().await {
Ok(permit) => {
permit.forget();
self._add(object);
Ok(())
}
Err(_) => Err((object, PoolError::Closed)),
}
}
pub fn try_add(&self, object: T) -> Result<(), (T, PoolError)> {
match self.inner.size_semaphore.try_acquire() {
Ok(permit) => {
permit.forget();
self._add(object);
Ok(())
}
Err(e) => Err(match e {
TryAcquireError::NoPermits => (object, PoolError::Timeout),
TryAcquireError::Closed => (object, PoolError::Closed),
}),
}
}
fn _add(&self, object: T) {
let _ = self.inner.size.fetch_add(1, Ordering::Relaxed);
{
let mut queue = self.inner.queue.lock().unwrap();
queue.push(object);
}
let _ = self.inner.available.fetch_add(1, Ordering::Relaxed);
self.inner.semaphore.add_permits(1);
}
pub async fn remove(&self) -> Result<T, PoolError> {
self.get().await.map(Object::take)
}
pub fn try_remove(&self) -> Result<T, PoolError> {
self.try_get().map(Object::take)
}
pub async fn timeout_remove(&self, timeout: Option<Duration>) -> Result<T, PoolError> {
self.timeout_get(timeout).await.map(Object::take)
}
pub fn close(&self) {
self.inner.semaphore.close();
self.inner.size_semaphore.close();
self.inner.clear();
}
pub fn is_closed(&self) -> bool {
self.inner.is_closed()
}
#[must_use]
pub fn status(&self) -> Status {
let max_size = self.inner.config.max_size;
let size = self.inner.size.load(Ordering::Relaxed);
let available = self.inner.available.load(Ordering::Relaxed);
Status {
max_size,
size,
available: if available > 0 { available as usize } else { 0 },
waiting: if available < 0 {
(-available) as usize
} else {
0
},
}
}
}
/// State shared by all [`Pool`] handles and referenced weakly by checked-out
/// [`Object`]s.
#[derive(Debug)]
struct PoolInner<T> {
/// Configuration the pool was created with.
config: PoolConfig,
/// Idle objects; objects are pushed to and popped from the end.
queue: Mutex<Vec<T>>,
/// Number of objects that belong to the pool: incremented when an object
/// is added, decremented when one is taken out or cleared.
size: AtomicUsize,
/// Free capacity: a permit is consumed when an object is added and handed
/// back when an object is taken out via `Object::take`.
size_semaphore: Semaphore,
/// Count of idle objects, kept in step with `queue`. `Pool::status`
/// reports a negative value as the number of waiters.
available: AtomicIsize,
/// One permit per idle object; a permit is acquired and forgotten when an
/// object is checked out and re-added when it is returned.
semaphore: Semaphore,
}
impl<T> PoolInner<T> {
    /// Discards all idle objects if the pool has been closed; otherwise does
    /// nothing.
    fn clean_up(&self) {
        if self.is_closed() {
            self.clear();
        }
    }

    /// Removes every idle object from the queue and adjusts the `size` and
    /// `available` counters accordingly.
    ///
    /// The objects are moved out while the queue lock is held but dropped
    /// only after it has been released. Running `T`'s destructor under the
    /// lock would deadlock if that destructor touches this pool again (for
    /// example by dropping an `Object<T>`), and would poison the mutex if
    /// the destructor panics.
    fn clear(&self) {
        let discarded = {
            let mut queue = self.queue.lock().unwrap();
            let removed = queue.len();
            let _ = self.size.fetch_sub(removed, Ordering::Relaxed);
            // A `Vec` never holds more than `isize::MAX` elements, so this
            // cast is lossless.
            let _ = self.available.fetch_sub(removed as isize, Ordering::Relaxed);
            std::mem::take(&mut *queue)
        };
        drop(discarded);
    }

    /// Reports whether the availability semaphore has been closed.
    ///
    /// Acquiring zero permits takes nothing from the semaphore and fails
    /// with `Closed` exactly when it has been closed.
    fn is_closed(&self) -> bool {
        matches!(
            self.semaphore.try_acquire_many(0),
            Err(TryAcquireError::Closed)
        )
    }
}
impl<T, I> From<I> for Pool<T>
where
    I: IntoIterator<Item = T>,
    I::IntoIter: ExactSizeIterator,
{
    /// Builds a pool that is pre-filled with the items of `iter`.
    ///
    /// The maximum size equals the number of items, so the pool starts out
    /// full: every object is immediately available and no capacity slot is
    /// free.
    ///
    /// # Panics
    ///
    /// Panics if the number of items does not fit into an `isize`.
    fn from(iter: I) -> Self {
        let objects: Vec<T> = iter.into_iter().collect();
        let count = objects.len();
        let inner = PoolInner {
            queue: Mutex::new(objects),
            config: PoolConfig::new(count),
            size: AtomicUsize::new(count),
            // Already at capacity: nothing can be added until an object is
            // taken out.
            size_semaphore: Semaphore::new(0),
            available: AtomicIsize::new(count.try_into().unwrap()),
            semaphore: Semaphore::new(count),
        };
        Self {
            inner: Arc::new(inner),
        }
    }
}