Skip to main content

nexus_pool/
local.rs

1//! Single-threaded object pools.
2//!
3//! Two variants:
4//! - [`BoundedPool`]: Fixed capacity, pre-initialized objects
5//! - [`Pool`]: Growable, creates objects on demand via factory
6//!
7//! Both use LIFO ordering for cache locality.
8
9use std::cell::UnsafeCell;
10use std::mem::{ManuallyDrop, MaybeUninit};
11use std::ops::{Deref, DerefMut};
12use std::rc::{Rc, Weak};
13
14// =============================================================================
15// Inner - shared storage for both pool types
16// =============================================================================
17
/// Storage shared by [`BoundedPool`] and [`Pool`].
///
/// Every field lives in an `UnsafeCell` so that it can be mutated through the
/// shared `Rc<Inner<T>>` handle the pools and their guards hold.
///
/// NOTE(review): `reset` and `factory` are user closures that are invoked
/// while a `&mut` to the closure itself is live. A closure that re-enters the
/// same pool (e.g. a reset hook that drops a guard belonging to this pool)
/// would create a second `&mut` to the same closure. Confirm callers never
/// do this, or add a re-entrancy guard.
#[repr(C)]
struct Inner<T> {
    /// Idle objects; the last element is handed out first (LIFO).
    data: UnsafeCell<Vec<T>>,

    /// Hook run on an object just before it goes back onto the idle stack.
    // The boxed closure type is spelled out once here rather than aliased.
    #[allow(clippy::type_complexity)]
    reset: UnsafeCell<Box<dyn FnMut(&mut T)>>,

    /// Constructor for fresh objects. Written by `new_growable` only;
    /// `new_bounded` leaves this slot uninitialized.
    // The boxed closure type is spelled out once here rather than aliased.
    #[allow(clippy::type_complexity)]
    factory: UnsafeCell<MaybeUninit<Box<dyn FnMut() -> T>>>,
}

impl<T> Inner<T> {
    /// Builds the storage for a `BoundedPool`.
    ///
    /// The factory slot stays uninitialized, so `pop_or_create` must never
    /// be called on the result.
    fn new_bounded<R>(data: Vec<T>, reset: R) -> Self
    where
        R: FnMut(&mut T) + 'static,
    {
        let reset_hook: Box<dyn FnMut(&mut T)> = Box::new(reset);
        Self {
            data: UnsafeCell::new(data),
            reset: UnsafeCell::new(reset_hook),
            factory: UnsafeCell::new(MaybeUninit::uninit()),
        }
    }

    /// Builds the storage for a `Pool`, with the factory slot initialized.
    fn new_growable<F, R>(data: Vec<T>, factory: F, reset: R) -> Self
    where
        F: FnMut() -> T + 'static,
        R: FnMut(&mut T) + 'static,
    {
        let reset_hook: Box<dyn FnMut(&mut T)> = Box::new(reset);
        let make: Box<dyn FnMut() -> T> = Box::new(factory);
        Self {
            data: UnsafeCell::new(data),
            reset: UnsafeCell::new(reset_hook),
            factory: UnsafeCell::new(MaybeUninit::new(make)),
        }
    }

    /// Removes and returns the most recently pushed idle object, if any.
    fn try_pop(&self) -> Option<T> {
        // SAFETY: `Inner` is only reachable through `Rc<Inner<T>>`, which is
        // `!Send + !Sync`, so no other thread can touch the stack. The
        // reference does not outlive this call and no user code runs while
        // it is alive.
        let stack = unsafe { &mut *self.data.get() };
        stack.pop()
    }

    /// Returns an idle object, or a freshly constructed one when the stack
    /// is empty.
    ///
    /// # Safety
    ///
    /// The factory slot must be initialized, i.e. `self` must have been
    /// built by `new_growable` (a `Pool`, never a `BoundedPool`).
    // Kept as an explicit match: the fallback arm needs its own unsafe
    // block and SAFETY note.
    #[allow(clippy::option_if_let_else)]
    unsafe fn pop_or_create(&self) -> T {
        match self.try_pop() {
            Some(idle) => idle,
            None => {
                // SAFETY: single-threaded access (Rc-based, `!Sync`). The
                // caller guarantees the slot was written by `new_growable`,
                // which makes `assume_init_mut` sound. No reference to the
                // idle stack is held while the factory runs.
                let make = unsafe {
                    let slot = &mut *self.factory.get();
                    slot.assume_init_mut()
                };
                make()
            }
        }
    }

    /// Runs the reset hook on `value`.
    ///
    /// Despite the name this does NOT put the value back on the stack;
    /// callers follow up with [`push`](Self::push).
    fn return_value(&self, value: &mut T) {
        // SAFETY: single-threaded access (Rc-based, `!Sync`), so nothing
        // else mutates the hook concurrently.
        let reset_hook = unsafe { &mut *self.reset.get() };
        reset_hook(value);
    }

    /// Places `value` on top of the idle stack.
    fn push(&self, value: T) {
        // SAFETY: single-threaded access (Rc-based, `!Sync`); the reference
        // is dropped before this function returns and no user code runs
        // while it is alive.
        let stack = unsafe { &mut *self.data.get() };
        stack.push(value);
    }

    /// Number of idle objects currently on the stack.
    fn available(&self) -> usize {
        // SAFETY: single-threaded access (Rc-based, `!Sync`); only the
        // length is read and no mutation can happen concurrently.
        let stack = unsafe { &*self.data.get() };
        stack.len()
    }

    /// True when no idle object is left on the stack.
    fn is_empty(&self) -> bool {
        self.available() == 0
    }
}
113
114// =============================================================================
115// BoundedPool - fixed capacity, pre-initialized
116// =============================================================================
117
118/// Fixed-capacity object pool with LIFO reuse.
119///
120/// All objects are pre-initialized at construction. When all objects are
121/// acquired, `try_acquire()` returns `None`.
122///
123/// # Example
124///
125/// ```
126/// use nexus_pool::local::BoundedPool;
127///
128/// let pool = BoundedPool::new(
129///     100,
130///     || Vec::<u8>::with_capacity(1024),
131///     |v| v.clear(),
132/// );
133///
134/// let mut buf = pool.try_acquire().unwrap();
135/// buf.extend_from_slice(b"hello");
136/// // buf auto-returns to pool on drop, clear() is called
137/// ```
138pub struct BoundedPool<T> {
139    inner: Rc<Inner<T>>,
140}
141
142impl<T> BoundedPool<T> {
143    /// Creates a pool with `capacity` pre-initialized objects.
144    ///
145    /// # Arguments
146    ///
147    /// * `capacity` - Number of objects to pre-allocate
148    /// * `init` - Factory function to create each object
149    /// * `reset` - Called when object returns to pool (e.g., `Vec::clear`)
150    ///
151    /// # Panics
152    ///
153    /// Panics if capacity is zero.
154    pub fn new<I, R>(capacity: usize, mut init: I, reset: R) -> Self
155    where
156        I: FnMut() -> T,
157        R: FnMut(&mut T) + 'static,
158    {
159        assert!(capacity > 0, "capacity must be non-zero");
160
161        let mut data = Vec::with_capacity(capacity);
162        for _ in 0..capacity {
163            data.push(init());
164        }
165
166        Self {
167            inner: Rc::new(Inner::new_bounded(data, reset)),
168        }
169    }
170
171    /// Attempts to acquire an object from the pool.
172    ///
173    /// Returns `None` if all objects are currently in use.
174    pub fn try_acquire(&self) -> Option<Pooled<T>> {
175        self.inner.try_pop().map(|value| Pooled {
176            value: ManuallyDrop::new(value),
177            inner: Rc::downgrade(&self.inner),
178        })
179    }
180
181    /// Returns the number of available objects.
182    pub fn available(&self) -> usize {
183        self.inner.available()
184    }
185
186    /// Returns true if there are no more available objects.
187    pub fn is_empty(&self) -> bool {
188        self.inner.is_empty()
189    }
190}
191
192// =============================================================================
193// Pool - growable, creates on demand
194// =============================================================================
195
196/// Growable object pool with LIFO reuse.
197///
198/// Objects are created on demand via the factory function when the pool
199/// is empty. Use `try_acquire()` for the fast path that only returns
200/// pooled objects, or `acquire()` which may create new objects.
201///
202/// # Example
203///
204/// ```
205/// use nexus_pool::local::Pool;
206///
207/// let pool = Pool::new(
208///     || Vec::<u8>::with_capacity(1024),
209///     |v| v.clear(),
210/// );
211///
212/// let mut buf = pool.acquire(); // Creates new object
213/// buf.extend_from_slice(b"hello");
214/// drop(buf); // Returns to pool, clear() is called
215///
216/// let buf2 = pool.acquire(); // Reuses existing (now empty) object
217/// ```
218pub struct Pool<T> {
219    inner: Rc<Inner<T>>,
220}
221
222impl<T> Pool<T> {
223    /// Creates an empty pool with the given factory and reset functions.
224    ///
225    /// # Arguments
226    ///
227    /// * `factory` - Creates new objects when pool is empty
228    /// * `reset` - Called when object returns to pool (e.g., `Vec::clear`)
229    pub fn new<F, R>(factory: F, reset: R) -> Self
230    where
231        F: FnMut() -> T + 'static,
232        R: FnMut(&mut T) + 'static,
233    {
234        Self {
235            inner: Rc::new(Inner::new_growable(Vec::new(), factory, reset)),
236        }
237    }
238
239    /// Creates a pool pre-populated with `capacity` objects.
240    pub fn with_capacity<F, R>(capacity: usize, mut factory: F, reset: R) -> Self
241    where
242        F: FnMut() -> T + 'static,
243        R: FnMut(&mut T) + 'static,
244    {
245        let mut data = Vec::with_capacity(capacity);
246        for _ in 0..capacity {
247            data.push(factory());
248        }
249
250        Self {
251            inner: Rc::new(Inner::new_growable(data, factory, reset)),
252        }
253    }
254
255    /// Acquires an object from the pool, creating one if necessary.
256    ///
257    /// This always succeeds but may allocate if the pool is empty.
258    pub fn acquire(&self) -> Pooled<T> {
259        // SAFETY: Pool::new/with_capacity always calls new_growable, which
260        // initializes the factory via MaybeUninit::new. pop_or_create's
261        // precondition (factory initialized) is satisfied.
262        let value = unsafe { self.inner.pop_or_create() };
263        Pooled {
264            value: ManuallyDrop::new(value),
265            inner: Rc::downgrade(&self.inner),
266        }
267    }
268
269    /// Attempts to acquire an object from the pool without creating.
270    ///
271    /// Returns `None` if the pool is empty. This is the fast path.
272    pub fn try_acquire(&self) -> Option<Pooled<T>> {
273        self.inner.try_pop().map(|value| Pooled {
274            value: ManuallyDrop::new(value),
275            inner: Rc::downgrade(&self.inner),
276        })
277    }
278
279    /// Takes an object from the pool without an RAII guard, creating one
280    /// via the factory if the pool is empty.
281    ///
282    /// The caller is responsible for returning the object via [`put()`](Pool::put).
283    ///
284    /// # Example
285    ///
286    /// ```
287    /// use nexus_pool::local::Pool;
288    ///
289    /// let pool = Pool::new(
290    ///     || Vec::<u8>::with_capacity(1024),
291    ///     |v| v.clear(),
292    /// );
293    ///
294    /// let mut buf = pool.take();
295    /// buf.extend_from_slice(b"hello");
296    /// pool.put(buf); // manual return, reset is called
297    /// ```
298    pub fn take(&self) -> T {
299        // SAFETY: Pool::new/with_capacity always calls new_growable, which
300        // initializes the factory via MaybeUninit::new. pop_or_create's
301        // precondition (factory initialized) is satisfied.
302        unsafe { self.inner.pop_or_create() }
303    }
304
305    /// Takes an object from the pool if one is available, without creating.
306    ///
307    /// Returns `None` if the pool is empty. The caller is responsible for
308    /// returning the object via [`put()`](Pool::put).
309    pub fn try_take(&self) -> Option<T> {
310        self.inner.try_pop()
311    }
312
313    /// Returns an object to the pool.
314    ///
315    /// Calls the reset function, then pushes the value back onto the
316    /// available stack for reuse.
317    ///
318    /// # Panics
319    ///
320    /// If the reset closure panics, the value is leaked and the pool slot
321    /// is not returned. The panic propagates normally. Reset closures must
322    /// not panic — use simple operations like `Vec::clear()` or field resets.
323    pub fn put(&self, mut value: T) {
324        self.inner.return_value(&mut value);
325        self.inner.push(value);
326    }
327
328    /// Returns the number of available objects.
329    pub fn available(&self) -> usize {
330        self.inner.available()
331    }
332}
333
334impl<T> Drop for Pool<T> {
335    fn drop(&mut self) {
336        // SAFETY: Pool::new/with_capacity always calls new_growable, which
337        // initializes the factory via MaybeUninit::new. We must drop it here
338        // before Rc drops Inner, because Inner's Drop doesn't know whether
339        // factory was initialized (BoundedPool leaves it uninit).
340        unsafe {
341            let factory = &mut *self.inner.factory.get();
342            factory.assume_init_drop();
343        }
344    }
345}
346
347// =============================================================================
348// Pooled - RAII guard
349// =============================================================================
350
351/// RAII guard that returns the object to the pool on drop.
352///
353/// The object is always returned to the pool when the guard is dropped.
354/// There is no way to "take" the object out permanently.
355pub struct Pooled<T> {
356    value: ManuallyDrop<T>,
357    inner: Weak<Inner<T>>,
358}
359
360impl<T> Deref for Pooled<T> {
361    type Target = T;
362
363    #[inline]
364    fn deref(&self) -> &T {
365        &self.value
366    }
367}
368
369impl<T> DerefMut for Pooled<T> {
370    #[inline]
371    fn deref_mut(&mut self) -> &mut T {
372        &mut self.value
373    }
374}
375
376impl<T> Drop for Pooled<T> {
377    fn drop(&mut self) {
378        if let Some(inner) = self.inner.upgrade() {
379            // Reset and return to pool
380            inner.return_value(&mut self.value);
381            // SAFETY: value is valid (ManuallyDrop preserves it until explicit take/drop).
382            // After take, self.value is consumed and we never touch it again.
383            let value = unsafe { ManuallyDrop::take(&mut self.value) };
384            inner.push(value);
385        } else {
386            // SAFETY: Pool is gone. Value is valid (ManuallyDrop preserves it) and must
387            // be dropped to avoid a leak. After drop, we never touch self.value again.
388            unsafe { ManuallyDrop::drop(&mut self.value) };
389        }
390    }
391}
392
393// =============================================================================
394// Tests
395// =============================================================================
396
#[cfg(test)]
mod tests {
    // Unit tests for both pool flavours, the RAII guard, and the manual
    // take/put API.
    use super::*;
    use std::cell::Cell;
    use std::rc::Rc as StdRc;

    // Acquire until exhausted, then check that dropping a guard makes the
    // (reset) object available again.
    #[test]
    fn bounded_pool_basic() {
        let pool = BoundedPool::new(3, || Vec::<u8>::with_capacity(16), Vec::clear);

        assert_eq!(pool.available(), 3);

        let mut a = pool.try_acquire().unwrap();
        assert_eq!(pool.available(), 2);

        a.extend_from_slice(b"hello");
        assert_eq!(&*a, b"hello");

        let _b = pool.try_acquire().unwrap();
        let _c = pool.try_acquire().unwrap();

        assert_eq!(pool.available(), 0);

        // Pool exhausted
        assert!(pool.try_acquire().is_none());

        drop(a);
        assert_eq!(pool.available(), 1);

        // Can acquire again - and it's been cleared
        let d = pool.try_acquire().unwrap();
        assert!(d.is_empty()); // reset was called
    }

    // The reset hook runs once per guard drop and never on acquire.
    #[test]
    fn bounded_pool_reset_called() {
        let reset_count = StdRc::new(Cell::new(0));
        let reset_count_clone = reset_count.clone();

        let pool = BoundedPool::new(
            2,
            || 0u32,
            move |_| {
                reset_count_clone.set(reset_count_clone.get() + 1);
            },
        );

        let a = pool.try_acquire().unwrap();
        assert_eq!(reset_count.get(), 0);

        drop(a);
        assert_eq!(reset_count.get(), 1);

        let b = pool.try_acquire().unwrap();
        let c = pool.try_acquire().unwrap();
        drop(b);
        drop(c);
        assert_eq!(reset_count.get(), 3);
    }

    // A guard stays usable after its BoundedPool has been dropped.
    #[test]
    fn bounded_pool_outlives_guard() {
        let guard;
        {
            let pool = BoundedPool::new(1, || String::from("test"), String::clear);
            guard = pool.try_acquire().unwrap();
        }
        // Pool dropped, guard still valid
        assert_eq!(&*guard, "test");
        // Drop guard - value is dropped, not returned (pool is gone)
        drop(guard);
    }

    // An empty growable pool creates on acquire and reuses the reset object
    // afterwards.
    #[test]
    fn growable_pool_basic() {
        let pool = Pool::new(|| Vec::<u8>::with_capacity(16), Vec::clear);

        assert_eq!(pool.available(), 0);

        // acquire creates new object
        let mut a = pool.acquire();
        a.extend_from_slice(b"hello");

        drop(a);
        assert_eq!(pool.available(), 1);

        // acquire reuses - and it's been cleared
        let b = pool.acquire();
        assert!(b.is_empty()); // reset was called
        assert_eq!(pool.available(), 0);
    }

    // try_acquire never invokes the factory; it only sees returned objects.
    #[test]
    fn growable_pool_try_acquire() {
        let pool = Pool::new(|| 42u32, |_| {});

        // Empty pool, try_acquire returns None
        assert!(pool.try_acquire().is_none());

        // acquire creates
        let a = pool.acquire();
        drop(a);

        // Now try_acquire succeeds
        let b = pool.try_acquire().unwrap();
        assert_eq!(*b, 42);
    }

    // with_capacity pre-populates the idle stack.
    #[test]
    fn growable_pool_with_capacity() {
        let pool = Pool::with_capacity(5, String::new, String::clear);

        assert_eq!(pool.available(), 5);

        let _a = pool.try_acquire().unwrap();
        let _b = pool.try_acquire().unwrap();
        assert_eq!(pool.available(), 3);
    }

    // A guard stays usable after its Pool has been dropped.
    #[test]
    fn growable_pool_outlives_guard() {
        let guard;
        {
            let pool = Pool::new(|| String::from("test"), String::clear);
            guard = pool.acquire();
        }
        // Pool dropped, guard still valid
        assert_eq!(&*guard, "test");
        drop(guard);
    }

    // BoundedPool::new rejects a zero capacity with a fixed message.
    #[test]
    #[should_panic(expected = "capacity must be non-zero")]
    fn bounded_pool_zero_capacity_panics() {
        let _ = BoundedPool::new(0, || (), |()| {});
    }

    // Manual take/put round trip: put resets the value before it is reused.
    #[test]
    fn take_put_basic() {
        let pool = Pool::new(|| Vec::<u8>::with_capacity(16), Vec::clear);

        let mut buf = pool.take();
        buf.extend_from_slice(b"hello");
        assert_eq!(&buf, b"hello");

        pool.put(buf);
        assert_eq!(pool.available(), 1);

        let reused = pool.take();
        assert!(reused.is_empty()); // reset was called
    }

    // try_take returns None on an empty pool and Some after a put.
    #[test]
    fn try_take_empty_returns_none() {
        let pool = Pool::new(|| 0u32, |_| {});

        assert!(pool.try_take().is_none());

        let v = pool.take(); // creates via factory
        pool.put(v);

        assert!(pool.try_take().is_some());
    }

    // The reset hook runs once per put and never on take.
    #[test]
    fn take_put_reset_called() {
        let reset_count = StdRc::new(Cell::new(0));
        let rc = reset_count.clone();

        let pool = Pool::new(
            || 0u32,
            move |_| {
                rc.set(rc.get() + 1);
            },
        );

        let v = pool.take();
        assert_eq!(reset_count.get(), 0);

        pool.put(v);
        assert_eq!(reset_count.get(), 1);

        let v = pool.take();
        pool.put(v);
        assert_eq!(reset_count.get(), 2);
    }

    // try_take hands out a pre-populated object unchanged; put restores the
    // available count.
    #[test]
    fn take_put_with_capacity() {
        let pool = Pool::with_capacity(5, || String::from("init"), String::clear);
        assert_eq!(pool.available(), 5);

        let s = pool.try_take().unwrap();
        assert_eq!(s, "init");
        assert_eq!(pool.available(), 4);

        pool.put(s);
        assert_eq!(pool.available(), 5);
    }

    // Guard-based and manual checkouts share the same idle stack.
    #[test]
    fn mix_raii_and_manual() {
        let pool = Pool::with_capacity(3, Vec::<u8>::new, Vec::clear);

        // Take one manually
        let mut manual = pool.take();
        manual.push(1);

        // Acquire one via RAII
        let mut guard = pool.acquire();
        guard.push(2);

        assert_eq!(pool.available(), 1);

        // Return manual
        pool.put(manual);
        assert_eq!(pool.available(), 2);

        // Drop guard
        drop(guard);
        assert_eq!(pool.available(), 3);
    }
}