Skip to main content

nexus_pool/
local.rs

1//! Single-threaded object pools.
2//!
3//! Two variants:
4//! - [`BoundedPool`]: Fixed capacity, pre-initialized objects
5//! - [`Pool`]: Growable, creates objects on demand via factory
6//!
7//! Both use LIFO ordering for cache locality.
8
9use std::cell::UnsafeCell;
10use std::mem::{ManuallyDrop, MaybeUninit};
11use std::ops::{Deref, DerefMut};
12use std::rc::{Rc, Weak};
13
14// =============================================================================
15// Inner - shared storage for both pool types
16// =============================================================================
17
/// Storage shared by `BoundedPool` and `Pool`, and weakly referenced by every
/// `Pooled` guard.
///
/// Every field sits in an `UnsafeCell` so it can be mutated through `&self`
/// without runtime borrow tracking. The pool types only hand this struct out
/// behind `Rc`/`Weak`, which keeps all access on a single thread.
// NOTE(review): the reason for `#[repr(C)]` is not visible in this file —
// presumably it pins the field order; confirm before removing.
#[repr(C)]
struct Inner<T> {
    /// Idle objects. The most recently pushed object is popped first (LIFO).
    data: UnsafeCell<Vec<T>>,

    /// Callback applied to an object before it goes back onto `data`.
    #[allow(clippy::type_complexity)]
    reset: UnsafeCell<Box<dyn FnMut(&mut T)>>,

    /// Object constructor. Written by `new_growable` only; `new_bounded`
    /// leaves it uninitialized, so it must never be read for a bounded pool.
    #[allow(clippy::type_complexity)]
    factory: UnsafeCell<MaybeUninit<Box<dyn FnMut() -> T>>>,
}

impl<T> Inner<T> {
    /// Wraps the three parts in their cells. Shared by both constructors.
    #[allow(clippy::type_complexity)]
    fn from_parts(
        data: Vec<T>,
        reset: Box<dyn FnMut(&mut T)>,
        factory: MaybeUninit<Box<dyn FnMut() -> T>>,
    ) -> Self {
        Self {
            data: UnsafeCell::new(data),
            reset: UnsafeCell::new(reset),
            factory: UnsafeCell::new(factory),
        }
    }

    /// Builds the storage for a `BoundedPool`. The factory slot stays
    /// uninitialized.
    fn new_bounded<R>(data: Vec<T>, reset: R) -> Self
    where
        R: FnMut(&mut T) + 'static,
    {
        Self::from_parts(data, Box::new(reset), MaybeUninit::uninit())
    }

    /// Builds the storage for a `Pool`. The factory slot is initialized, and
    /// the owner is responsible for dropping it (a `MaybeUninit` never runs
    /// the destructor of its contents on its own).
    fn new_growable<F, R>(data: Vec<T>, factory: F, reset: R) -> Self
    where
        F: FnMut() -> T + 'static,
        R: FnMut(&mut T) + 'static,
    {
        Self::from_parts(
            data,
            Box::new(reset),
            MaybeUninit::new(Box::new(factory)),
        )
    }

    /// Pops the most recently returned idle object, or `None` if there is
    /// none. Used by both pool types.
    fn try_pop(&self) -> Option<T> {
        // SAFETY: access is single-threaded and this borrow ends before any
        // user code can run.
        let stack = unsafe { &mut *self.data.get() };
        stack.pop()
    }

    /// Pops an idle object, or builds a fresh one with the factory when the
    /// stack is empty.
    ///
    /// # Safety
    ///
    /// The factory must have been initialized, i.e. `self` was created by
    /// `new_growable` (a `Pool`, not a `BoundedPool`) and the factory has not
    /// been dropped yet.
    #[allow(clippy::option_if_let_else)]
    unsafe fn pop_or_create(&self) -> T {
        if let Some(recycled) = self.try_pop() {
            return recycled;
        }

        // SAFETY: single-threaded access; the borrow of `data` taken inside
        // `try_pop` has already ended.
        let slot = unsafe { &mut *self.factory.get() };
        // SAFETY: the caller guarantees the factory was initialized.
        let make = unsafe { slot.assume_init_mut() };
        make()
    }

    /// Runs the reset callback on `value`.
    ///
    /// This does NOT put the object back on the stack; callers follow up
    /// with `push`.
    // NOTE(review): the callback runs while a `&mut` to it is live. A reset
    // closure that re-enters the same pool (for example by dropping another
    // guard of this pool from inside the callback) would create a second
    // `&mut` to the closure. Confirm callers cannot do that, or add a
    // re-entrancy guard.
    fn return_value(&self, value: &mut T) {
        // SAFETY: single-threaded access.
        let on_return = unsafe { &mut *self.reset.get() };
        on_return(value);
    }

    /// Pushes `value` onto the idle stack without resetting it.
    fn push(&self, value: T) {
        // SAFETY: single-threaded access; no user code runs during the push.
        let stack = unsafe { &mut *self.data.get() };
        stack.push(value);
    }

    /// Number of idle objects currently on the stack.
    fn available(&self) -> usize {
        // SAFETY: single-threaded access; shared borrow used for `len` only.
        let stack = unsafe { &*self.data.get() };
        stack.len()
    }

    /// True when no idle object is available.
    fn is_empty(&self) -> bool {
        self.available() == 0
    }
}
106
107// =============================================================================
108// BoundedPool - fixed capacity, pre-initialized
109// =============================================================================
110
111/// Fixed-capacity object pool with LIFO reuse.
112///
113/// All objects are pre-initialized at construction. When all objects are
114/// acquired, `try_acquire()` returns `None`.
115///
116/// # Example
117///
118/// ```
119/// use nexus_pool::local::BoundedPool;
120///
121/// let pool = BoundedPool::new(
122///     100,
123///     || Vec::<u8>::with_capacity(1024),
124///     |v| v.clear(),
125/// );
126///
127/// let mut buf = pool.try_acquire().unwrap();
128/// buf.extend_from_slice(b"hello");
129/// // buf auto-returns to pool on drop, clear() is called
130/// ```
131pub struct BoundedPool<T> {
132    inner: Rc<Inner<T>>,
133}
134
135impl<T> BoundedPool<T> {
136    /// Creates a pool with `capacity` pre-initialized objects.
137    ///
138    /// # Arguments
139    ///
140    /// * `capacity` - Number of objects to pre-allocate
141    /// * `init` - Factory function to create each object
142    /// * `reset` - Called when object returns to pool (e.g., `Vec::clear`)
143    ///
144    /// # Panics
145    ///
146    /// Panics if capacity is zero.
147    pub fn new<I, R>(capacity: usize, mut init: I, reset: R) -> Self
148    where
149        I: FnMut() -> T,
150        R: FnMut(&mut T) + 'static,
151    {
152        assert!(capacity > 0, "capacity must be non-zero");
153
154        let mut data = Vec::with_capacity(capacity);
155        for _ in 0..capacity {
156            data.push(init());
157        }
158
159        Self {
160            inner: Rc::new(Inner::new_bounded(data, reset)),
161        }
162    }
163
164    /// Attempts to acquire an object from the pool.
165    ///
166    /// Returns `None` if all objects are currently in use.
167    pub fn try_acquire(&self) -> Option<Pooled<T>> {
168        self.inner.try_pop().map(|value| Pooled {
169            value: ManuallyDrop::new(value),
170            inner: Rc::downgrade(&self.inner),
171        })
172    }
173
174    /// Returns the number of available objects.
175    pub fn available(&self) -> usize {
176        self.inner.available()
177    }
178
179    /// Returns true if there are no more available objects.
180    pub fn is_empty(&self) -> bool {
181        self.inner.is_empty()
182    }
183}
184
185// =============================================================================
186// Pool - growable, creates on demand
187// =============================================================================
188
189/// Growable object pool with LIFO reuse.
190///
191/// Objects are created on demand via the factory function when the pool
192/// is empty. Use `try_acquire()` for the fast path that only returns
193/// pooled objects, or `acquire()` which may create new objects.
194///
195/// # Example
196///
197/// ```
198/// use nexus_pool::local::Pool;
199///
200/// let pool = Pool::new(
201///     || Vec::<u8>::with_capacity(1024),
202///     |v| v.clear(),
203/// );
204///
205/// let mut buf = pool.acquire(); // Creates new object
206/// buf.extend_from_slice(b"hello");
207/// drop(buf); // Returns to pool, clear() is called
208///
209/// let buf2 = pool.acquire(); // Reuses existing (now empty) object
210/// ```
211pub struct Pool<T> {
212    inner: Rc<Inner<T>>,
213}
214
215impl<T> Pool<T> {
216    /// Creates an empty pool with the given factory and reset functions.
217    ///
218    /// # Arguments
219    ///
220    /// * `factory` - Creates new objects when pool is empty
221    /// * `reset` - Called when object returns to pool (e.g., `Vec::clear`)
222    pub fn new<F, R>(factory: F, reset: R) -> Self
223    where
224        F: FnMut() -> T + 'static,
225        R: FnMut(&mut T) + 'static,
226    {
227        Self {
228            inner: Rc::new(Inner::new_growable(Vec::new(), factory, reset)),
229        }
230    }
231
232    /// Creates a pool pre-populated with `capacity` objects.
233    pub fn with_capacity<F, R>(capacity: usize, mut factory: F, reset: R) -> Self
234    where
235        F: FnMut() -> T + 'static,
236        R: FnMut(&mut T) + 'static,
237    {
238        let mut data = Vec::with_capacity(capacity);
239        for _ in 0..capacity {
240            data.push(factory());
241        }
242
243        Self {
244            inner: Rc::new(Inner::new_growable(data, factory, reset)),
245        }
246    }
247
248    /// Acquires an object from the pool, creating one if necessary.
249    ///
250    /// This always succeeds but may allocate if the pool is empty.
251    pub fn acquire(&self) -> Pooled<T> {
252        // Safety: Pool always initializes the factory
253        let value = unsafe { self.inner.pop_or_create() };
254        Pooled {
255            value: ManuallyDrop::new(value),
256            inner: Rc::downgrade(&self.inner),
257        }
258    }
259
260    /// Attempts to acquire an object from the pool without creating.
261    ///
262    /// Returns `None` if the pool is empty. This is the fast path.
263    pub fn try_acquire(&self) -> Option<Pooled<T>> {
264        self.inner.try_pop().map(|value| Pooled {
265            value: ManuallyDrop::new(value),
266            inner: Rc::downgrade(&self.inner),
267        })
268    }
269
270    /// Takes an object from the pool without an RAII guard, creating one
271    /// via the factory if the pool is empty.
272    ///
273    /// The caller is responsible for returning the object via [`put()`](Pool::put).
274    ///
275    /// # Example
276    ///
277    /// ```
278    /// use nexus_pool::local::Pool;
279    ///
280    /// let pool = Pool::new(
281    ///     || Vec::<u8>::with_capacity(1024),
282    ///     |v| v.clear(),
283    /// );
284    ///
285    /// let mut buf = pool.take();
286    /// buf.extend_from_slice(b"hello");
287    /// pool.put(buf); // manual return, reset is called
288    /// ```
289    pub fn take(&self) -> T {
290        // Safety: Pool always initializes the factory
291        unsafe { self.inner.pop_or_create() }
292    }
293
294    /// Takes an object from the pool if one is available, without creating.
295    ///
296    /// Returns `None` if the pool is empty. The caller is responsible for
297    /// returning the object via [`put()`](Pool::put).
298    pub fn try_take(&self) -> Option<T> {
299        self.inner.try_pop()
300    }
301
302    /// Returns an object to the pool.
303    ///
304    /// Calls the reset function, then pushes the value back onto the
305    /// available stack for reuse.
306    pub fn put(&self, mut value: T) {
307        self.inner.return_value(&mut value);
308        self.inner.push(value);
309    }
310
311    /// Returns the number of available objects.
312    pub fn available(&self) -> usize {
313        self.inner.available()
314    }
315}
316
317impl<T> Drop for Pool<T> {
318    fn drop(&mut self) {
319        // Drop the factory before Rc drops Inner
320        // Safety: Pool always initializes the factory
321        unsafe {
322            let factory = &mut *self.inner.factory.get();
323            factory.assume_init_drop();
324        }
325    }
326}
327
328// =============================================================================
329// Pooled - RAII guard
330// =============================================================================
331
332/// RAII guard that returns the object to the pool on drop.
333///
334/// The object is always returned to the pool when the guard is dropped.
335/// There is no way to "take" the object out permanently.
336pub struct Pooled<T> {
337    value: ManuallyDrop<T>,
338    inner: Weak<Inner<T>>,
339}
340
341impl<T> Deref for Pooled<T> {
342    type Target = T;
343
344    #[inline]
345    fn deref(&self) -> &T {
346        &self.value
347    }
348}
349
350impl<T> DerefMut for Pooled<T> {
351    #[inline]
352    fn deref_mut(&mut self) -> &mut T {
353        &mut self.value
354    }
355}
356
357impl<T> Drop for Pooled<T> {
358    fn drop(&mut self) {
359        if let Some(inner) = self.inner.upgrade() {
360            // Reset and return to pool
361            inner.return_value(&mut self.value);
362            // Safety: value is valid, we're moving it back to the pool
363            let value = unsafe { ManuallyDrop::take(&mut self.value) };
364            inner.push(value);
365        } else {
366            // Pool is gone, drop the value
367            // Safety: value is valid, needs to be dropped
368            unsafe { ManuallyDrop::drop(&mut self.value) };
369        }
370    }
371}
372
373// =============================================================================
374// Tests
375// =============================================================================
376
#[cfg(test)]
mod tests {
    use super::*;
    use std::cell::Cell;
    use std::rc::Rc as StdRc;

    /// Returns a shared counter plus a reset closure that bumps it by one on
    /// every call.
    fn counting_reset() -> (StdRc<Cell<i32>>, impl FnMut(&mut u32) + 'static) {
        let count = StdRc::new(Cell::new(0));
        let handle = StdRc::clone(&count);
        (count, move |_: &mut u32| handle.set(handle.get() + 1))
    }

    /// Acquire/exhaust/release cycle on a bounded pool, including the reset.
    #[test]
    fn bounded_pool_basic() {
        let pool = BoundedPool::new(3, || Vec::<u8>::with_capacity(16), |buf| buf.clear());
        assert_eq!(pool.available(), 3);

        let mut first = pool.try_acquire().unwrap();
        assert_eq!(pool.available(), 2);

        first.extend_from_slice(b"hello");
        assert_eq!(&*first, b"hello");

        let _second = pool.try_acquire().unwrap();
        let _third = pool.try_acquire().unwrap();
        assert_eq!(pool.available(), 0);

        // Nothing left to hand out.
        assert!(pool.try_acquire().is_none());

        drop(first);
        assert_eq!(pool.available(), 1);

        // The returned buffer comes back out, already cleared by reset.
        let recycled = pool.try_acquire().unwrap();
        assert!(recycled.is_empty());
    }

    /// Reset runs once per dropped guard and never on acquire.
    #[test]
    fn bounded_pool_reset_called() {
        let (resets, on_reset) = counting_reset();
        let pool = BoundedPool::new(2, || 0u32, on_reset);

        let guard = pool.try_acquire().unwrap();
        assert_eq!(resets.get(), 0);

        drop(guard);
        assert_eq!(resets.get(), 1);

        let left = pool.try_acquire().unwrap();
        let right = pool.try_acquire().unwrap();
        drop(left);
        drop(right);
        assert_eq!(resets.get(), 3);
    }

    /// A guard stays usable after its bounded pool has been dropped.
    #[test]
    fn bounded_pool_outlives_guard() {
        let pool = BoundedPool::new(1, || String::from("test"), |s| s.clear());
        let guard = pool.try_acquire().unwrap();
        drop(pool);

        // Pool is gone, the guard still owns a valid value.
        assert_eq!(&*guard, "test");
        // Dropping the guard now destroys the value instead of returning it.
        drop(guard);
    }

    /// A growable pool creates on demand and reuses returned objects.
    #[test]
    fn growable_pool_basic() {
        let pool = Pool::new(|| Vec::<u8>::with_capacity(16), |buf| buf.clear());
        assert_eq!(pool.available(), 0);

        // Empty pool: acquire goes through the factory.
        let mut fresh = pool.acquire();
        fresh.extend_from_slice(b"hello");

        drop(fresh);
        assert_eq!(pool.available(), 1);

        // Second acquire reuses the returned buffer, cleared by reset.
        let reused = pool.acquire();
        assert!(reused.is_empty());
        assert_eq!(pool.available(), 0);
    }

    /// `try_acquire` never creates; it only succeeds once something is idle.
    #[test]
    fn growable_pool_try_acquire() {
        let pool = Pool::new(|| 42u32, |_| {});

        // Nothing idle yet.
        assert!(pool.try_acquire().is_none());

        // Create one via the factory and hand it straight back.
        drop(pool.acquire());

        // Now there is an idle object to pick up.
        let guard = pool.try_acquire().unwrap();
        assert_eq!(*guard, 42);
    }

    /// `with_capacity` pre-populates the pool.
    #[test]
    fn growable_pool_with_capacity() {
        let pool = Pool::with_capacity(5, String::new, |s| s.clear());
        assert_eq!(pool.available(), 5);

        let _first = pool.try_acquire().unwrap();
        let _second = pool.try_acquire().unwrap();
        assert_eq!(pool.available(), 3);
    }

    /// A guard stays usable after its growable pool has been dropped.
    #[test]
    fn growable_pool_outlives_guard() {
        let pool = Pool::new(|| String::from("test"), |s| s.clear());
        let guard = pool.acquire();
        drop(pool);

        // Pool is gone, the guard still owns a valid value.
        assert_eq!(&*guard, "test");
        drop(guard);
    }

    /// Zero capacity is rejected by the bounded pool constructor.
    #[test]
    #[should_panic(expected = "capacity must be non-zero")]
    fn bounded_pool_zero_capacity_panics() {
        let _ = BoundedPool::new(0, || (), |_| {});
    }

    /// Manual take/put round trip, including the reset on `put`.
    #[test]
    fn take_put_basic() {
        let pool = Pool::new(|| Vec::<u8>::with_capacity(16), |buf| buf.clear());

        let mut buf = pool.take();
        buf.extend_from_slice(b"hello");
        assert_eq!(&buf, b"hello");

        pool.put(buf);
        assert_eq!(pool.available(), 1);

        let reused = pool.take();
        assert!(reused.is_empty()); // cleared by reset
    }

    /// `try_take` never creates; it only succeeds once something is idle.
    #[test]
    fn try_take_empty_returns_none() {
        let pool = Pool::new(|| 0u32, |_| {});
        assert!(pool.try_take().is_none());

        // `take` falls back to the factory; `put` makes the object idle.
        let created = pool.take();
        pool.put(created);

        assert!(pool.try_take().is_some());
    }

    /// Reset runs once per `put` and never on `take`.
    #[test]
    fn take_put_reset_called() {
        let (resets, on_reset) = counting_reset();
        let pool = Pool::new(|| 0u32, on_reset);

        let value = pool.take();
        assert_eq!(resets.get(), 0);

        pool.put(value);
        assert_eq!(resets.get(), 1);

        let value = pool.take();
        pool.put(value);
        assert_eq!(resets.get(), 2);
    }

    /// Manual take/put on a pre-populated pool keeps the count consistent.
    #[test]
    fn take_put_with_capacity() {
        let pool = Pool::with_capacity(5, || String::from("init"), |s| s.clear());
        assert_eq!(pool.available(), 5);

        let taken = pool.try_take().unwrap();
        assert_eq!(taken, "init");
        assert_eq!(pool.available(), 4);

        pool.put(taken);
        assert_eq!(pool.available(), 5);
    }

    /// Guards and manually taken objects can be mixed on the same pool.
    #[test]
    fn mix_raii_and_manual() {
        let pool = Pool::with_capacity(3, Vec::<u8>::new, |buf| buf.clear());

        // One object checked out by value...
        let mut by_value = pool.take();
        by_value.push(1);

        // ...and one behind a guard.
        let mut by_guard = pool.acquire();
        by_guard.push(2);

        assert_eq!(pool.available(), 1);

        // Hand the manual one back.
        pool.put(by_value);
        assert_eq!(pool.available(), 2);

        // Dropping the guard returns the other one.
        drop(by_guard);
        assert_eq!(pool.available(), 3);
    }
}