deadpool/unmanaged/
mod.rs

1//! Unmanaged version of the pool.
2//!
3//! "Unmanaged" means that no manager is used to create and recycle objects.
4//! Objects either need to be created upfront or by adding them using the
5//! [`Pool::add()`] or [`Pool::try_add()`] methods.
6//!
7//! # Example
8//!
9//! ```rust
10//! use deadpool::unmanaged::Pool;
11//!
12//! struct Computer {}
13//!
14//! impl Computer {
15//!     async fn get_answer(&self) -> i32 {
16//!         42
17//!     }
18//! }
19//!
20//! #[tokio::main]
21//! async fn main() {
22//!     let pool = Pool::from(vec![
23//!         Computer {},
24//!         Computer {},
25//!     ]);
26//!     let s = pool.get().await.unwrap();
27//!     assert_eq!(s.get_answer().await, 42);
28//! }
29//! ```
30
31mod config;
32mod errors;
33
34use std::{
35    convert::TryInto,
36    ops::{Deref, DerefMut},
37    sync::{
38        atomic::{AtomicIsize, AtomicUsize, Ordering},
39        Arc, Mutex, Weak,
40    },
41    time::Duration,
42};
43
44use tokio::sync::{Semaphore, TryAcquireError};
45
46pub use crate::Status;
47
48pub use self::{config::PoolConfig, errors::PoolError};
49
50/// Wrapper around the actual pooled object which implements [`Deref`],
51/// [`DerefMut`] and [`Drop`] traits.
52///
53/// Use this object just as if it was of type `T` and upon leaving a scope the
54/// [`Drop::drop()`] will take care of returning it to the pool.
55#[derive(Debug)]
56#[must_use]
57pub struct Object<T> {
58    /// Actual pooled object.
59    obj: Option<T>,
60
61    /// Pool to return the pooled object to.
62    pool: Weak<PoolInner<T>>,
63}
64
65impl<T> Object<T> {
66    /// Takes this object from the pool permanently. This reduces the size of
67    /// the pool. If needed, the object can later be added back to the pool
68    /// using the [`Pool::add()`] or [`Pool::try_add()`] methods.
69    #[must_use]
70    pub fn take(mut this: Self) -> T {
71        if let Some(pool) = this.pool.upgrade() {
72            let _ = pool.size.fetch_sub(1, Ordering::Relaxed);
73            pool.size_semaphore.add_permits(1);
74        }
75        this.obj.take().unwrap()
76    }
77}
78
79impl<T> Drop for Object<T> {
80    fn drop(&mut self) {
81        if let Some(obj) = self.obj.take() {
82            if let Some(pool) = self.pool.upgrade() {
83                {
84                    let mut queue = pool.queue.lock().unwrap();
85                    queue.push(obj);
86                }
87                let _ = pool.available.fetch_add(1, Ordering::Relaxed);
88                pool.semaphore.add_permits(1);
89                pool.clean_up();
90            }
91        }
92    }
93}
94
95impl<T> Deref for Object<T> {
96    type Target = T;
97    fn deref(&self) -> &T {
98        self.obj.as_ref().unwrap()
99    }
100}
101
102impl<T> DerefMut for Object<T> {
103    fn deref_mut(&mut self) -> &mut T {
104        self.obj.as_mut().unwrap()
105    }
106}
107
108impl<T> AsRef<T> for Object<T> {
109    fn as_ref(&self) -> &T {
110        self
111    }
112}
113
114impl<T> AsMut<T> for Object<T> {
115    fn as_mut(&mut self) -> &mut T {
116        self
117    }
118}
119
120/// Generic object and connection pool. This is the static version of the pool
121/// which doesn't include.
122///
123/// This struct can be cloned and transferred across thread boundaries and uses
124/// reference counting for its internal state.
125///
126/// A pool of existing objects can be created from an existing collection of
127/// objects if it has a known exact size:
128/// ```rust
129/// use deadpool::unmanaged::Pool;
130/// let pool = Pool::from(vec![1, 2, 3]);
131/// ```
132#[derive(Debug)]
133pub struct Pool<T> {
134    inner: Arc<PoolInner<T>>,
135}
136
137impl<T> Clone for Pool<T> {
138    fn clone(&self) -> Self {
139        Self {
140            inner: self.inner.clone(),
141        }
142    }
143}
144
145impl<T> Default for Pool<T> {
146    fn default() -> Self {
147        Self::from_config(&PoolConfig::default())
148    }
149}
150
151impl<T> Pool<T> {
152    /// Creates a new empty [`Pool`] with the given `max_size`.
153    #[must_use]
154    pub fn new(max_size: usize) -> Self {
155        Self::from_config(&PoolConfig::new(max_size))
156    }
157
158    /// Create a new empty [`Pool`] using the given [`PoolConfig`].
159    #[must_use]
160    pub fn from_config(config: &PoolConfig) -> Self {
161        Self {
162            inner: Arc::new(PoolInner {
163                config: *config,
164                queue: Mutex::new(Vec::with_capacity(config.max_size)),
165                size: AtomicUsize::new(0),
166                size_semaphore: Semaphore::new(config.max_size),
167                available: AtomicIsize::new(0),
168                semaphore: Semaphore::new(0),
169            }),
170        }
171    }
172
173    /// Retrieves an [`Object`] from this [`Pool`] or waits for the one to
174    /// become available.
175    ///
176    /// # Errors
177    ///
178    /// See [`PoolError`] for details.
179    pub async fn get(&self) -> Result<Object<T>, PoolError> {
180        self.timeout_get(self.inner.config.timeout).await
181    }
182
183    /// Retrieves an [`Object`] from this [`Pool`] and doesn't wait if there is
184    /// currently no [`Object`] is available and the maximum [`Pool`] size has
185    /// been reached.
186    ///
187    /// # Errors
188    ///
189    /// See [`PoolError`] for details.
190    pub fn try_get(&self) -> Result<Object<T>, PoolError> {
191        let inner = self.inner.as_ref();
192        let permit = inner.semaphore.try_acquire().map_err(|e| match e {
193            TryAcquireError::NoPermits => PoolError::Timeout,
194            TryAcquireError::Closed => PoolError::Closed,
195        })?;
196        let obj = {
197            let mut queue = inner.queue.lock().unwrap();
198            queue.pop().unwrap()
199        };
200        permit.forget();
201        let _ = inner.available.fetch_sub(1, Ordering::Relaxed);
202        Ok(Object {
203            pool: Arc::downgrade(&self.inner),
204            obj: Some(obj),
205        })
206    }
207
208    /// Retrieves an [`Object`] from this [`Pool`] using a different `timeout`
209    /// than the configured one.
210    ///
211    /// # Errors
212    ///
213    /// See [`PoolError`] for details.
214    pub async fn timeout_get(&self, timeout: Option<Duration>) -> Result<Object<T>, PoolError> {
215        let inner = self.inner.as_ref();
216        let permit = match (timeout, inner.config.runtime) {
217            (None, _) => inner
218                .semaphore
219                .acquire()
220                .await
221                .map_err(|_| PoolError::Closed),
222            (Some(timeout), _) if timeout.as_nanos() == 0 => {
223                inner.semaphore.try_acquire().map_err(|e| match e {
224                    TryAcquireError::NoPermits => PoolError::Timeout,
225                    TryAcquireError::Closed => PoolError::Closed,
226                })
227            }
228            (Some(timeout), Some(runtime)) => runtime
229                .timeout(timeout, inner.semaphore.acquire())
230                .await
231                .ok_or(PoolError::Timeout)?
232                .map_err(|_| PoolError::Closed),
233            (Some(_), None) => Err(PoolError::NoRuntimeSpecified),
234        }?;
235        let obj = {
236            let mut queue = inner.queue.lock().unwrap();
237            queue.pop().unwrap()
238        };
239        permit.forget();
240        let _ = inner.available.fetch_sub(1, Ordering::Relaxed);
241        Ok(Object {
242            pool: Arc::downgrade(&self.inner),
243            obj: Some(obj),
244        })
245    }
246
247    /// Adds an `object` to this [`Pool`].
248    ///
249    /// If the [`Pool`] size has already reached its maximum, then this function
250    /// blocks until the `object` can be added to the [`Pool`].
251    ///
252    /// # Errors
253    ///
254    /// If the [`Pool`] has been closed a tuple containing the `object` and
255    /// the [`PoolError`] is returned instead.
256    pub async fn add(&self, object: T) -> Result<(), (T, PoolError)> {
257        match self.inner.size_semaphore.acquire().await {
258            Ok(permit) => {
259                permit.forget();
260                self._add(object);
261                Ok(())
262            }
263            Err(_) => Err((object, PoolError::Closed)),
264        }
265    }
266
267    /// Tries to add an `object` to this [`Pool`].
268    ///
269    /// # Errors
270    ///
271    /// If the [`Pool`] size has already reached its maximum, or the [`Pool`]
272    /// has been closed, then a tuple containing the `object` and the
273    /// [`PoolError`] is returned instead.
274    pub fn try_add(&self, object: T) -> Result<(), (T, PoolError)> {
275        match self.inner.size_semaphore.try_acquire() {
276            Ok(permit) => {
277                permit.forget();
278                self._add(object);
279                Ok(())
280            }
281            Err(e) => Err(match e {
282                TryAcquireError::NoPermits => (object, PoolError::Timeout),
283                TryAcquireError::Closed => (object, PoolError::Closed),
284            }),
285        }
286    }
287
288    /// Internal function which adds an `object` to this [`Pool`].
289    ///
290    /// Prior calling this it must be guaranteed that `size` doesn't exceed
291    /// `max_size`. In the methods `add` and `try_add` this is ensured by using
292    /// the `size_semaphore`.
293    fn _add(&self, object: T) {
294        let _ = self.inner.size.fetch_add(1, Ordering::Relaxed);
295        {
296            let mut queue = self.inner.queue.lock().unwrap();
297            queue.push(object);
298        }
299        let _ = self.inner.available.fetch_add(1, Ordering::Relaxed);
300        self.inner.semaphore.add_permits(1);
301    }
302
303    /// Removes an [`Object`] from this [`Pool`].
304    pub async fn remove(&self) -> Result<T, PoolError> {
305        self.get().await.map(Object::take)
306    }
307
308    /// Tries to remove an [`Object`] from this [`Pool`].
309    pub fn try_remove(&self) -> Result<T, PoolError> {
310        self.try_get().map(Object::take)
311    }
312
313    /// Removes an [`Object`] from this [`Pool`] using a different `timeout`
314    /// than the configured one.
315    pub async fn timeout_remove(&self, timeout: Option<Duration>) -> Result<T, PoolError> {
316        self.timeout_get(timeout).await.map(Object::take)
317    }
318
319    /// Closes this [`Pool`].
320    ///
321    /// All current and future tasks waiting for [`Object`]s will return
322    /// [`PoolError::Closed`] immediately.
323    pub fn close(&self) {
324        self.inner.semaphore.close();
325        self.inner.size_semaphore.close();
326        self.inner.clear();
327    }
328
329    /// Indicates whether this [`Pool`] has been closed.
330    pub fn is_closed(&self) -> bool {
331        self.inner.is_closed()
332    }
333
334    /// Retrieves [`Status`] of this [`Pool`].
335    #[must_use]
336    pub fn status(&self) -> Status {
337        let max_size = self.inner.config.max_size;
338        let size = self.inner.size.load(Ordering::Relaxed);
339        let available = self.inner.available.load(Ordering::Relaxed);
340        Status {
341            max_size,
342            size,
343            available: if available > 0 { available as usize } else { 0 },
344            waiting: if available < 0 {
345                (-available) as usize
346            } else {
347                0
348            },
349        }
350    }
351}
352
353#[derive(Debug)]
354struct PoolInner<T> {
355    config: PoolConfig,
356    queue: Mutex<Vec<T>>,
357    size: AtomicUsize,
358    /// This semaphore has as many permits as `max_size - size`. Every time
359    /// an [`Object`] is added to the [`Pool`] a permit is removed from the
360    /// semaphore and every time an [`Object`] is removed a permit is returned
361    /// back.
362    size_semaphore: Semaphore,
363    /// Number of available [`Object`]s in the [`Pool`]. If there are no
364    /// [`Object`]s in the [`Pool`] this number can become negative and store
365    /// the number of [`Future`]s waiting for an [`Object`].
366    ///
367    /// [`Future`]: std::future::Future
368    available: AtomicIsize,
369    semaphore: Semaphore,
370}
371
372impl<T> PoolInner<T> {
373    /// Cleans up internals of this [`Pool`].
374    ///
375    /// This method is called after closing the [`Pool`] and whenever an
376    /// [`Object`] is returned to the [`Pool`] and makes sure closed [`Pool`]s
377    /// don't contain any [`Object`]s.
378    fn clean_up(&self) {
379        if self.is_closed() {
380            self.clear();
381        }
382    }
383
384    /// Removes all the [`Object`]s which are currently part of this [`Pool`].
385    fn clear(&self) {
386        let mut queue = self.queue.lock().unwrap();
387        let _ = self.size.fetch_sub(queue.len(), Ordering::Relaxed);
388        let _ = self
389            .available
390            .fetch_sub(queue.len() as isize, Ordering::Relaxed);
391        queue.clear();
392    }
393
394    /// Indicates whether this [`Pool`] has been closed.
395    fn is_closed(&self) -> bool {
396        matches!(
397            self.semaphore.try_acquire_many(0),
398            Err(TryAcquireError::Closed)
399        )
400    }
401}
402
403impl<T, I> From<I> for Pool<T>
404where
405    I: IntoIterator<Item = T>,
406    <I as IntoIterator>::IntoIter: ExactSizeIterator,
407{
408    /// Creates a new [`Pool`] from the given [`ExactSizeIterator`] of
409    /// [`Object`]s.
410    fn from(iter: I) -> Self {
411        let queue = iter.into_iter().collect::<Vec<_>>();
412        let len = queue.len();
413        Self {
414            inner: Arc::new(PoolInner {
415                queue: Mutex::new(queue),
416                config: PoolConfig::new(len),
417                size: AtomicUsize::new(len),
418                size_semaphore: Semaphore::new(0),
419                available: AtomicIsize::new(len.try_into().unwrap()),
420                semaphore: Semaphore::new(len),
421            }),
422        }
423    }
424}