Skip to main content

nexus_pool/
local.rs

1//! Single-threaded object pools.
2//!
3//! Two variants:
4//! - [`BoundedPool`]: Fixed capacity, pre-initialized objects
5//! - [`Pool`]: Growable, creates objects on demand via factory
6//!
7//! Both use LIFO ordering for cache locality.
8
9use std::cell::UnsafeCell;
10use std::mem::{ManuallyDrop, MaybeUninit};
11use std::ops::{Deref, DerefMut};
12use std::rc::Rc;
13
14// =============================================================================
15// Inner - shared storage for both pool types
16// =============================================================================
17
/// Storage shared by [`BoundedPool`], [`Pool`] and every [`Pooled`] guard.
///
/// All users hold an `Rc<Inner<T>>` and therefore only ever see `&Inner`, yet
/// they need to mutate the stack and call `FnMut` closures. Each mutable field
/// lives in an [`UnsafeCell`]; soundness rests on two rules enforced here:
///
/// * access is single-threaded (`Rc` makes the owners `!Send + !Sync`), and
/// * a closure is never entered while it is already running (see
///   [`ReentrancyGuard`]), so no two `&mut` to the same closure coexist.
#[repr(C)]
struct Inner<T> {
    /// Stack of available objects (LIFO: last returned is first handed out).
    data: UnsafeCell<Vec<T>>,

    /// Reset function - called on an object before it goes back on the stack.
    #[allow(clippy::type_complexity)]
    reset: UnsafeCell<Box<dyn FnMut(&mut T)>>,

    /// Factory function - only initialized by `new_growable` (i.e. for `Pool`);
    /// left uninitialized by `new_bounded`.
    #[allow(clippy::type_complexity)]
    factory: UnsafeCell<MaybeUninit<Box<dyn FnMut() -> T>>>,

    /// `true` while `reset` is executing.
    reset_running: std::cell::Cell<bool>,

    /// `true` while `factory` is executing.
    factory_running: std::cell::Cell<bool>,
}

/// Scope guard that marks a user closure as "currently running".
///
/// Calling one of the pool's closures materialises a `&mut` to it out of an
/// `UnsafeCell`. If that closure re-enters the pool and reaches itself again
/// (for example a reset closure that drops a guard belonging to the same
/// pool), a second `&mut` to the same closure would be created while the first
/// is still in use. The guard turns that situation into a panic, and clears
/// the flag again when it goes out of scope - including during unwinding.
struct ReentrancyGuard<'a> {
    flag: &'a std::cell::Cell<bool>,
}

impl<'a> ReentrancyGuard<'a> {
    /// Sets `flag` for the lifetime of the returned guard.
    ///
    /// # Panics
    ///
    /// Panics with `message` if `flag` is already set.
    fn enter(flag: &'a std::cell::Cell<bool>, message: &'static str) -> Self {
        assert!(!flag.get(), "{}", message);
        flag.set(true);
        ReentrancyGuard { flag }
    }
}

impl<'a> Drop for ReentrancyGuard<'a> {
    fn drop(&mut self) {
        self.flag.set(false);
    }
}

impl<T> Inner<T> {
    /// Create inner for BoundedPool - factory is NOT initialized.
    ///
    /// `data` becomes the initial available stack; `reset` is boxed and kept
    /// for the lifetime of the storage.
    fn new_bounded<R>(data: Vec<T>, reset: R) -> Self
    where
        R: FnMut(&mut T) + 'static,
    {
        Self {
            data: UnsafeCell::new(data),
            reset: UnsafeCell::new(Box::new(reset)),
            factory: UnsafeCell::new(MaybeUninit::uninit()),
            reset_running: std::cell::Cell::new(false),
            factory_running: std::cell::Cell::new(false),
        }
    }

    /// Create inner for Pool - factory IS initialized.
    ///
    /// Nothing in `Inner` drops the factory again; the owner that called this
    /// constructor is responsible for that.
    fn new_growable<F, R>(data: Vec<T>, factory: F, reset: R) -> Self
    where
        F: FnMut() -> T + 'static,
        R: FnMut(&mut T) + 'static,
    {
        Self {
            data: UnsafeCell::new(data),
            reset: UnsafeCell::new(Box::new(reset)),
            factory: UnsafeCell::new(MaybeUninit::new(Box::new(factory))),
            reset_running: std::cell::Cell::new(false),
            factory_running: std::cell::Cell::new(false),
        }
    }

    /// Try to pop from available stack. Used by both pool types.
    ///
    /// Returns `None` when the stack is empty.
    fn try_pop(&self) -> Option<T> {
        // SAFETY: Single-threaded access (owners are Rc-based, hence
        // !Send + !Sync). The `&mut Vec` exists only for the duration of
        // `pop`, which moves a `T` out without running user code, so it cannot
        // overlap with any other borrow of `data`.
        unsafe { (&mut *self.data.get()).pop() }
    }

    /// Pop or create via factory.
    ///
    /// # Safety
    ///
    /// Caller must ensure factory was initialized (i.e., this storage was built
    /// with `new_growable`) and has not been dropped yet.
    ///
    /// # Panics
    ///
    /// Panics if the factory is already running, i.e. the factory closure
    /// itself tried to create another object through the same pool.
    unsafe fn pop_or_create(&self) -> T {
        if let Some(value) = self.try_pop() {
            return value;
        }

        let _running = ReentrancyGuard::enter(
            &self.factory_running,
            "pool factory closure re-entered; factory must not acquire from its own pool",
        );
        // SAFETY: Caller guarantees the factory slot is initialized. Access is
        // single-threaded, and `_running` guarantees this is the only live
        // `&mut` to the factory while it executes.
        let factory = unsafe { (&mut *self.factory.get()).assume_init_mut() };
        factory()
    }

    /// Run the reset closure on `value`. Does NOT push it; see [`Inner::push`].
    ///
    /// # Panics
    ///
    /// Panics if the reset closure is already running, i.e. resetting one
    /// object caused another object to be returned to the same pool.
    fn return_value(&self, value: &mut T) {
        let _running = ReentrancyGuard::enter(
            &self.reset_running,
            "pool reset closure re-entered; reset must not return objects to its own pool",
        );
        // SAFETY: Single-threaded access, and `_running` guarantees this is
        // the only live `&mut` to the reset closure while it executes.
        let reset = unsafe { &mut *self.reset.get() };
        reset(value);
    }

    /// Push value back to available stack.
    fn push(&self, value: T) {
        // SAFETY: Single-threaded access. The `&mut Vec` lives only for the
        // `push` call, which moves `value` in without running user code.
        unsafe { (&mut *self.data.get()).push(value) }
    }

    /// Number of objects currently sitting on the available stack.
    fn available(&self) -> usize {
        // SAFETY: Single-threaded access; no `&mut` to `data` outlives the
        // method that created it, so a shared borrow here cannot alias one.
        // The reference is taken explicitly rather than through an implicit
        // autoref on the raw-pointer dereference.
        unsafe { (&*self.data.get()).len() }
    }

    /// `true` when the available stack holds no objects.
    fn is_empty(&self) -> bool {
        self.available() == 0
    }
}
113
114// =============================================================================
115// BoundedPool - fixed capacity, pre-initialized
116// =============================================================================
117
/// Fixed-capacity object pool with LIFO reuse.
///
/// All objects are pre-initialized at construction. When all objects are
/// acquired, `try_acquire()` returns `None`.
///
/// Guards returned by `try_acquire()` hold their own strong reference to the
/// shared storage, so a guard stays usable even after the pool that produced
/// it has been dropped.
///
/// # Example
///
/// ```
/// use nexus_pool::local::BoundedPool;
///
/// let pool = BoundedPool::new(
///     100,
///     || Vec::<u8>::with_capacity(1024),
///     |v| v.clear(),
/// );
///
/// let mut buf = pool.try_acquire().unwrap();
/// buf.extend_from_slice(b"hello");
/// // buf auto-returns to pool on drop, clear() is called
/// ```
pub struct BoundedPool<T> {
    /// Shared storage (available stack + reset closure). Every guard clones
    /// this `Rc`, which also makes the pool `!Send` and `!Sync`.
    inner: Rc<Inner<T>>,
}
141
142impl<T> BoundedPool<T> {
143    /// Creates a pool with `capacity` pre-initialized objects.
144    ///
145    /// # Arguments
146    ///
147    /// * `capacity` - Number of objects to pre-allocate
148    /// * `init` - Factory function to create each object
149    /// * `reset` - Called when object returns to pool (e.g., `Vec::clear`)
150    ///
151    /// # Panics
152    ///
153    /// Panics if capacity is zero.
154    pub fn new<I, R>(capacity: usize, mut init: I, reset: R) -> Self
155    where
156        I: FnMut() -> T,
157        R: FnMut(&mut T) + 'static,
158    {
159        assert!(capacity > 0, "capacity must be non-zero");
160
161        let mut data = Vec::with_capacity(capacity);
162        for _ in 0..capacity {
163            data.push(init());
164        }
165
166        Self {
167            inner: Rc::new(Inner::new_bounded(data, reset)),
168        }
169    }
170
171    /// Attempts to acquire an object from the pool.
172    ///
173    /// Returns `None` if all objects are currently in use.
174    #[inline]
175    pub fn try_acquire(&self) -> Option<Pooled<T>> {
176        self.inner.try_pop().map(|value| Pooled {
177            value: ManuallyDrop::new(value),
178            inner: Rc::clone(&self.inner),
179        })
180    }
181
182    /// Returns the number of available objects.
183    #[inline]
184    pub fn available(&self) -> usize {
185        self.inner.available()
186    }
187
188    /// Returns true if there are no more available objects.
189    pub fn is_empty(&self) -> bool {
190        self.inner.is_empty()
191    }
192}
193
194// =============================================================================
195// Pool - growable, creates on demand
196// =============================================================================
197
/// Growable object pool with LIFO reuse.
///
/// Objects are created on demand via the factory function when the pool
/// is empty. Use `try_acquire()` for the fast path that only returns
/// pooled objects, or `acquire()` which may create new objects.
///
/// Besides the RAII guards, objects can be checked out and returned by hand
/// with `take()` / `try_take()` and `put()`.
///
/// # Example
///
/// ```
/// use nexus_pool::local::Pool;
///
/// let pool = Pool::new(
///     || Vec::<u8>::with_capacity(1024),
///     |v| v.clear(),
/// );
///
/// let mut buf = pool.acquire(); // Creates new object
/// buf.extend_from_slice(b"hello");
/// drop(buf); // Returns to pool, clear() is called
///
/// let buf2 = pool.acquire(); // Reuses existing (now empty) object
/// ```
pub struct Pool<T> {
    /// Shared storage (available stack, reset closure, factory closure).
    /// Every guard clones this `Rc`, which also makes the pool `!Send` and
    /// `!Sync`.
    inner: Rc<Inner<T>>,
}
223
224impl<T> Pool<T> {
225    /// Creates an empty pool with the given factory and reset functions.
226    ///
227    /// # Arguments
228    ///
229    /// * `factory` - Creates new objects when pool is empty
230    /// * `reset` - Called when object returns to pool (e.g., `Vec::clear`)
231    pub fn new<F, R>(factory: F, reset: R) -> Self
232    where
233        F: FnMut() -> T + 'static,
234        R: FnMut(&mut T) + 'static,
235    {
236        Self {
237            inner: Rc::new(Inner::new_growable(Vec::new(), factory, reset)),
238        }
239    }
240
241    /// Creates a pool pre-populated with `capacity` objects.
242    pub fn with_capacity<F, R>(capacity: usize, mut factory: F, reset: R) -> Self
243    where
244        F: FnMut() -> T + 'static,
245        R: FnMut(&mut T) + 'static,
246    {
247        let mut data = Vec::with_capacity(capacity);
248        for _ in 0..capacity {
249            data.push(factory());
250        }
251
252        Self {
253            inner: Rc::new(Inner::new_growable(data, factory, reset)),
254        }
255    }
256
257    /// Acquires an object from the pool, creating one if necessary.
258    ///
259    /// This always succeeds but may allocate if the pool is empty.
260    pub fn acquire(&self) -> Pooled<T> {
261        // SAFETY: Pool::new/with_capacity always calls new_growable, which
262        // initializes the factory via MaybeUninit::new. pop_or_create's
263        // precondition (factory initialized) is satisfied.
264        let value = unsafe { self.inner.pop_or_create() };
265        Pooled {
266            value: ManuallyDrop::new(value),
267            inner: Rc::clone(&self.inner),
268        }
269    }
270
271    /// Attempts to acquire an object from the pool without creating.
272    ///
273    /// Returns `None` if the pool is empty. This is the fast path.
274    #[inline]
275    pub fn try_acquire(&self) -> Option<Pooled<T>> {
276        self.inner.try_pop().map(|value| Pooled {
277            value: ManuallyDrop::new(value),
278            inner: Rc::clone(&self.inner),
279        })
280    }
281
282    /// Takes an object from the pool without an RAII guard, creating one
283    /// via the factory if the pool is empty.
284    ///
285    /// The caller is responsible for returning the object via [`put()`](Pool::put).
286    ///
287    /// # Example
288    ///
289    /// ```
290    /// use nexus_pool::local::Pool;
291    ///
292    /// let pool = Pool::new(
293    ///     || Vec::<u8>::with_capacity(1024),
294    ///     |v| v.clear(),
295    /// );
296    ///
297    /// let mut buf = pool.take();
298    /// buf.extend_from_slice(b"hello");
299    /// pool.put(buf); // manual return, reset is called
300    /// ```
301    #[inline]
302    pub fn take(&self) -> T {
303        // SAFETY: Pool::new/with_capacity always calls new_growable, which
304        // initializes the factory via MaybeUninit::new. pop_or_create's
305        // precondition (factory initialized) is satisfied.
306        unsafe { self.inner.pop_or_create() }
307    }
308
309    /// Takes an object from the pool if one is available, without creating.
310    ///
311    /// Returns `None` if the pool is empty. The caller is responsible for
312    /// returning the object via [`put()`](Pool::put).
313    #[inline]
314    pub fn try_take(&self) -> Option<T> {
315        self.inner.try_pop()
316    }
317
318    /// Returns an object to the pool.
319    ///
320    /// Calls the reset function, then pushes the value back onto the
321    /// available stack for reuse.
322    ///
323    /// # Panics
324    ///
325    /// If the reset closure panics, the value is leaked and the pool slot
326    /// is not returned. The panic propagates normally. Reset closures must
327    /// not panic — use simple operations like `Vec::clear()` or field resets.
328    #[inline]
329    pub fn put(&self, mut value: T) {
330        self.inner.return_value(&mut value);
331        self.inner.push(value);
332    }
333
334    /// Returns the number of available objects.
335    #[inline]
336    pub fn available(&self) -> usize {
337        self.inner.available()
338    }
339}
340
341impl<T> Drop for Pool<T> {
342    fn drop(&mut self) {
343        // SAFETY: Pool::new/with_capacity always calls new_growable, which
344        // initializes the factory via MaybeUninit::new. We must drop it here
345        // before the strong-count reaches zero, because Inner's Drop doesn't
346        // know whether factory was initialized (BoundedPool leaves it uninit).
347        //
348        // Factory-ownership invariant: the factory is reachable only through
349        // `Pool::acquire` / `Pool::take`, both of which take `&self` on
350        // `Pool`. Once `Pool` drops, no code path can touch the factory
351        // again — even if `Pooled` guards keep `Inner` alive past this
352        // point via the strong Rc, the factory has been destroyed and is
353        // unreachable. Inner has no `Drop` of its own, so the (now-empty)
354        // `MaybeUninit` slot dies harmlessly when the last guard drops.
355        unsafe {
356            let factory = &mut *self.inner.factory.get();
357            factory.assume_init_drop();
358        }
359    }
360}
361
362// =============================================================================
363// Pooled - RAII guard
364// =============================================================================
365
/// RAII guard that returns the object to the pool on drop.
///
/// The object is always returned to the pool when the guard is dropped.
/// There is no way to "take" the object out permanently.
///
/// The guard holds a strong reference to the pool's shared storage, so it
/// remains valid even if the pool it came from is dropped first.
#[must_use = "dropping the guard immediately returns the object to the pool"]
pub struct Pooled<T> {
    /// The checked-out object. Wrapped in `ManuallyDrop` because the guard's
    /// `Drop` impl moves it back into the pool instead of destroying it.
    value: ManuallyDrop<T>,
    /// Storage the object is returned to; keeps that storage alive.
    inner: Rc<Inner<T>>,
}
375
376impl<T> Deref for Pooled<T> {
377    type Target = T;
378
379    #[inline]
380    fn deref(&self) -> &T {
381        &self.value
382    }
383}
384
385impl<T> DerefMut for Pooled<T> {
386    #[inline]
387    fn deref_mut(&mut self) -> &mut T {
388        &mut self.value
389    }
390}
391
392impl<T> Drop for Pooled<T> {
393    fn drop(&mut self) {
394        // Reset and return to pool. The strong Rc guarantees Inner is
395        // alive — no upgrade check needed. If `Pool` already dropped,
396        // Inner is now an "orphan": values keep flowing into its Vec
397        // and stay there until the last `Pooled` drops, at which point
398        // the strong-count hits zero and the Vec drops in one shot.
399        self.inner.return_value(&mut self.value);
400        // SAFETY: value is valid (ManuallyDrop preserves it until
401        // explicit take/drop). After take we never touch self.value
402        // again; Inner takes ownership via push.
403        let value = unsafe { ManuallyDrop::take(&mut self.value) };
404        self.inner.push(value);
405        // self.inner Rc drops here: one strong-count--. If we were the
406        // last guard, Inner's `data: UnsafeCell<Vec<T>>` drops, which
407        // drops every value still in the pool.
408    }
409}
410
411// =============================================================================
412// Tests
413// =============================================================================
414
#[cfg(test)]
mod tests {
    use super::*;
    use std::cell::Cell;
    use std::rc::Rc as StdRc;

    /// Acquire/exhaust/return cycle on a bounded pool, including reset.
    #[test]
    fn bounded_pool_basic() {
        let pool = BoundedPool::new(3, || Vec::<u8>::with_capacity(16), Vec::clear);

        assert_eq!(pool.available(), 3);

        let mut a = pool.try_acquire().unwrap();
        assert_eq!(pool.available(), 2);

        a.extend_from_slice(b"hello");
        assert_eq!(&*a, b"hello");

        let _b = pool.try_acquire().unwrap();
        let _c = pool.try_acquire().unwrap();

        assert_eq!(pool.available(), 0);

        // Pool exhausted
        assert!(pool.try_acquire().is_none());

        drop(a);
        assert_eq!(pool.available(), 1);

        // Can acquire again - and it's been cleared
        let d = pool.try_acquire().unwrap();
        assert!(d.is_empty()); // reset was called
    }

    /// Reset runs exactly once per returned guard, never on acquire.
    #[test]
    fn bounded_pool_reset_called() {
        let reset_count = StdRc::new(Cell::new(0));
        let reset_count_clone = reset_count.clone();

        let pool = BoundedPool::new(
            2,
            || 0u32,
            move |_| {
                reset_count_clone.set(reset_count_clone.get() + 1);
            },
        );

        let a = pool.try_acquire().unwrap();
        assert_eq!(reset_count.get(), 0);

        drop(a);
        assert_eq!(reset_count.get(), 1);

        let b = pool.try_acquire().unwrap();
        let c = pool.try_acquire().unwrap();
        drop(b);
        drop(c);
        assert_eq!(reset_count.get(), 3);
    }

    /// `is_empty` tracks whether anything is left to acquire.
    #[test]
    fn bounded_pool_is_empty() {
        let pool = BoundedPool::new(1, || 0u8, |_| {});
        assert!(!pool.is_empty());

        let guard = pool.try_acquire().unwrap();
        assert!(pool.is_empty());

        drop(guard);
        assert!(!pool.is_empty());
    }

    #[test]
    fn bounded_pool_outlives_guard() {
        let guard;
        {
            let pool = BoundedPool::new(1, || String::from("test"), String::clear);
            guard = pool.try_acquire().unwrap();
        }
        // Pool dropped, guard still valid (Inner survives via the
        // strong Rc the guard holds).
        assert_eq!(&*guard, "test");
        // Drop guard — value returns to the orphaned Inner, then Inner
        // dies (last strong ref) and drops the value. No leak, no UAF.
        drop(guard);
    }

    /// Growable pool creates on demand and reuses returned objects.
    #[test]
    fn growable_pool_basic() {
        let pool = Pool::new(|| Vec::<u8>::with_capacity(16), Vec::clear);

        assert_eq!(pool.available(), 0);

        // acquire creates new object
        let mut a = pool.acquire();
        a.extend_from_slice(b"hello");

        drop(a);
        assert_eq!(pool.available(), 1);

        // acquire reuses - and it's been cleared
        let b = pool.acquire();
        assert!(b.is_empty()); // reset was called
        assert_eq!(pool.available(), 0);
    }

    #[test]
    fn growable_pool_try_acquire() {
        let pool = Pool::new(|| 42u32, |_| {});

        // Empty pool, try_acquire returns None
        assert!(pool.try_acquire().is_none());

        // acquire creates
        let a = pool.acquire();
        drop(a);

        // Now try_acquire succeeds
        let b = pool.try_acquire().unwrap();
        assert_eq!(*b, 42);
    }

    #[test]
    fn growable_pool_with_capacity() {
        let pool = Pool::with_capacity(5, String::new, String::clear);

        assert_eq!(pool.available(), 5);

        let _a = pool.try_acquire().unwrap();
        let _b = pool.try_acquire().unwrap();
        assert_eq!(pool.available(), 3);
    }

    #[test]
    fn growable_pool_outlives_guard() {
        let guard;
        {
            let pool = Pool::new(|| String::from("test"), String::clear);
            guard = pool.acquire();
        }
        // Pool dropped, guard still valid (Inner alive via guard's Rc).
        assert_eq!(&*guard, "test");
        // Guard drops; value returns to orphan, last strong ref dies,
        // Inner's Vec drops the value. No leak, no UAF.
        drop(guard);
    }

    #[test]
    #[should_panic(expected = "capacity must be non-zero")]
    fn bounded_pool_zero_capacity_panics() {
        let _ = BoundedPool::new(0, || (), |()| {});
    }

    /// Manual checkout/return round trip, including reset on `put`.
    #[test]
    fn take_put_basic() {
        let pool = Pool::new(|| Vec::<u8>::with_capacity(16), Vec::clear);

        let mut buf = pool.take();
        buf.extend_from_slice(b"hello");
        assert_eq!(&buf, b"hello");

        pool.put(buf);
        assert_eq!(pool.available(), 1);

        let reused = pool.take();
        assert!(reused.is_empty()); // reset was called
    }

    #[test]
    fn try_take_empty_returns_none() {
        let pool = Pool::new(|| 0u32, |_| {});

        assert!(pool.try_take().is_none());

        let v = pool.take(); // creates via factory
        pool.put(v);

        assert!(pool.try_take().is_some());
    }

    #[test]
    fn take_put_reset_called() {
        let reset_count = StdRc::new(Cell::new(0));
        let rc = reset_count.clone();

        let pool = Pool::new(
            || 0u32,
            move |_| {
                rc.set(rc.get() + 1);
            },
        );

        let v = pool.take();
        assert_eq!(reset_count.get(), 0);

        pool.put(v);
        assert_eq!(reset_count.get(), 1);

        let v = pool.take();
        pool.put(v);
        assert_eq!(reset_count.get(), 2);
    }

    #[test]
    fn take_put_with_capacity() {
        let pool = Pool::with_capacity(5, || String::from("init"), String::clear);
        assert_eq!(pool.available(), 5);

        let s = pool.try_take().unwrap();
        assert_eq!(s, "init");
        assert_eq!(pool.available(), 4);

        pool.put(s);
        assert_eq!(pool.available(), 5);
    }

    /// The most recently returned object is the first one handed out again;
    /// the factory is only consulted once the stack is empty.
    #[test]
    fn reuse_is_lifo() {
        let pool = Pool::new(|| 0u32, |_| {});

        pool.put(1);
        pool.put(2);
        assert_eq!(pool.available(), 2);

        assert_eq!(pool.take(), 2);
        assert_eq!(pool.take(), 1);
        assert_eq!(pool.take(), 0); // stack empty, created by factory
    }

    #[test]
    fn mix_raii_and_manual() {
        let pool = Pool::with_capacity(3, Vec::<u8>::new, Vec::clear);

        // Take one manually
        let mut manual = pool.take();
        manual.push(1);

        // Acquire one via RAII
        let mut guard = pool.acquire();
        guard.push(2);

        assert_eq!(pool.available(), 1);

        // Return manual
        pool.put(manual);
        assert_eq!(pool.available(), 2);

        // Drop guard
        drop(guard);
        assert_eq!(pool.available(), 3);
    }
}