1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
use crate::pool_object::PoolObject;
use crate::{Config, Error};
use std::sync::{Arc, Condvar, Mutex, RwLock, Weak};

pub type ArcPool<T> = Arc<Pool<T>>;

/// A pool of objects.
/// After an object is taken from the pool, it is returned to the pool when it is dropped.
/// Pool items must be passed on creation by values:
/// # Examples
/// basic usage:
/// ```
/// let pool = autoreturn_pool::Pool::new([1, 2])?;
/// let item = pool.take()?.unwrap();
/// ```
/// with custom config:
/// ```
/// let config = autoreturn_pool::Config {
///    wait_duration: std::time::Duration::from_millis(5),
/// };
/// let pool = autoreturn_pool::Pool::with_config(config, [1, 2])?;
/// let item = pool.take()?.unwrap();
/// ```
pub struct Pool<T: Send> {
    config: Config,
    storage: Arc<(Mutex<Vec<T>>, Condvar)>,
    self_ptr: RwLock<Option<Weak<Self>>>,
}

impl<T: Send + 'static> Pool<T> {
    pub fn new<I>(items: I) -> Result<ArcPool<T>, Error>
    where
        I: IntoIterator<Item = T>,
    {
        Self::with_config(Config::default(), items)
    }

    pub fn with_config<I>(config: Config, items: I) -> Result<ArcPool<T>, Error>
    where
        I: IntoIterator<Item = T>,
    {
        let objects = items.into_iter().collect();
        let pool = Self {
            config,
            storage: Arc::new((Mutex::new(objects), Condvar::new())),
            self_ptr: RwLock::new(None),
        };
        let pool_ptr = Arc::new(pool);
        *pool_ptr.self_ptr.write()? = Some(Arc::downgrade(&pool_ptr));
        Ok(pool_ptr)
    }

    // Take an object from the pool.
    // If the pool is empty, the method will wait for the object to be returned to the pool.
    // If the wait duration is exceeded, the method will return `None`.
    pub fn take(&self) -> Result<Option<PoolObject<T>>, Error> {
        let (mtx, cvar) = &*self.storage;
        let mut lock = mtx.lock()?;
        while lock.is_empty() {
            let (new_lock, is_timeout) = cvar.wait_timeout(lock, self.config.wait_duration)?;
            if is_timeout.timed_out() {
                return Ok(None);
            }
            lock = new_lock;
        }
        let inner = lock.pop().unwrap();
        drop(lock);
        let pool_ptr = self.self_ptr.read()?.as_ref().unwrap().clone();
        Ok(Some(PoolObject::new(inner, pool_ptr)))
    }

    /// Get the number of available objects in the pool.
    pub fn size(&self) -> Result<usize, Error> {
        Ok(self.storage.0.lock()?.len())
    }

    /// Put an object back into the pool and notify one waiting thread.
    pub(crate) fn put(&self, item: T) -> Result<(), Error> {
        self.storage.0.lock()?.push(item);
        self.storage.1.notify_one();
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use crate::pool::Pool;
    use crate::Config;
    use std::ops::Deref;

    #[test]
    fn test_workflow() -> anyhow::Result<()> {
        let config = Config {
            wait_duration: std::time::Duration::from_millis(5),
        };
        let pool = Pool::with_config(config, [1, 2, 3])?;
        assert_eq!(pool.size()?, 3);

        let obj1 = pool.take()?;
        assert_eq!(pool.size()?, 2);
        assert_eq!(*obj1.as_ref().unwrap().deref(), 3);

        let obj2 = pool.take()?;
        assert_eq!(*obj2.as_ref().unwrap().deref(), 2);
        let obj3 = pool.take()?;
        assert_eq!(pool.size()?, 0);
        assert_eq!(*obj3.as_ref().unwrap().deref(), 1);

        let obj4 = pool.take()?;
        assert!(obj4.is_none());

        Ok(())
    }
}