Skip to main content

graphos_common/memory/
pool.rs

//! Object pool for frequently allocated types.
//!
//! Object pools reduce allocation overhead by reusing previously allocated
//! objects instead of freeing and reallocating them.
5
6use parking_lot::Mutex;
7use std::ops::{Deref, DerefMut};
8
9/// A thread-safe object pool.
10///
11/// Maintains a pool of reusable objects of type T. When an object is needed,
12/// it's taken from the pool (or created if the pool is empty). When the object
13/// is dropped, it's returned to the pool for reuse.
14pub struct ObjectPool<T> {
15    /// The pool of available objects.
16    pool: Mutex<Vec<T>>,
17    /// Factory function to create new objects.
18    factory: Box<dyn Fn() -> T + Send + Sync>,
19    /// Optional reset function called when returning objects to the pool.
20    reset: Option<Box<dyn Fn(&mut T) + Send + Sync>>,
21    /// Maximum pool size.
22    max_size: usize,
23}
24
25impl<T> ObjectPool<T> {
26    /// Creates a new object pool with the given factory function.
27    pub fn new<F>(factory: F) -> Self
28    where
29        F: Fn() -> T + Send + Sync + 'static,
30    {
31        Self {
32            pool: Mutex::new(Vec::new()),
33            factory: Box::new(factory),
34            reset: None,
35            max_size: 1024,
36        }
37    }
38
39    /// Creates a new object pool with a factory and reset function.
40    ///
41    /// The reset function is called when an object is returned to the pool,
42    /// allowing you to clear or reinitialize the object for reuse.
43    pub fn with_reset<F, R>(factory: F, reset: R) -> Self
44    where
45        F: Fn() -> T + Send + Sync + 'static,
46        R: Fn(&mut T) + Send + Sync + 'static,
47    {
48        Self {
49            pool: Mutex::new(Vec::new()),
50            factory: Box::new(factory),
51            reset: Some(Box::new(reset)),
52            max_size: 1024,
53        }
54    }
55
56    /// Sets the maximum pool size.
57    ///
58    /// Objects returned when the pool is at capacity will be dropped instead.
59    #[must_use]
60    pub fn with_max_size(mut self, max_size: usize) -> Self {
61        self.max_size = max_size;
62        self
63    }
64
65    /// Pre-populates the pool with `count` objects.
66    pub fn prefill(&self, count: usize) {
67        let mut pool = self.pool.lock();
68        let to_add = count.saturating_sub(pool.len()).min(self.max_size - pool.len());
69        for _ in 0..to_add {
70            pool.push((self.factory)());
71        }
72    }
73
74    /// Takes an object from the pool, creating a new one if necessary.
75    ///
76    /// Returns a `Pooled` wrapper that will return the object to the pool
77    /// when dropped.
78    pub fn get(&self) -> Pooled<'_, T> {
79        let value = self.pool.lock().pop().unwrap_or_else(|| (self.factory)());
80        Pooled { pool: self, value: Some(value) }
81    }
82
83    /// Takes an object from the pool without wrapping it.
84    ///
85    /// The caller is responsible for returning the object via `put()` if desired.
86    pub fn take(&self) -> T {
87        self.pool.lock().pop().unwrap_or_else(|| (self.factory)())
88    }
89
90    /// Returns an object to the pool.
91    ///
92    /// If the pool is at capacity, the object is dropped instead.
93    pub fn put(&self, mut value: T) {
94        if let Some(ref reset) = self.reset {
95            reset(&mut value);
96        }
97
98        let mut pool = self.pool.lock();
99        if pool.len() < self.max_size {
100            pool.push(value);
101        }
102        // Otherwise, value is dropped
103    }
104
105    /// Returns the current number of objects in the pool.
106    #[must_use]
107    pub fn available(&self) -> usize {
108        self.pool.lock().len()
109    }
110
111    /// Returns the maximum pool size.
112    #[must_use]
113    pub fn max_size(&self) -> usize {
114        self.max_size
115    }
116
117    /// Clears all objects from the pool.
118    pub fn clear(&self) {
119        self.pool.lock().clear();
120    }
121}
122
123/// A wrapper around a pooled object that returns it to the pool when dropped.
124pub struct Pooled<'a, T> {
125    pool: &'a ObjectPool<T>,
126    value: Option<T>,
127}
128
129impl<T> Pooled<'_, T> {
130    /// Takes ownership of the inner value, preventing it from being returned to the pool.
131    pub fn take(mut self) -> T {
132        self.value.take().expect("Value already taken")
133    }
134}
135
136impl<T> Deref for Pooled<'_, T> {
137    type Target = T;
138
139    fn deref(&self) -> &Self::Target {
140        self.value.as_ref().expect("Value already taken")
141    }
142}
143
144impl<T> DerefMut for Pooled<'_, T> {
145    fn deref_mut(&mut self) -> &mut Self::Target {
146        self.value.as_mut().expect("Value already taken")
147    }
148}
149
150impl<T> Drop for Pooled<'_, T> {
151    fn drop(&mut self) {
152        if let Some(value) = self.value.take() {
153            self.pool.put(value);
154        }
155    }
156}
157
/// An [`ObjectPool`] whose pooled objects are `Vec<T>` buffers.
///
/// This is only a type alias: a pool clears its vectors on return only when
/// it is built with a reset hook, as the `new_vec_pool` and
/// `new_vec_pool_with_capacity` constructors do.
pub type VecPool<T> = ObjectPool<Vec<T>>;
160
161impl<T: 'static> VecPool<T> {
162    /// Creates a new vector pool.
163    pub fn new_vec_pool() -> Self {
164        ObjectPool::with_reset(Vec::new, |v| v.clear())
165    }
166
167    /// Creates a new vector pool with pre-allocated capacity.
168    pub fn new_vec_pool_with_capacity(capacity: usize) -> Self {
169        ObjectPool::with_reset(move || Vec::with_capacity(capacity), |v| v.clear())
170    }
171}
172
#[cfg(test)]
mod tests {
    use super::*;

    /// Without a reset hook, a recycled object comes back with its contents intact.
    #[test]
    fn test_pool_basic() {
        let pool = ObjectPool::<Vec<u8>>::new(Vec::new);

        // Borrow an object and fill it.
        let mut first = pool.get();
        first.extend_from_slice(&[1, 2]);
        assert_eq!(&*first, &[1, 2]);

        // Dropping the guard parks the object in the pool.
        drop(first);
        assert_eq!(pool.available(), 1);

        // The next borrow reuses that same object.
        let second = pool.get();
        assert_eq!(pool.available(), 0);

        // No reset hook was installed, so the data is still there.
        assert_eq!(&*second, &[1, 2]);
    }

    /// With a reset hook, a recycled object comes back cleared.
    #[test]
    fn test_pool_with_reset() {
        let pool = ObjectPool::<Vec<u8>>::with_reset(Vec::new, |v| v.clear());

        {
            let mut scratch = pool.get();
            scratch.push(1);
            scratch.push(2);
            // `scratch` is returned to the pool at the end of this scope.
        }

        let recycled = pool.get();
        assert!(recycled.is_empty());
    }

    /// `prefill` stocks the pool, and each live guard reduces the idle count.
    #[test]
    fn test_pool_prefill() {
        let pool = ObjectPool::<String>::new(String::new);

        pool.prefill(10);
        assert_eq!(pool.available(), 10);

        // The guard must stay alive here; dropping it would hand the object
        // straight back and the count would return to 10.
        let _held = pool.get();
        assert_eq!(pool.available(), 9);
    }

    /// The pool never retains more idle objects than its configured maximum.
    #[test]
    fn test_pool_max_size() {
        let pool = ObjectPool::<u64>::new(|| 0).with_max_size(3);

        // Asking for 10 only yields 3 because of the cap.
        pool.prefill(10);
        assert_eq!(pool.available(), 3);

        // Drain the pool completely.
        let drained: Vec<u64> = (0..3).map(|_| pool.take()).collect();
        assert_eq!(pool.available(), 0);

        // Hand everything back, plus one extra that has to be discarded.
        for object in drained {
            pool.put(object);
        }
        pool.put(99);

        assert_eq!(pool.available(), 3);
    }

    /// `Pooled::take` keeps the object out of the pool.
    #[test]
    fn test_pool_take_ownership() {
        let pool = ObjectPool::<String>::new(String::new);

        let mut guard = pool.get();
        guard.push_str("hello");

        // Moving the value out means nothing is returned on drop.
        let owned = guard.take();
        assert_eq!(owned, "hello");
        assert_eq!(pool.available(), 0);
    }

    /// `clear` empties the pool.
    #[test]
    fn test_pool_clear() {
        let pool = ObjectPool::<u64>::new(|| 0);

        pool.prefill(10);
        assert_eq!(pool.available(), 10);

        pool.clear();
        assert_eq!(pool.available(), 0);
    }

    /// Pools built with `new_vec_pool` clear vectors when they are returned.
    #[test]
    fn test_vec_pool() {
        let pool = VecPool::<u8>::new_vec_pool();

        {
            let mut buffer = pool.get();
            buffer.extend_from_slice(&[1, 2, 3]);
        }

        let recycled = pool.get();
        assert!(recycled.is_empty()); // cleared by the reset hook
    }

    /// Vectors created by a capacity-configured pool are pre-allocated.
    #[test]
    fn test_vec_pool_with_capacity() {
        let pool = VecPool::<u8>::new_vec_pool_with_capacity(100);

        let buffer = pool.get();
        assert!(buffer.capacity() >= 100);
    }

    /// Several threads can borrow and return objects concurrently.
    #[test]
    fn test_pool_thread_safety() {
        use std::sync::Arc;
        use std::thread;

        let shared = Arc::new(ObjectPool::<Vec<u8>>::with_reset(Vec::new, |v| v.clear()));

        let mut workers = Vec::with_capacity(4);
        for _ in 0..4 {
            let pool = Arc::clone(&shared);
            workers.push(thread::spawn(move || {
                for _ in 0..100 {
                    // The guard is dropped (and the vector returned) at the
                    // end of every iteration.
                    let mut buffer = pool.get();
                    buffer.push(42);
                }
            }));
        }

        for worker in workers {
            worker.join().unwrap();
        }

        // At least one object must have been parked by now.
        assert!(shared.available() > 0);
    }
}