use std::collections::VecDeque;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicIsize, AtomicUsize, Ordering};
use std::sync::{Arc, Weak};
use std::time::Duration;
use std::{future::Future, marker::PhantomData};
use async_trait::async_trait;
use tokio::sync::{Semaphore, TryAcquireError};
mod config;
pub use self::config::{PoolConfig, Timeouts};
mod errors;
pub use errors::{PoolError, RecycleError, TimeoutType};
use crate::runtime::{Runtime, TimeoutError};
pub use crate::Status;
/// Result type returned by [`Manager::recycle`]: `Ok(())` on success,
/// otherwise a [`RecycleError`] parameterized over the error type `E`.
pub type RecycleResult<E> = Result<(), RecycleError<E>>;
/// Describes how the objects held by a [`Pool`] are created, recycled and
/// detached.
#[async_trait]
pub trait Manager {
/// Type of the objects this manager creates and recycles.
type Type;
/// Error type returned by [`Manager::create`] and carried inside the
/// [`RecycleError`] returned by [`Manager::recycle`].
type Error;
/// Creates a new object.
///
/// The pool calls this when its queue of idle objects is empty.
async fn create(&self) -> Result<Self::Type, Self::Error>;
/// Checks/prepares an idle object before it is handed out again.
///
/// The pool calls this on every object it pops from its queue. When an
/// error is returned the pool does not hand that object out; it moves on
/// to the next queued object or creates a new one.
async fn recycle(&self, obj: &mut Self::Type) -> RecycleResult<Self::Error>;
/// Hook invoked by [`Object::take`] right before the object leaves the
/// pool for good. The default implementation does nothing.
fn detach(&self, _obj: &mut Self::Type) {}
}
/// Progress marker of an [`Object`] while it is being obtained from the pool.
///
/// `Drop for Object` inspects this value to decide which parts of the pool's
/// bookkeeping (counters, semaphore permit, queue) have to be restored.
enum ObjectState {
/// Initial state: waiting for a semaphore permit.
Waiting,
/// A permit has been acquired and an object is about to be popped from
/// the queue.
Receiving,
/// The queue was empty and `Manager::create` is being awaited.
Creating,
/// An object was popped from the queue and `Manager::recycle` is being
/// awaited on it.
Recycling,
/// The object has been obtained successfully and is handed to the caller.
Ready,
/// The inner object is being removed from the pool via `Object::take`.
Taken,
/// `Drop` has already run; nothing is left to clean up.
Dropped,
}
/// Handle to an object obtained from a [`Pool`].
///
/// Dereferences to the managed object. Dropping the handle gives the object
/// (or at least its bookkeeping) back to the pool, provided the pool still
/// exists.
pub struct Object<M: Manager> {
/// The managed object; `None` until one has been assigned and again after
/// it has been taken out or the handle has been dropped.
obj: Option<M::Type>,
/// How far the acquisition of this object has progressed; drives `Drop`.
state: ObjectState,
/// Weak reference to the shared state of the pool this handle belongs to.
pool: Weak<PoolInner<M>>,
}
impl<M: Manager> Object<M> {
pub fn take(mut this: Self) -> M::Type {
this.state = ObjectState::Taken;
if let Some(pool) = this.pool.upgrade() {
pool.manager.detach(&mut this);
}
this.obj.take().unwrap()
}
}
/// Restores the pool's bookkeeping when a handle goes away.
///
/// What has to be undone depends on `self.state`, i.e. on how far the
/// acquisition progressed. If the pool itself no longer exists (the weak
/// reference cannot be upgraded) only the local fields are reset.
impl<M: Manager> Drop for Object<M> {
fn drop(&mut self) {
if let Some(pool) = self.pool.upgrade() {
match self.state {
ObjectState::Waiting => {
// No permit was acquired yet: only revert the `available` decrement
// made when the acquisition started.
pool.available.fetch_add(1, Ordering::Relaxed);
}
ObjectState::Receiving => {
// A permit was acquired (and forgotten) but no object was obtained:
// revert the counter and hand the permit back.
pool.available.fetch_add(1, Ordering::Relaxed);
pool.semaphore.add_permits(1);
}
ObjectState::Creating | ObjectState::Taken => {
// The object never materialized or leaves the pool for good:
// the pool shrinks by one and the permit becomes free again.
pool.size.fetch_sub(1, Ordering::Relaxed);
pool.semaphore.add_permits(1);
}
ObjectState::Recycling | ObjectState::Ready => {
// The handle holds a pooled object: put it back at the end of the queue.
pool.available.fetch_add(1, Ordering::Relaxed);
let obj = self.obj.take().unwrap();
{
// Scope the guard so the queue lock is released before the permit
// is returned.
let mut queue = pool.queue.lock().unwrap();
queue.push_back(obj);
}
pool.semaphore.add_permits(1);
// If the pool has been closed, `clean_up` empties the queue again so
// the object just returned does not linger.
pool.clean_up();
}
ObjectState::Dropped => {
// Already cleaned up; nothing to do.
}
}
}
self.obj = None;
self.state = ObjectState::Dropped;
}
}
impl<M: Manager> Deref for Object<M> {
    type Target = M::Type;

    /// Borrows the managed object.
    ///
    /// # Panics
    ///
    /// Panics if the handle does not currently hold an object.
    fn deref(&self) -> &Self::Target {
        let slot = &self.obj;
        slot.as_ref().unwrap()
    }
}
impl<M: Manager> DerefMut for Object<M> {
    /// Mutably borrows the managed object.
    ///
    /// # Panics
    ///
    /// Panics if the handle does not currently hold an object.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let slot = &mut self.obj;
        slot.as_mut().unwrap()
    }
}
impl<M: Manager> AsRef<M::Type> for Object<M> {
    /// Borrows the managed object; equivalent to dereferencing the handle.
    fn as_ref(&self) -> &M::Type {
        Deref::deref(self)
    }
}
impl<M: Manager> AsMut<M::Type> for Object<M> {
    /// Mutably borrows the managed object; equivalent to mutably
    /// dereferencing the handle.
    fn as_mut(&mut self) -> &mut M::Type {
        DerefMut::deref_mut(self)
    }
}
/// State shared between all clones of a [`Pool`] and, through weak
/// references, the [`Object`] handles it gave out.
struct PoolInner<M: Manager> {
/// Creates, recycles and detaches the pooled objects.
manager: Box<M>,
/// Idle objects; handles push to the back on drop, acquisitions pop from
/// the front.
queue: std::sync::Mutex<VecDeque<M::Type>>,
/// Number of objects currently belonging to the pool, whether idle or
/// handed out.
size: AtomicUsize,
/// Signed availability counter: decremented when an acquisition starts and
/// incremented again when an object is created or a handle is dropped, so
/// it can be negative while callers are waiting.
available: AtomicIsize,
/// Limits the number of concurrently held objects; created with
/// `config.max_size` permits. Closing it closes the pool.
semaphore: Semaphore,
/// Configuration the pool was built with.
config: PoolConfig,
}
/// Cloneable handle to a pool of objects managed by `M`.
///
/// `W` is the type handed out by the `get` family of methods; it is built
/// from an [`Object`] via `From` and defaults to [`Object`] itself.
pub struct Pool<M: Manager, W: From<Object<M>> = Object<M>> {
/// Shared pool state; every clone of the pool points at the same instance.
inner: Arc<PoolInner<M>>,
/// Ties the wrapper type `W` to the pool without storing a value of it.
_wrapper: PhantomData<W>,
}
impl<M: Manager, W: From<Object<M>>> Clone for Pool<M, W> {
fn clone(&self) -> Pool<M, W> {
Pool {
inner: self.inner.clone(),
_wrapper: PhantomData::default(),
}
}
}
impl<M: Manager, W: From<Object<M>>> Pool<M, W> {
    /// Creates a pool that holds at most `max_size` objects and uses
    /// `PoolConfig::new(max_size)` for all remaining settings.
    pub fn new(manager: M, max_size: usize) -> Pool<M, W> {
        Self::from_config(manager, PoolConfig::new(max_size))
    }

    /// Creates a pool from an explicit [`PoolConfig`].
    ///
    /// The pool starts out empty; objects are created on demand by
    /// `manager`. The semaphore is created with `config.max_size` permits.
    pub fn from_config(manager: M, config: PoolConfig) -> Pool<M, W> {
        Pool {
            inner: Arc::new(PoolInner {
                manager: Box::new(manager),
                queue: std::sync::Mutex::new(VecDeque::with_capacity(config.max_size)),
                size: AtomicUsize::new(0),
                available: AtomicIsize::new(0),
                semaphore: Semaphore::new(config.max_size),
                config,
            }),
            _wrapper: PhantomData,
        }
    }

    /// Retrieves an object from the pool using the timeouts stored in the
    /// pool's configuration.
    ///
    /// See [`Pool::timeout_get`] for the possible errors.
    pub async fn get(&self) -> Result<W, PoolError<M::Error>> {
        self.timeout_get(&self.inner.config.timeouts).await
    }

    /// Retrieves an object from the pool without waiting for a free slot.
    ///
    /// Uses the configured timeouts with the wait timeout replaced by zero,
    /// so `PoolError::Timeout(TimeoutType::Wait)` is returned immediately
    /// when no permit is available.
    pub async fn try_get(&self) -> Result<W, PoolError<M::Error>> {
        let mut timeouts = self.inner.config.timeouts.clone();
        timeouts.wait = Some(Duration::from_secs(0));
        self.timeout_get(&timeouts).await
    }

    /// Retrieves an object from the pool, applying the given `timeouts` to
    /// the wait, recycle and create steps.
    ///
    /// A wait timeout of exactly zero makes the call non-blocking with
    /// respect to the semaphore. Idle objects are recycled before being
    /// handed out; objects that fail to recycle are skipped and, once the
    /// queue is empty, a new object is created.
    ///
    /// # Errors
    ///
    /// * `PoolError::Closed` if the pool's semaphore has been closed.
    /// * `PoolError::Timeout(_)` if waiting or creating exceeds its timeout.
    /// * `PoolError::NoRuntimeSpecified` if a timeout is requested but the
    ///   configured runtime cannot provide one.
    /// * Whatever error `Manager::create` produces, converted into a
    ///   `PoolError`.
    pub async fn timeout_get(&self, timeouts: &Timeouts) -> Result<W, PoolError<M::Error>> {
        // Register this caller; `Drop for Object` reverts the counter if the
        // acquisition is abandoned at any later point.
        self.inner.available.fetch_sub(1, Ordering::Relaxed);

        let mut obj = Object {
            obj: None,
            state: ObjectState::Waiting,
            pool: Arc::downgrade(&self.inner),
        };

        let non_blocking = matches!(timeouts.wait, Some(t) if t.as_nanos() == 0);

        let permit = if non_blocking {
            self.inner.semaphore.try_acquire().map_err(|e| match e {
                TryAcquireError::Closed => PoolError::Closed,
                TryAcquireError::NoPermits => PoolError::Timeout(TimeoutType::Wait),
            })?
        } else {
            apply_timeout(
                &self.inner.config.runtime,
                TimeoutType::Wait,
                // Honour the caller-supplied timeout rather than the pool default.
                timeouts.wait,
                async {
                    self.inner
                        .semaphore
                        .acquire()
                        .await
                        .map_err(|_| PoolError::Closed)
                },
            )
            .await?
        };

        // The permit is handed back manually by `Drop for Object`.
        permit.forget();

        loop {
            obj.state = ObjectState::Receiving;
            let inner_obj = {
                let mut queue = self.inner.queue.lock().unwrap();
                queue.pop_front()
            };
            match inner_obj {
                Some(inner_obj) => {
                    // Recycle an idle object.
                    obj.state = ObjectState::Recycling;
                    obj.obj = Some(inner_obj);
                    match apply_timeout(
                        &self.inner.config.runtime,
                        TimeoutType::Recycle,
                        timeouts.recycle,
                        self.inner.manager.recycle(&mut obj),
                    )
                    .await
                    {
                        Ok(_) => break,
                        Err(_) => {
                            // The object is unusable: it no longer counts
                            // towards the pool; try the next one.
                            self.inner.available.fetch_sub(1, Ordering::Relaxed);
                            self.inner.size.fetch_sub(1, Ordering::Relaxed);
                            continue;
                        }
                    }
                }
                None => {
                    // Nothing idle: create a fresh object.
                    obj.state = ObjectState::Creating;
                    self.inner.available.fetch_add(1, Ordering::Relaxed);
                    self.inner.size.fetch_add(1, Ordering::Relaxed);
                    obj.obj = Some(
                        apply_timeout(
                            &self.inner.config.runtime,
                            TimeoutType::Create,
                            timeouts.create,
                            self.inner.manager.create(),
                        )
                        .await?,
                    );
                    break;
                }
            }
        }

        obj.state = ObjectState::Ready;
        Ok(obj.into())
    }

    /// Closes the pool: the semaphore is closed and all idle objects are
    /// removed from the queue.
    pub fn close(&self) {
        self.inner.semaphore.close();
        self.inner.clean_up();
    }

    /// Returns `true` if the pool has been closed.
    pub fn is_closed(&self) -> bool {
        self.inner.is_closed()
    }

    /// Returns a snapshot of the pool's counters.
    ///
    /// The values are read with relaxed ordering one after another, so they
    /// are not guaranteed to be mutually consistent under concurrent use.
    pub fn status(&self) -> Status {
        let max_size = self.inner.config.max_size;
        let size = self.inner.size.load(Ordering::Relaxed);
        let available = self.inner.available.load(Ordering::Relaxed);
        Status {
            max_size,
            size,
            available,
        }
    }

    /// Returns a reference to the manager used by this pool.
    pub fn manager(&self) -> &M {
        &*self.inner.manager
    }
}
impl<M: Manager> PoolInner<M> {
    /// Empties the queue of idle objects, but only if the pool is closed.
    fn clean_up(&self) {
        if !self.is_closed() {
            return;
        }
        self.clear();
    }

    /// Drops every idle object and subtracts their number from both the
    /// `size` and the `available` counter while holding the queue lock.
    fn clear(&self) {
        let mut idle = self.queue.lock().unwrap();
        let removed = idle.len();
        self.size.fetch_sub(removed, Ordering::Relaxed);
        self.available.fetch_sub(removed as isize, Ordering::Relaxed);
        idle.clear();
    }

    /// Reports whether the semaphore has been closed, probed by trying to
    /// acquire zero permits.
    fn is_closed(&self) -> bool {
        let probe = self.semaphore.try_acquire_many(0);
        matches!(probe, Err(TryAcquireError::Closed))
    }
}
/// Awaits `future`, optionally bounded by `duration`.
///
/// Without a duration the future is awaited directly. With a duration the
/// runtime's `timeout` is used: an elapsed timeout becomes
/// `PoolError::Timeout(timeout_type)` and a missing runtime becomes
/// `PoolError::NoRuntimeSpecified`. Errors produced by the future itself are
/// converted into a `PoolError` via `Into`.
async fn apply_timeout<O, E>(
    runtime: &Runtime,
    timeout_type: TimeoutType,
    duration: Option<Duration>,
    future: impl Future<Output = Result<O, impl Into<PoolError<E>>>>,
) -> Result<O, PoolError<E>> {
    let limit = match duration {
        Some(limit) => limit,
        // Unbounded: just run the future to completion.
        None => return future.await.map_err(Into::into),
    };

    let outcome = runtime.timeout(limit, future).await;
    match outcome {
        Ok(result) => result.map_err(Into::into),
        Err(TimeoutError::NoRuntime) => Err(PoolError::NoRuntimeSpecified),
        Err(TimeoutError::Timeout) => Err(PoolError::Timeout(timeout_type)),
    }
}