indexed_pool/
lib.rs

1//! A thread-safe indexed object pool with automatic return and attach/detach semantics.
2//!
3//! The goal of an object pool is to reuse expensive to allocate objects or frequently allocated objects.
4//! This pool implementation allows to index different object pools of the same type as subpools.
5//! For example you could use this to pool SSH connection objects, indexed by ipaddress:port.
6//!
7//! This pool is bound to sizes to avoid to keep allocations in control. There are 2 different
8//! sizes that can be setup at creation time, `max_pool_indexes` and `max_single_pool_size`, the
9//! former controlling how many indexes the pool can contain and the latter controlling the max
10//! size of the subpool of a single index.
11//!
12//! On top of that indexes have an expiration time expressed as a duration.
13//!
14//! # Examples
15//!
16//! ## Creating a Pool
17//!
18//! The default pool creation looks like this, with 32 indexes with a capacity of 8 elements
19//! expiring after 5 minutes from creation :
20//! ```no_run
21//! use indexed_pool::Pool;
22//!
23//! let pool: Pool<String> = Pool::default();
24//! ```
25//! Example pool with 64 `Vec<u8>` with capacity of 64 elements each expiring after 10 seconds:
26//! ```
27//! use std::time::Duration;
28//! use indexed_pool::Pool;
29//!
30//! let pool: Pool<u8> = Pool::new(64, 64, Duration::from_secs(10));
31//! ```
32//!
33//! ## Using a Pool
34//!
35//! Basic usage for pulling from the pool
36//! ```
37//! use indexed_pool::Pool;
38//!
39//! struct Obj {
40//!     size: usize,
41//! }
42//!
43//! let pool: Pool<Obj> = Pool::default();
44//! let obj = pool.pull("item1", || Obj{size: 1});
45//! assert_eq!(obj.size, 1);
46//! let obj2 = pool.pull("item1", || Obj{size: 1});
47//! assert_eq!(obj.size, 1);
48//! ```
49//! Pull from pool and `detach()`
50//! ```
51//! use indexed_pool::Pool;
52//!
53//! struct Obj {
54//!     size: usize,
55//! }
56//!
57//! let pool: Pool<Obj> = Pool::default();
58//! let obj = pool.pull("item1", || Obj{size: 1});
59//! assert_eq!(obj.size, 1);
60//! let obj2 = pool.pull("item1", || Obj{size: 1});
61//! assert_eq!(obj.size, 1);
62//! let (pool, obj) = obj.detach();
63//! assert_eq!(obj.size, 1);
64//! pool.attach("item1", obj);
65//! ```
66//!
67//! ## Using Across Threads
68//!
69//! You simply wrap the pool in a [`std::sync::Arc`]
70//! ```no_run
71//! use std::sync::Arc;
72//! use indexed_pool::Pool;
73//!
74//! let pool: Arc<Pool<String>> = Arc::new(Pool::default());
75//! ```
76//!
77//! # Warning
78//!
79//! Objects in the pool are not automatically reset, they are returned but NOT reset
80//! You may want to call `object.reset()` or  `object.clear()`
81//! or any other equivalent for the object that you are using, after pulling from the pool
82//!
83//! [`std::sync::Arc`]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html
84extern crate log;
85extern crate parking_lot;
86
87use std::collections::BTreeMap;
88use std::mem::{forget, ManuallyDrop};
89use std::ops::{Deref, DerefMut};
90use std::time::{Duration, Instant};
91
92use parking_lot::RwLock;
93
94#[cfg(not(test))]
95use log::{debug, error, info};
96
97#[cfg(test)]
98use std::{println as info, println as error, println as debug};
99
100const DEFAULT_POOL_INDEXES: usize = 32;
101const DEFAULT_SINGLE_POOL_SIZE: usize = 8;
102const DEFAULT_EXPIRATION: Duration = Duration::from_secs(300);
103
104#[derive(Debug)]
105struct Inner<T> {
106    inner: T,
107    start_time: Instant,
108}
109
110impl<T> Inner<T> {
111    pub fn new(object: T, start_time: Instant) -> Self {
112        Self { inner: object, start_time }
113    }
114
115    pub fn expired(&self, duration: Duration) -> bool {
116        self.start_time.elapsed().as_millis() > duration.as_millis()
117    }
118}
119
120#[derive(Debug)]
121pub struct Pool<T> {
122    max_pool_indexes: usize,
123    max_single_pool_size: usize,
124    expiration: Duration,
125    objects: RwLock<BTreeMap<String, Vec<Inner<T>>>>,
126}
127
128impl<T> Pool<T> {
129    pub fn new(max_pool_indexes: usize, max_single_pool_size: usize, expiration: Duration) -> Pool<T> {
130        Pool { max_pool_indexes, max_single_pool_size, expiration, objects: RwLock::new(BTreeMap::new()) }
131    }
132
133    pub fn default() -> Pool<T> {
134        Pool::new(DEFAULT_POOL_INDEXES, DEFAULT_SINGLE_POOL_SIZE, DEFAULT_EXPIRATION)
135    }
136
137    pub fn size(&self) -> usize {
138        self.objects.read().len()
139    }
140
141    pub fn len(&self, item: &str) -> usize {
142        match self.objects.read().get(item) {
143            Some(item) => item.len(),
144            None => 0,
145        }
146    }
147
148    pub fn is_full(&self) -> bool {
149        self.size() >= self.max_pool_indexes
150    }
151
152    fn expunge_oldest(&self) {
153        if !self.is_full() {
154            debug!("Object pool is not full, nothing to remove");
155            return;
156        }
157        let mut last = String::new();
158        if let Some((obj, _)) = self.objects.read().iter().next() {
159            last = obj.clone();
160        }
161        if !last.is_empty() {
162            debug!("Removing oldest element in the queue: {}", last);
163            self.objects.write().remove(&last);
164        } else {
165            error!("Unable to find an element to remove from the queue, next allocation could fail");
166        }
167    }
168
169    fn try_pull(&self, item: &str) -> Option<Reusable<T>> {
170        match self.objects.write().get_mut(item) {
171            Some(objects) => {
172                info!("Pool for {} is currently of {} objects", item, objects.len());
173                if objects.len() > self.max_single_pool_size {
174                    objects.pop();
175                }
176                match objects.pop() {
177                    Some(object) => {
178                        if object.expired(self.expiration) {
179                            info!(
180                                "Element {} has reached expiration time of {} ms, evicting from pool",
181                                item,
182                                object.start_time.elapsed().as_millis()
183                            );
184                            None
185                        } else {
186                            info!(
187                                "Reusing element pool {} created {} ms ago",
188                                item,
189                                object.start_time.elapsed().as_millis()
190                            );
191                            Some(Reusable::new(self, item.to_string(), object.start_time, object.inner))
192                        }
193                    }
194                    None => {
195                        debug!("Element {} pool is empty", item);
196                        None
197                    }
198                }
199            }
200            None => {
201                debug!("Unable to find element {} in objects pool", item);
202                None
203            }
204        }
205    }
206
207    fn attach_time(&self, item: &str, start_time: Instant, t: T) {
208        self.expunge_oldest();
209        if self.objects.read().contains_key(item) {
210            debug!("Creating new pool of {} elements and attatching object to it", self.max_single_pool_size);
211            self.objects.write().get_mut(item).unwrap().push(Inner::new(t, start_time))
212        } else {
213            debug!(
214                "Attatching element {} to existing pool of max {} elements",
215                self.len(item) + 1,
216                self.max_single_pool_size
217            );
218            self.objects.write().insert(item.to_string(), vec![Inner::new(t, start_time)]);
219        }
220    }
221
222    pub fn pull<F: Fn() -> T>(&self, item: &str, fallback: F) -> Reusable<T> {
223        match self.try_pull(item) {
224            Some(object) => object,
225            None => {
226                info!("Creating new element {} with a pool of {} instances", item, self.max_single_pool_size);
227                for _ in 0..self.max_single_pool_size {
228                    self.attach(item, fallback())
229                }
230                self.pull(item, fallback)
231            }
232        }
233    }
234
235    pub fn attach(&self, item: &str, t: T) {
236        self.attach_time(item, Instant::now(), t);
237    }
238}
239
240pub struct Reusable<'a, T> {
241    item: String,
242    pool: &'a Pool<T>,
243    data: ManuallyDrop<T>,
244    start_time: Instant,
245}
246
247impl<'a, T> Reusable<'a, T> {
248    pub fn new(pool: &'a Pool<T>, item: String, start_time: Instant, t: T) -> Self {
249        Self { item, pool, data: ManuallyDrop::new(t), start_time }
250    }
251
252    pub fn detach(mut self) -> (&'a Pool<T>, T) {
253        let ret = unsafe { (self.pool, self.take()) };
254        info!("Detaching object from element {} pool", self.item);
255        forget(self);
256        ret
257    }
258
259    unsafe fn take(&mut self) -> T {
260        ManuallyDrop::take(&mut self.data)
261    }
262}
263
264impl<'a, T> Deref for Reusable<'a, T> {
265    type Target = T;
266
267    fn deref(&self) -> &Self::Target {
268        &self.data
269    }
270}
271
272impl<'a, T> DerefMut for Reusable<'a, T> {
273    fn deref_mut(&mut self) -> &mut Self::Target {
274        &mut self.data
275    }
276}
277
278impl<'a, T> Drop for Reusable<'a, T> {
279    fn drop(&mut self) {
280        let value = unsafe { self.take() };
281        info!("Re-attatching object to element {} object pool", self.item);
282        self.pool.attach_time(&self.item, self.start_time, value)
283    }
284}
285
286#[cfg(test)]
287mod tests {
288    use super::*;
289    use std::mem::drop;
290    use std::thread;
291
292    use pretty_assertions::assert_eq;
293
294    #[derive(Debug)]
295    struct Obj {
296        idx: usize,
297    }
298
299    impl Obj {
300        fn new(idx: usize) -> Self {
301            Self { idx }
302        }
303    }
304
305    #[test]
306    fn test_detach() {
307        let pool = Pool::default();
308        let (pool, object) = pool.pull("item1", || Obj::new(1)).detach();
309        assert_eq!(object.idx, 1);
310        assert_eq!(pool.len("item1"), 7);
311        drop(object);
312        assert_eq!(pool.len("item1"), 7);
313    }
314
315    #[test]
316    fn test_detach_then_attach() {
317        let pool = Pool::default();
318        let (pool, object) = pool.pull("item1", || Obj::new(1)).detach();
319        assert_eq!(object.idx, 1);
320        assert_eq!(pool.len("item1"), 7);
321        pool.attach("item1", object);
322        assert_eq!(pool.try_pull("item1").unwrap().idx, 1);
323        assert_eq!(pool.len("item1"), 8);
324    }
325
326    #[test]
327    fn test_pull_and_size() {
328        let pool = Pool::default();
329        pool.attach("item1", Obj::new(1));
330        assert_eq!(pool.size(), 1);
331
332        let object1 = pool.try_pull("item1");
333        let object2 = pool.try_pull("item1");
334        let object3 = pool.pull("item2", || Obj::new(2));
335        assert_eq!(pool.size(), 2);
336
337        assert_eq!(object1.is_some(), true);
338        assert_eq!(object2.is_none(), true);
339
340        assert_eq!(pool.len("item1"), 0);
341        drop(object1);
342        assert_eq!(pool.len("item1"), 1);
343        drop(object2);
344        assert_eq!(pool.len("item1"), 1);
345
346        assert_eq!(object3.idx, 2);
347        assert_eq!(pool.len("item2"), 7);
348        drop(object3);
349        assert_eq!(pool.len("item2"), 8);
350    }
351
352    #[test]
353    fn test_fill_up_pool() {
354        let pool = Pool::default();
355        for x in 0..DEFAULT_POOL_INDEXES {
356            pool.attach(&format!("item{}", x), Obj::new(x));
357            assert_eq!(pool.size(), x + 1)
358        }
359        for (_, obj) in pool.objects.read().iter() {
360            assert_eq!(obj.len(), 1);
361        }
362    }
363
364    #[test]
365    fn test_expire_pool() {
366        let pool = Pool::new(DEFAULT_POOL_INDEXES, DEFAULT_SINGLE_POOL_SIZE, Duration::from_secs(1));
367        for x in 1..7 {
368            pool.attach(&format!("item{}", x), Obj::new(x));
369        }
370        assert_eq!(pool.size(), 6);
371        thread::sleep(Duration::from_millis(1500));
372        for x in 1..7 {
373            assert_eq!(pool.try_pull(&format!("item{})", x)).is_none(), true);
374        }
375        for x in 1..7 {
376            pool.pull(&format!("item{})", x), || Obj::new(x));
377        }
378        for x in 1..7 {
379            assert_eq!(pool.try_pull(&format!("item{})", x)).is_some(), true);
380        }
381    }
382
383    #[test]
384    fn test_smoke() {
385        let pool = Pool::default();
386        for x in 0..10000 {
387            let obj = pool.pull(&format!("item{}", x), || Obj::new(x));
388            assert_eq!(obj.data.idx, x);
389            if x >= 32 {
390                assert!(pool.size() >= 31);
391            }
392        }
393    }
394}