1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
use crate::pool_object::PoolObject;
use std::sync::{Arc, Condvar, Mutex, RwLock, Weak};
use std::time::Duration;
use crate::Error;

#[derive(Clone, Debug)]
pub struct Config {
    pub wait_duration: Duration,
}

impl Default for Config {
    fn default() -> Self {
        Self {
            wait_duration: Duration::MAX,
        }
    }
}

pub type ArcPool<T> = Arc<Pool<T>>;

pub struct Pool<T: Send> {
    config: Config,
    storage: Arc<(Mutex<Vec<T>>, Condvar)>,
    self_ptr: RwLock<Option<Weak<Self>>>,
}

impl<T: Send + 'static> Pool<T> {

    pub fn new<I>(items: I) -> Result<ArcPool<T>, Error>
    where
        I: IntoIterator<Item = T>,
    {
        Self::with_config(Config::default(), items)
    }

    pub fn with_config<I>(config: Config, items: I) -> Result<ArcPool<T>, Error>
    where
        I: IntoIterator<Item = T>,
    {
        let objects = items.into_iter().collect();
        let pool = Self {
            config,
            storage: Arc::new((Mutex::new(objects), Condvar::new())),
            self_ptr: RwLock::new(None),
        };
        let pool_ptr = Arc::new(pool);
        *pool_ptr.self_ptr.write()? = Some(Arc::downgrade(&pool_ptr));
        Ok(pool_ptr)
    }

    pub fn take(&self) -> Result<Option<PoolObject<T>>, Error> {
        let (mtx, cvar) = &*self.storage;
        let mut lock = mtx.lock()?;
        while lock.is_empty() {
            let (new_lock, is_timeout) = cvar.wait_timeout(lock, self.config.wait_duration)?;
            if is_timeout.timed_out() {
                return Ok(None);
            }
            lock = new_lock;
        }
        let inner = lock.pop().unwrap();
        drop(lock);
        let pool_ptr = self.self_ptr.read()?.as_ref().unwrap().clone();
        Ok(Some(PoolObject::new(inner, pool_ptr)))
    }

    pub fn size(&self) -> Result<usize, Error> {
        Ok(self.storage.0.lock()?.len())
    }

    pub(crate) fn put(&self, item: T) -> Result<(), Error> {
        self.storage.0.lock()?.push(item);
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::ops::Deref;
    use crate::pool::{Config, Pool};

    #[test]
    fn test_workflow() -> anyhow::Result<()> {
        let config = Config {
            wait_duration: std::time::Duration::from_millis(5),
        };
        let pool = Pool::with_config(config, [1, 2, 3])?;
        assert_eq!(pool.size()?, 3);

        let obj1 = pool.take()?;
        assert_eq!(pool.size()?, 2);
        assert_eq!(*obj1.as_ref().unwrap().deref(), 3);

        let obj2 = pool.take()?;
        assert_eq!(*obj2.as_ref().unwrap().deref(), 2);
        let obj3 = pool.take()?;
        assert_eq!(pool.size()?, 0);
        assert_eq!(*obj3.as_ref().unwrap().deref(), 1);

        let obj4 = pool.take()?;
        assert!(obj4.is_none());

        Ok(())
    }
}