Skip to main content

grafeo_common/memory/
pool.rs

1//! Object pool for reusing frequently allocated types.
2//!
3//! If you're creating and destroying the same type of object repeatedly
4//! (like temporary buffers during query execution), a pool avoids the
5//! allocation overhead. Objects are reset and returned to the pool instead
6//! of being freed.
7
8use std::ops::{Deref, DerefMut};
9
10use parking_lot::Mutex;
11
/// Boxed constructor invoked whenever the pool has no idle object to hand out.
type Factory<T> = Box<dyn Fn() -> T + Send + Sync>;

/// Boxed hook applied to an object before it is stored back in the pool.
type Reset<T> = Box<dyn Fn(&mut T) + Send + Sync>;

/// A thread-safe pool that recycles objects instead of reallocating them.
///
/// Call [`get()`](Self::get) to check an object out; if no idle object is
/// available, the factory builds a new one. The returned [`Pooled`] wrapper
/// hands the object back to the pool when it is dropped.
///
/// # Examples
///
/// ```
/// use grafeo_common::memory::ObjectPool;
///
/// // Pool of vectors that get cleared on return
/// let pool = ObjectPool::with_reset(Vec::<u8>::new, |v| v.clear());
///
/// let mut buf = pool.get();
/// buf.extend_from_slice(&[1, 2, 3]);
/// // buf is returned to pool when dropped, and cleared
/// ```
pub struct ObjectPool<T> {
    /// Idle objects waiting to be reused, guarded by a mutex.
    pool: Mutex<Vec<T>>,
    /// Builds a new object when no idle one is available.
    factory: Factory<T>,
    /// Optional hook run on every object handed back via `put()`.
    reset: Option<Reset<T>>,
    /// Upper bound on how many idle objects are retained.
    max_size: usize,
}
40
41impl<T> ObjectPool<T> {
42    /// Creates a new object pool with the given factory function.
43    pub fn new<F>(factory: F) -> Self
44    where
45        F: Fn() -> T + Send + Sync + 'static,
46    {
47        Self {
48            pool: Mutex::new(Vec::new()),
49            factory: Box::new(factory),
50            reset: None,
51            max_size: 1024,
52        }
53    }
54
55    /// Creates a new object pool with a factory and reset function.
56    ///
57    /// The reset function is called when an object is returned to the pool,
58    /// allowing you to clear or reinitialize the object for reuse.
59    pub fn with_reset<F, R>(factory: F, reset: R) -> Self
60    where
61        F: Fn() -> T + Send + Sync + 'static,
62        R: Fn(&mut T) + Send + Sync + 'static,
63    {
64        Self {
65            pool: Mutex::new(Vec::new()),
66            factory: Box::new(factory),
67            reset: Some(Box::new(reset)),
68            max_size: 1024,
69        }
70    }
71
72    /// Sets the maximum pool size.
73    ///
74    /// Objects returned when the pool is at capacity will be dropped instead.
75    #[must_use]
76    pub fn with_max_size(mut self, max_size: usize) -> Self {
77        self.max_size = max_size;
78        self
79    }
80
81    /// Pre-populates the pool with `count` objects.
82    pub fn prefill(&self, count: usize) {
83        let mut pool = self.pool.lock();
84        let to_add = count
85            .saturating_sub(pool.len())
86            .min(self.max_size - pool.len());
87        for _ in 0..to_add {
88            pool.push((self.factory)());
89        }
90    }
91
92    /// Takes an object from the pool, creating a new one if necessary.
93    ///
94    /// Returns a `Pooled` wrapper that will return the object to the pool
95    /// when dropped.
96    pub fn get(&self) -> Pooled<'_, T> {
97        let value = self.pool.lock().pop().unwrap_or_else(|| (self.factory)());
98        Pooled {
99            pool: self,
100            value: Some(value),
101        }
102    }
103
104    /// Takes an object from the pool without wrapping it.
105    ///
106    /// The caller is responsible for returning the object via `put()` if desired.
107    pub fn take(&self) -> T {
108        self.pool.lock().pop().unwrap_or_else(|| (self.factory)())
109    }
110
111    /// Returns an object to the pool.
112    ///
113    /// If the pool is at capacity, the object is dropped instead.
114    pub fn put(&self, mut value: T) {
115        if let Some(ref reset) = self.reset {
116            reset(&mut value);
117        }
118
119        let mut pool = self.pool.lock();
120        if pool.len() < self.max_size {
121            pool.push(value);
122        }
123        // Otherwise, value is dropped
124    }
125
126    /// Returns the current number of objects in the pool.
127    #[must_use]
128    pub fn available(&self) -> usize {
129        self.pool.lock().len()
130    }
131
132    /// Returns the maximum pool size.
133    #[must_use]
134    pub fn max_size(&self) -> usize {
135        self.max_size
136    }
137
138    /// Clears all objects from the pool.
139    pub fn clear(&self) {
140        self.pool.lock().clear();
141    }
142}
143
/// A borrowed object from the pool - returns automatically when dropped.
///
/// Use [`take()`](Self::take) if you need to keep the object instead of
/// returning it to the pool.
pub struct Pooled<'a, T> {
    /// Pool the value is handed back to when this handle is dropped.
    pool: &'a ObjectPool<T>,
    /// The checked-out object; `None` once it has been moved out of the handle.
    value: Option<T>,
}
152
153impl<T> Pooled<'_, T> {
154    /// Takes ownership of the inner value, preventing it from being returned to the pool.
155    ///
156    /// # Panics
157    ///
158    /// Panics if the value has already been taken from this `Pooled` handle.
159    pub fn take(mut self) -> T {
160        self.value.take().expect("Value already taken")
161    }
162}
163
164impl<T> Deref for Pooled<'_, T> {
165    type Target = T;
166
167    fn deref(&self) -> &Self::Target {
168        self.value.as_ref().expect("Value already taken")
169    }
170}
171
172impl<T> DerefMut for Pooled<'_, T> {
173    fn deref_mut(&mut self) -> &mut Self::Target {
174        self.value.as_mut().expect("Value already taken")
175    }
176}
177
178impl<T> Drop for Pooled<'_, T> {
179    fn drop(&mut self) {
180        if let Some(value) = self.value.take() {
181            self.pool.put(value);
182        }
183    }
184}
185
/// An [`ObjectPool`] whose pooled objects are `Vec<T>` buffers.
///
/// This is only a type alias. Vectors are cleared on return when the pool is
/// built with `new_vec_pool()` or `new_vec_pool_with_capacity()`, which
/// install a clearing reset function; a pool built with `ObjectPool::new`
/// has no reset function and hands vectors back with their contents intact.
pub type VecPool<T> = ObjectPool<Vec<T>>;
188
189impl<T: 'static> VecPool<T> {
190    /// Creates a new vector pool.
191    pub fn new_vec_pool() -> Self {
192        ObjectPool::with_reset(Vec::new, |v| v.clear())
193    }
194
195    /// Creates a new vector pool with pre-allocated capacity.
196    pub fn new_vec_pool_with_capacity(capacity: usize) -> Self {
197        ObjectPool::with_reset(move || Vec::with_capacity(capacity), |v| v.clear())
198    }
199}
200
#[cfg(test)]
mod tests {
    use super::*;

    /// Without a reset function, a recycled object keeps its old contents.
    #[test]
    fn test_pool_basic() {
        let pool: ObjectPool<Vec<u8>> = ObjectPool::new(Vec::new);

        // Check an object out and fill it.
        let mut first = pool.get();
        first.push(1);
        first.push(2);
        assert_eq!(&*first, &[1, 2]);

        // Dropping the handle puts the object back.
        drop(first);
        assert_eq!(pool.available(), 1);

        // The next checkout reuses the pooled object.
        let second = pool.get();
        assert_eq!(pool.available(), 0);

        // No reset function was installed, so the data is still there.
        assert_eq!(&*second, &[1, 2]);
    }

    /// The reset function runs when an object is handed back.
    #[test]
    fn test_pool_with_reset() {
        let pool: ObjectPool<Vec<u8>> = ObjectPool::with_reset(Vec::new, Vec::clear);

        let mut first = pool.get();
        first.push(1);
        first.push(2);

        drop(first);

        // The recycled vector must have been cleared.
        let second = pool.get();
        assert!(second.is_empty());
    }

    /// `prefill` creates idle objects up front.
    #[test]
    fn test_pool_prefill() {
        let pool: ObjectPool<String> = ObjectPool::new(String::new);

        pool.prefill(10);
        assert_eq!(pool.available(), 10);

        // The handle has to stay alive here; dropping it would put the
        // object straight back and the count would not change.
        let _held = pool.get();
        assert_eq!(pool.available(), 9);
    }

    /// The pool never retains more than `max_size` objects.
    #[test]
    fn test_pool_max_size() {
        let pool: ObjectPool<u64> = ObjectPool::new(|| 0).with_max_size(3);

        pool.prefill(10);
        // Prefill is capped at the maximum size.
        assert_eq!(pool.available(), 3);

        // Drain the pool completely.
        let taken: Vec<u64> = (0..3).map(|_| pool.take()).collect();

        assert_eq!(pool.available(), 0);

        // Hand back one more object than the pool can hold.
        for value in taken {
            pool.put(value);
        }
        pool.put(99); // Over capacity: dropped instead of stored

        assert_eq!(pool.available(), 3);
    }

    /// `Pooled::take` keeps the object out of the pool.
    #[test]
    fn test_pool_take_ownership() {
        let pool: ObjectPool<String> = ObjectPool::new(String::new);

        let mut handle = pool.get();
        handle.push_str("hello");

        // Moving the value out must not return anything to the pool.
        let text = handle.take();
        assert_eq!(text, "hello");
        assert_eq!(pool.available(), 0);
    }

    /// `clear` discards every idle object.
    #[test]
    fn test_pool_clear() {
        let pool: ObjectPool<u64> = ObjectPool::new(|| 0);

        pool.prefill(10);
        assert_eq!(pool.available(), 10);

        pool.clear();
        assert_eq!(pool.available(), 0);
    }

    /// Vector pools hand back cleared vectors.
    #[test]
    fn test_vec_pool() {
        let pool: VecPool<u8> = VecPool::new_vec_pool();

        let mut buf = pool.get();
        buf.extend_from_slice(&[1, 2, 3]);

        drop(buf);

        let recycled = pool.get();
        assert!(recycled.is_empty()); // Cleared on return
    }

    /// Fresh vectors honour the requested capacity.
    #[test]
    fn test_vec_pool_with_capacity() {
        let pool: VecPool<u8> = VecPool::new_vec_pool_with_capacity(100);

        let buf = pool.get();
        assert!(buf.capacity() >= 100);
    }

    /// Several threads can check objects in and out concurrently.
    #[test]
    #[cfg(not(miri))] // parking_lot uses integer-to-pointer casts incompatible with Miri strict provenance
    fn test_pool_thread_safety() {
        use std::sync::Arc;
        use std::thread;

        let pool: Arc<ObjectPool<Vec<u8>>> = Arc::new(ObjectPool::with_reset(Vec::new, Vec::clear));

        let mut workers = Vec::with_capacity(4);
        for _ in 0..4 {
            let shared = Arc::clone(&pool);
            workers.push(thread::spawn(move || {
                for _ in 0..100 {
                    let mut buf = shared.get();
                    buf.push(42);
                    // `buf` goes back to the pool at the end of each iteration
                }
            }));
        }

        for worker in workers {
            worker.join().unwrap();
        }

        // At least one object must have been returned to the pool.
        assert!(pool.available() > 0);
    }
}
351}