#![warn(missing_docs)]
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicIsize, AtomicUsize, Ordering};
use std::sync::{Arc, Weak};
use async_trait::async_trait;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::Mutex;
#[async_trait]
pub trait Manager<T, E> {
async fn create(&self) -> Result<T, E>;
async fn recycle(&self, obj: &mut T) -> Result<(), E>;
}
enum ObjectState {
New,
Creating,
Recycling,
Ready,
}
pub struct Object<T, E> {
obj: Option<T>,
state: ObjectState,
pool: Weak<PoolInner<T, E>>,
}
impl<T, E> Object<T, E> {
fn new(pool: &Pool<T, E>) -> Object<T, E> {
Object {
obj: None,
state: ObjectState::New,
pool: Arc::downgrade(&pool.inner),
}
}
}
impl<T, E> Drop for Object<T, E> {
fn drop(&mut self) {
if let Some(pool) = self.pool.upgrade() {
match self.state {
ObjectState::New => {
pool.available.fetch_add(1, Ordering::Relaxed);
}
ObjectState::Creating => {
pool.available.fetch_add(1, Ordering::Relaxed);
pool.size.fetch_sub(1, Ordering::Relaxed);
}
ObjectState::Recycling => {
pool.available.fetch_add(1, Ordering::Relaxed);
if let Err(e) = pool.obj_sender.clone().try_send(self.obj.take()) {
pool.available.fetch_sub(1, Ordering::Relaxed);
pool.size.fetch_sub(1, Ordering::Relaxed);
if !std::thread::panicking() {
unreachable!("Could not return object to pool: {}", e);
}
}
}
ObjectState::Ready => {
pool.return_obj(self.obj.take());
}
}
}
self.obj = None;
self.state = ObjectState::New;
}
}
impl<T, E> Deref for Object<T, E> {
type Target = T;
fn deref(&self) -> &T {
self.obj.as_ref().unwrap()
}
}
impl<T, E> DerefMut for Object<T, E> {
fn deref_mut(&mut self) -> &mut T {
self.obj.as_mut().unwrap()
}
}
struct PoolInner<T, E> {
manager: Box<dyn Manager<T, E> + Sync + Send>,
max_size: usize,
obj_sender: Sender<Option<T>>,
obj_receiver: Mutex<Receiver<Option<T>>>,
size: AtomicUsize,
available: AtomicIsize,
}
impl<T, E> PoolInner<T, E> {
fn return_obj(&self, obj: Option<T>) {
match self.obj_sender.clone().try_send(obj) {
Ok(_) => {
self.available.fetch_add(1, Ordering::Relaxed);
}
Err(e) => {
self.size.fetch_sub(1, Ordering::Relaxed);
if !std::thread::panicking() {
unreachable!("Could not return object to pool: {}", e);
}
}
}
}
}
pub struct Pool<T, E> {
inner: Arc<PoolInner<T, E>>,
}
#[derive(Debug)]
pub struct Status {
size: usize,
available: isize,
}
impl<T, E> Clone for Pool<T, E> {
fn clone(&self) -> Pool<T, E> {
Pool {
inner: self.inner.clone(),
}
}
}
impl<T, E> Pool<T, E> {
pub fn new(manager: impl Manager<T, E> + Send + Sync + 'static, max_size: usize) -> Pool<T, E> {
let (obj_sender, obj_receiver) = channel::<Option<T>>(max_size);
Pool {
inner: Arc::new(PoolInner {
max_size: max_size,
manager: Box::new(manager),
obj_sender: obj_sender,
obj_receiver: Mutex::new(obj_receiver),
size: AtomicUsize::new(0),
available: AtomicIsize::new(0),
}),
}
}
pub async fn get(&self) -> Result<Object<T, E>, E> {
let mut available = self.inner.available.fetch_sub(1, Ordering::Relaxed);
let mut size = self.inner.size.load(Ordering::Relaxed);
let mut obj = Object::new(&self);
loop {
if available <= 0 && size < self.inner.max_size {
if self.inner.size.fetch_add(1, Ordering::Relaxed) < self.inner.max_size {
self.inner.available.fetch_add(1, Ordering::Relaxed);
obj.state = ObjectState::Creating;
obj.obj = Some(self.inner.manager.create().await?);
obj.state = ObjectState::Ready;
break;
} else {
self.inner.size.fetch_sub(1, Ordering::Relaxed);
}
}
let inner_obj = self.inner.obj_receiver.lock().await.recv().await.unwrap();
if let Some(inner_obj) = inner_obj {
obj.obj = Some(inner_obj);
obj.state = ObjectState::Recycling;
if self.inner.manager.recycle(&mut obj).await.is_ok() {
obj.state = ObjectState::Ready;
break;
}
obj.state = ObjectState::New;
}
size = self.inner.size.fetch_sub(1, Ordering::Relaxed) - 1;
available = self.inner.available.fetch_sub(1, Ordering::Relaxed);
}
Ok(obj)
}
pub fn status(&self) -> Status {
let size = self.inner.size.load(Ordering::Relaxed);
let available = self.inner.available.load(Ordering::Relaxed);
Status { size, available }
}
}