//! A small asynchronous object pool.
//!
//! A [`Pool`] lazily creates up to `max_size` objects through a [`Manager`] and
//! hands them out wrapped in [`Object`]; dropping an [`Object`] sends the value
//! back to the pool so that a later [`Pool::get`] can reuse it.

use std::ops::{Deref, DerefMut};
use std::sync::{Arc, Weak};
use std::sync::atomic::{AtomicIsize, AtomicUsize, Ordering};

use async_trait::async_trait;
use tokio::sync::Mutex;
use tokio::sync::mpsc::{channel, Receiver, Sender};

/// Submodule compiled only when the crate's `postgres` feature is enabled.
// NOTE(review): presumably provides a PostgreSQL-backed `Manager`; confirm in the module itself.
#[cfg(feature = "postgres")]
pub mod postgres;

/// Hooks a [`Pool`] uses to produce and re-validate objects of type `T`.
///
/// `E` is the error type of both operations; [`Pool::get`] propagates it to
/// its caller unchanged.
#[async_trait]
pub trait Manager<T, E> {
    /// Builds a brand-new object.
    ///
    /// [`Pool::get`] calls this when no idle object is available and the pool
    /// has not yet reached its maximum size.
    async fn create(&self) -> Result<T, E>;
    /// Takes ownership of an object that was previously returned to the pool
    /// and gives it back ready for reuse.
    ///
    /// [`Pool::get`] calls this on every object it pulls from the idle queue
    /// before handing it out. On `Err` the object is not returned to the caller.
    async fn recycle(&self, obj: T) -> Result<T, E>;
}

/// A pooled value checked out from a [`Pool`].
///
/// Dereferences to `T`. When dropped, the wrapped value is handed back to the
/// originating pool, provided that pool still exists.
pub struct Object<T, E> {
    // Holds the value for the guard's whole lifetime; only `Drop` takes it out.
    obj: Option<T>,
    // Weak reference, so outstanding objects do not keep the pool alive.
    pool: Weak<PoolInner<T, E>>
}

impl<T, E> Object<T, E> {
    fn new(pool: &Pool<T, E>, obj: T) -> Object<T, E> {
        Object {
            obj: Some(obj),
            pool: Arc::downgrade(&pool.inner),
        }
    }
}

impl<T, E> Drop for Object<T, E> {
    /// Hands the wrapped value back to its pool.
    ///
    /// If the pool has already been dropped there is nowhere to return the
    /// value to, so it is simply dropped together with this guard.
    fn drop(&mut self) {
        let pool = match self.pool.upgrade() {
            Some(pool) => pool,
            // Pool is gone: nothing to do, `self.obj` is dropped as a normal field.
            None => return,
        };
        // `obj` is only ever taken here, so it is still present at this point.
        let obj = self.obj.take().unwrap();
        pool.return_obj(obj);
    }
}

impl<T, E> Deref for Object<T, E> {
    type Target = T;

    /// Borrows the pooled value.
    ///
    /// Panics if the value has already been taken out, which only happens
    /// while the guard is being dropped.
    fn deref(&self) -> &Self::Target {
        let slot = self.obj.as_ref();
        slot.unwrap()
    }
}

impl<T, E> DerefMut for Object<T, E> {
    /// Mutably borrows the pooled value.
    ///
    /// Panics if the value has already been taken out, which only happens
    /// while the guard is being dropped.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let slot = self.obj.as_mut();
        slot.unwrap()
    }
}

/// Atomic bookkeeping counters of a pool. Both start at zero.
#[derive(Debug, Default)]
pub struct PoolSize {
    /// Number of objects the pool currently accounts for as created.
    current: AtomicUsize,
    /// Idle objects minus callers that have claimed (or are waiting for) one.
    ///
    /// Every `get` decrements this before it knows whether an idle object
    /// exists, so the value goes negative while callers are waiting; hence the
    /// signed type.
    available: AtomicIsize,
}

/// State shared by every clone of a [`Pool`] and weakly referenced by every
/// [`Object`] handed out.
pub struct PoolInner<T, E>
{
    /// Creates new objects and recycles returned ones.
    manager: Box<dyn Manager<T, E> + Sync + Send>,
    /// Upper bound on the number of objects the pool creates; also used as the
    /// capacity of the idle-object channel.
    max_size: usize,
    /// Sending half of the idle-object channel; cloned for each returned object.
    obj_sender: Sender<T>,
    /// Receiving half of the idle-object channel. Guarded by an async mutex
    /// because waiting callers share this single receiver.
    obj_receiver: Mutex<Receiver<T>>,
    /// Created/available counters.
    size: PoolSize,
}

impl<T, E> PoolInner<T, E> {
    /// Puts `obj` back into the idle queue and bumps the `available` counter.
    ///
    /// Called from `Object`'s `Drop` implementation, so it must not block:
    /// the value is pushed with `try_send` on a fresh clone of the sender.
    ///
    /// # Panics
    ///
    /// Panics if the channel refuses the value.
    fn return_obj(&self, obj: T) {
        let counters = &self.size;
        counters.available.fetch_add(1, Ordering::SeqCst);
        let outcome = self.obj_sender.clone().try_send(obj);
        // The send error is discarded before unwrapping because it is not
        // guaranteed to be `Debug` for an arbitrary `T`.
        outcome.map_err(drop).unwrap();
    }
}

/// Handle to a shared object pool.
///
/// Cloning a `Pool` yields another handle to the same underlying state.
pub struct Pool<T, E> {
    // Reference-counted state shared by all clones of this handle.
    inner: Arc<PoolInner<T, E>>
}

impl<T, E> Clone for Pool<T, E> {
    fn clone(&self) -> Pool<T, E> {
        Pool {
            inner: self.inner.clone()
        }
    }
}

impl<T, E> Pool<T, E> {
    /// Creates an empty pool that will hold at most `max_size` objects.
    ///
    /// No object is created up front; `manager.create()` is invoked lazily by
    /// [`Pool::get`].
    ///
    /// # Panics
    ///
    /// `max_size` is passed straight to tokio's bounded `channel` as its
    /// capacity, and tokio rejects a zero-capacity bounded channel by
    /// panicking, so `max_size` must be at least 1.
    pub fn new(manager: impl Manager<T, E> + Send + Sync + 'static, max_size: usize) -> Pool<T, E> {
        let (obj_sender, obj_receiver) = channel::<T>(max_size);
        Pool {
            inner: Arc::new(PoolInner {
                max_size,
                manager: Box::new(manager),
                obj_sender,
                obj_receiver: Mutex::new(obj_receiver),
                size: PoolSize::default(),
            }),
        }
    }

    /// Checks an object out of the pool.
    ///
    /// If no idle object is available and fewer than `max_size` objects exist,
    /// a new one is created with `Manager::create`. Otherwise the call waits
    /// until an object is returned and passes it through `Manager::recycle`
    /// before handing it out.
    ///
    /// # Errors
    ///
    /// Returns the manager's error if `create` or `recycle` fails. In both
    /// cases the slot held by the failed object is released, so a later call
    /// may create a replacement rather than the pool shrinking permanently.
    ///
    /// NOTE(review): if the returned future is dropped while it is suspended,
    /// the counter adjustments made up to that point are not rolled back —
    /// confirm whether callers ever cancel `get` (e.g. via timeouts).
    pub async fn get(&self) -> Result<Object<T, E>, E> {
        let inner = &self.inner;
        let size = &inner.size;

        // Claim an idle object up front; a result <= 0 means none was idle.
        let available = size.available.fetch_sub(1, Ordering::SeqCst);
        if available <= 0 && size.current.load(Ordering::SeqCst) < inner.max_size {
            // Reserve a slot. The check above and this increment are not one
            // atomic step, so the reservation can still overshoot `max_size`.
            let current = size.current.fetch_add(1, Ordering::SeqCst);
            if current < inner.max_size {
                // We are creating instead of consuming an idle object: undo the claim.
                size.available.fetch_add(1, Ordering::SeqCst);
                return match inner.manager.create().await {
                    Ok(obj) => Ok(Object::new(self, obj)),
                    Err(err) => {
                        // Nothing was created, so give the reserved slot back.
                        size.current.fetch_sub(1, Ordering::SeqCst);
                        Err(err)
                    }
                };
            }
            // Lost the race for the last slot: undo the overshoot and wait instead.
            size.current.fetch_sub(1, Ordering::SeqCst);
        }

        let obj = inner
            .obj_receiver
            .lock()
            .await
            .recv()
            .await
            .expect("pool keeps a sender alive, so the channel cannot be closed");
        match inner.manager.recycle(obj).await {
            Ok(obj) => Ok(Object::new(self, obj)),
            Err(err) => {
                // `recycle` consumed the object, so it no longer counts
                // towards the pool size.
                size.current.fetch_sub(1, Ordering::SeqCst);
                Err(err)
            }
        }
    }
}