nexus_pool/local.rs
1//! Single-threaded object pools.
2//!
3//! Two variants:
4//! - [`BoundedPool`]: Fixed capacity, pre-initialized objects
5//! - [`Pool`]: Growable, creates objects on demand via factory
6//!
7//! Both use LIFO ordering for cache locality.
8
9use std::cell::UnsafeCell;
10use std::mem::{ManuallyDrop, MaybeUninit};
11use std::ops::{Deref, DerefMut};
12use std::rc::Rc;
13
14// =============================================================================
15// Inner - shared storage for both pool types
16// =============================================================================
17
/// State shared between a pool handle and every guard it has handed out.
///
/// Each field lives in an `UnsafeCell` because the pool types mutate it
/// through `&self`. The owning handles hold it in an `Rc`, so it never
/// crosses a thread boundary.
#[repr(C)]
struct Inner<T> {
    /// Objects that are currently available. The last element is the one
    /// handed out next (LIFO).
    data: UnsafeCell<Vec<T>>,

    /// Closure applied to an object before it is put back into `data`.
    #[allow(clippy::type_complexity)]
    reset: UnsafeCell<Box<dyn FnMut(&mut T)>>,

    /// Closure that builds a fresh object. Only `new_growable` writes this
    /// slot; `new_bounded` leaves it uninitialized, so it must never be read
    /// or dropped for a bounded pool.
    #[allow(clippy::type_complexity)]
    factory: UnsafeCell<MaybeUninit<Box<dyn FnMut() -> T>>>,
}
31
32impl<T> Inner<T> {
33 /// Create inner for BoundedPool - factory is NOT initialized
34 fn new_bounded<R>(data: Vec<T>, reset: R) -> Self
35 where
36 R: FnMut(&mut T) + 'static,
37 {
38 Self {
39 data: UnsafeCell::new(data),
40 reset: UnsafeCell::new(Box::new(reset)),
41 factory: UnsafeCell::new(MaybeUninit::uninit()),
42 }
43 }
44
45 /// Create inner for Pool - factory IS initialized
46 fn new_growable<F, R>(data: Vec<T>, factory: F, reset: R) -> Self
47 where
48 F: FnMut() -> T + 'static,
49 R: FnMut(&mut T) + 'static,
50 {
51 Self {
52 data: UnsafeCell::new(data),
53 reset: UnsafeCell::new(Box::new(reset)),
54 factory: UnsafeCell::new(MaybeUninit::new(Box::new(factory))),
55 }
56 }
57
58 /// Try to pop from available stack. Used by both pool types.
59 fn try_pop(&self) -> Option<T> {
60 // SAFETY: Single-threaded access enforced by !Sync on BoundedPool/Pool
61 // (both use Rc<Inner<T>> which is !Send + !Sync). No concurrent mutation.
62 let data = unsafe { &mut *self.data.get() };
63 data.pop()
64 }
65
66 /// Pop or create via factory.
67 ///
68 /// # Safety
69 ///
70 /// Caller must ensure factory was initialized (i.e., this is Pool, not BoundedPool)
71 #[allow(clippy::option_if_let_else)]
72 unsafe fn pop_or_create(&self) -> T {
73 // SAFETY: Single-threaded access enforced by !Sync on Pool (Rc-based).
74 // Caller guarantees factory is initialized (only called from Pool, not BoundedPool).
75 // assume_init_mut is sound because new_growable writes MaybeUninit::new(factory).
76 unsafe {
77 let data = &mut *self.data.get();
78 if let Some(value) = data.pop() {
79 value
80 } else {
81 let factory = &mut *self.factory.get();
82 (factory.assume_init_mut())()
83 }
84 }
85 }
86
87 /// Reset and return value to available stack
88 fn return_value(&self, value: &mut T) {
89 // SAFETY: Single-threaded access enforced by !Sync on BoundedPool/Pool (Rc-based).
90 // No concurrent mutation of the reset closure.
91 let reset = unsafe { &mut *self.reset.get() };
92 reset(value);
93 }
94
95 /// Push value back to available stack
96 fn push(&self, value: T) {
97 // SAFETY: Single-threaded access enforced by !Sync on BoundedPool/Pool (Rc-based).
98 // No concurrent mutation of the data vec.
99 let data = unsafe { &mut *self.data.get() };
100 data.push(value);
101 }
102
103 fn available(&self) -> usize {
104 // SAFETY: Single-threaded access enforced by !Sync (Rc-based). Reading len
105 // while no concurrent mutation is possible.
106 unsafe { (*self.data.get()).len() }
107 }
108
109 fn is_empty(&self) -> bool {
110 self.available() == 0
111 }
112}
113
114// =============================================================================
115// BoundedPool - fixed capacity, pre-initialized
116// =============================================================================
117
118/// Fixed-capacity object pool with LIFO reuse.
119///
120/// All objects are pre-initialized at construction. When all objects are
121/// acquired, `try_acquire()` returns `None`.
122///
123/// # Example
124///
125/// ```
126/// use nexus_pool::local::BoundedPool;
127///
128/// let pool = BoundedPool::new(
129/// 100,
130/// || Vec::<u8>::with_capacity(1024),
131/// |v| v.clear(),
132/// );
133///
134/// let mut buf = pool.try_acquire().unwrap();
135/// buf.extend_from_slice(b"hello");
136/// // buf auto-returns to pool on drop, clear() is called
137/// ```
138pub struct BoundedPool<T> {
139 inner: Rc<Inner<T>>,
140}
141
142impl<T> BoundedPool<T> {
143 /// Creates a pool with `capacity` pre-initialized objects.
144 ///
145 /// # Arguments
146 ///
147 /// * `capacity` - Number of objects to pre-allocate
148 /// * `init` - Factory function to create each object
149 /// * `reset` - Called when object returns to pool (e.g., `Vec::clear`)
150 ///
151 /// # Panics
152 ///
153 /// Panics if capacity is zero.
154 pub fn new<I, R>(capacity: usize, mut init: I, reset: R) -> Self
155 where
156 I: FnMut() -> T,
157 R: FnMut(&mut T) + 'static,
158 {
159 assert!(capacity > 0, "capacity must be non-zero");
160
161 let mut data = Vec::with_capacity(capacity);
162 for _ in 0..capacity {
163 data.push(init());
164 }
165
166 Self {
167 inner: Rc::new(Inner::new_bounded(data, reset)),
168 }
169 }
170
171 /// Attempts to acquire an object from the pool.
172 ///
173 /// Returns `None` if all objects are currently in use.
174 #[inline]
175 pub fn try_acquire(&self) -> Option<Pooled<T>> {
176 self.inner.try_pop().map(|value| Pooled {
177 value: ManuallyDrop::new(value),
178 inner: Rc::clone(&self.inner),
179 })
180 }
181
182 /// Returns the number of available objects.
183 #[inline]
184 pub fn available(&self) -> usize {
185 self.inner.available()
186 }
187
188 /// Returns true if there are no more available objects.
189 pub fn is_empty(&self) -> bool {
190 self.inner.is_empty()
191 }
192}
193
194// =============================================================================
195// Pool - growable, creates on demand
196// =============================================================================
197
198/// Growable object pool with LIFO reuse.
199///
200/// Objects are created on demand via the factory function when the pool
201/// is empty. Use `try_acquire()` for the fast path that only returns
202/// pooled objects, or `acquire()` which may create new objects.
203///
204/// # Example
205///
206/// ```
207/// use nexus_pool::local::Pool;
208///
209/// let pool = Pool::new(
210/// || Vec::<u8>::with_capacity(1024),
211/// |v| v.clear(),
212/// );
213///
214/// let mut buf = pool.acquire(); // Creates new object
215/// buf.extend_from_slice(b"hello");
216/// drop(buf); // Returns to pool, clear() is called
217///
218/// let buf2 = pool.acquire(); // Reuses existing (now empty) object
219/// ```
220pub struct Pool<T> {
221 inner: Rc<Inner<T>>,
222}
223
224impl<T> Pool<T> {
225 /// Creates an empty pool with the given factory and reset functions.
226 ///
227 /// # Arguments
228 ///
229 /// * `factory` - Creates new objects when pool is empty
230 /// * `reset` - Called when object returns to pool (e.g., `Vec::clear`)
231 pub fn new<F, R>(factory: F, reset: R) -> Self
232 where
233 F: FnMut() -> T + 'static,
234 R: FnMut(&mut T) + 'static,
235 {
236 Self {
237 inner: Rc::new(Inner::new_growable(Vec::new(), factory, reset)),
238 }
239 }
240
241 /// Creates a pool pre-populated with `capacity` objects.
242 pub fn with_capacity<F, R>(capacity: usize, mut factory: F, reset: R) -> Self
243 where
244 F: FnMut() -> T + 'static,
245 R: FnMut(&mut T) + 'static,
246 {
247 let mut data = Vec::with_capacity(capacity);
248 for _ in 0..capacity {
249 data.push(factory());
250 }
251
252 Self {
253 inner: Rc::new(Inner::new_growable(data, factory, reset)),
254 }
255 }
256
257 /// Acquires an object from the pool, creating one if necessary.
258 ///
259 /// This always succeeds but may allocate if the pool is empty.
260 pub fn acquire(&self) -> Pooled<T> {
261 // SAFETY: Pool::new/with_capacity always calls new_growable, which
262 // initializes the factory via MaybeUninit::new. pop_or_create's
263 // precondition (factory initialized) is satisfied.
264 let value = unsafe { self.inner.pop_or_create() };
265 Pooled {
266 value: ManuallyDrop::new(value),
267 inner: Rc::clone(&self.inner),
268 }
269 }
270
271 /// Attempts to acquire an object from the pool without creating.
272 ///
273 /// Returns `None` if the pool is empty. This is the fast path.
274 #[inline]
275 pub fn try_acquire(&self) -> Option<Pooled<T>> {
276 self.inner.try_pop().map(|value| Pooled {
277 value: ManuallyDrop::new(value),
278 inner: Rc::clone(&self.inner),
279 })
280 }
281
282 /// Takes an object from the pool without an RAII guard, creating one
283 /// via the factory if the pool is empty.
284 ///
285 /// The caller is responsible for returning the object via [`put()`](Pool::put).
286 ///
287 /// # Example
288 ///
289 /// ```
290 /// use nexus_pool::local::Pool;
291 ///
292 /// let pool = Pool::new(
293 /// || Vec::<u8>::with_capacity(1024),
294 /// |v| v.clear(),
295 /// );
296 ///
297 /// let mut buf = pool.take();
298 /// buf.extend_from_slice(b"hello");
299 /// pool.put(buf); // manual return, reset is called
300 /// ```
301 #[inline]
302 pub fn take(&self) -> T {
303 // SAFETY: Pool::new/with_capacity always calls new_growable, which
304 // initializes the factory via MaybeUninit::new. pop_or_create's
305 // precondition (factory initialized) is satisfied.
306 unsafe { self.inner.pop_or_create() }
307 }
308
309 /// Takes an object from the pool if one is available, without creating.
310 ///
311 /// Returns `None` if the pool is empty. The caller is responsible for
312 /// returning the object via [`put()`](Pool::put).
313 #[inline]
314 pub fn try_take(&self) -> Option<T> {
315 self.inner.try_pop()
316 }
317
318 /// Returns an object to the pool.
319 ///
320 /// Calls the reset function, then pushes the value back onto the
321 /// available stack for reuse.
322 ///
323 /// # Panics
324 ///
325 /// If the reset closure panics, the value is leaked and the pool slot
326 /// is not returned. The panic propagates normally. Reset closures must
327 /// not panic — use simple operations like `Vec::clear()` or field resets.
328 #[inline]
329 pub fn put(&self, mut value: T) {
330 self.inner.return_value(&mut value);
331 self.inner.push(value);
332 }
333
334 /// Returns the number of available objects.
335 #[inline]
336 pub fn available(&self) -> usize {
337 self.inner.available()
338 }
339}
340
341impl<T> Drop for Pool<T> {
342 fn drop(&mut self) {
343 // SAFETY: Pool::new/with_capacity always calls new_growable, which
344 // initializes the factory via MaybeUninit::new. We must drop it here
345 // before the strong-count reaches zero, because Inner's Drop doesn't
346 // know whether factory was initialized (BoundedPool leaves it uninit).
347 //
348 // Factory-ownership invariant: the factory is reachable only through
349 // `Pool::acquire` / `Pool::take`, both of which take `&self` on
350 // `Pool`. Once `Pool` drops, no code path can touch the factory
351 // again — even if `Pooled` guards keep `Inner` alive past this
352 // point via the strong Rc, the factory has been destroyed and is
353 // unreachable. Inner has no `Drop` of its own, so the (now-empty)
354 // `MaybeUninit` slot dies harmlessly when the last guard drops.
355 unsafe {
356 let factory = &mut *self.inner.factory.get();
357 factory.assume_init_drop();
358 }
359 }
360}
361
362// =============================================================================
363// Pooled - RAII guard
364// =============================================================================
365
366/// RAII guard that returns the object to the pool on drop.
367///
368/// The object is always returned to the pool when the guard is dropped.
369/// There is no way to "take" the object out permanently.
370#[must_use = "dropping the guard immediately returns the object to the pool"]
371pub struct Pooled<T> {
372 value: ManuallyDrop<T>,
373 inner: Rc<Inner<T>>,
374}
375
376impl<T> Deref for Pooled<T> {
377 type Target = T;
378
379 #[inline]
380 fn deref(&self) -> &T {
381 &self.value
382 }
383}
384
385impl<T> DerefMut for Pooled<T> {
386 #[inline]
387 fn deref_mut(&mut self) -> &mut T {
388 &mut self.value
389 }
390}
391
392impl<T> Drop for Pooled<T> {
393 fn drop(&mut self) {
394 // Reset and return to pool. The strong Rc guarantees Inner is
395 // alive — no upgrade check needed. If `Pool` already dropped,
396 // Inner is now an "orphan": values keep flowing into its Vec
397 // and stay there until the last `Pooled` drops, at which point
398 // the strong-count hits zero and the Vec drops in one shot.
399 self.inner.return_value(&mut self.value);
400 // SAFETY: value is valid (ManuallyDrop preserves it until
401 // explicit take/drop). After take we never touch self.value
402 // again; Inner takes ownership via push.
403 let value = unsafe { ManuallyDrop::take(&mut self.value) };
404 self.inner.push(value);
405 // self.inner Rc drops here: one strong-count--. If we were the
406 // last guard, Inner's `data: UnsafeCell<Vec<T>>` drops, which
407 // drops every value still in the pool.
408 }
409}
410
411// =============================================================================
412// Tests
413// =============================================================================
414
#[cfg(test)]
mod tests {
    use super::*;
    use std::cell::Cell;
    use std::rc::Rc as StdRc;

    /// Drains a three-slot bounded pool, then checks that a returned
    /// buffer comes back cleared.
    #[test]
    fn bounded_pool_basic() {
        let pool = BoundedPool::new(3, || Vec::<u8>::with_capacity(16), Vec::clear);
        assert_eq!(pool.available(), 3);

        let mut first = pool.try_acquire().unwrap();
        assert_eq!(pool.available(), 2);

        first.extend_from_slice(b"hello");
        assert_eq!(&*first, b"hello");

        let _second = pool.try_acquire().unwrap();
        let _third = pool.try_acquire().unwrap();
        assert_eq!(pool.available(), 0);

        // Nothing left to hand out.
        assert!(pool.try_acquire().is_none());

        drop(first);
        assert_eq!(pool.available(), 1);

        // The returned buffer is available again and went through reset.
        let recycled = pool.try_acquire().unwrap();
        assert!(recycled.is_empty());
    }

    /// Reset runs once per guard drop and never on acquire.
    #[test]
    fn bounded_pool_reset_called() {
        let resets = StdRc::new(Cell::new(0));
        let resets_in_closure = resets.clone();

        let pool = BoundedPool::new(
            2,
            || 0u32,
            move |_| {
                resets_in_closure.set(resets_in_closure.get() + 1);
            },
        );

        let first = pool.try_acquire().unwrap();
        assert_eq!(resets.get(), 0);

        drop(first);
        assert_eq!(resets.get(), 1);

        let second = pool.try_acquire().unwrap();
        let third = pool.try_acquire().unwrap();
        drop(second);
        drop(third);
        assert_eq!(resets.get(), 3);
    }

    /// A guard stays usable after its bounded pool has been dropped.
    #[test]
    fn bounded_pool_outlives_guard() {
        let guard;
        {
            let pool = BoundedPool::new(1, || String::from("test"), String::clear);
            guard = pool.try_acquire().unwrap();
        }
        // The pool handle is gone, but the guard's strong Rc keeps Inner
        // (and therefore the value) alive.
        assert_eq!(&*guard, "test");
        // Dropping the guard pushes the value into the orphaned Inner,
        // which is then released together with the value.
        drop(guard);
    }

    /// An empty growable pool creates on acquire and reuses afterwards.
    #[test]
    fn growable_pool_basic() {
        let pool = Pool::new(|| Vec::<u8>::with_capacity(16), Vec::clear);
        assert_eq!(pool.available(), 0);

        // Pool is empty, so this goes through the factory.
        let mut first = pool.acquire();
        first.extend_from_slice(b"hello");

        drop(first);
        assert_eq!(pool.available(), 1);

        // Same buffer comes back, already cleared by reset.
        let reused = pool.acquire();
        assert!(reused.is_empty());
        assert_eq!(pool.available(), 0);
    }

    /// `try_acquire` never creates; it only succeeds once something was returned.
    #[test]
    fn growable_pool_try_acquire() {
        let pool = Pool::new(|| 42u32, |_| {});

        // Nothing pooled yet.
        assert!(pool.try_acquire().is_none());

        // Create one via the factory and hand it straight back.
        let created = pool.acquire();
        drop(created);

        // Now there is something to take.
        let pooled = pool.try_acquire().unwrap();
        assert_eq!(*pooled, 42);
    }

    /// `with_capacity` pre-populates the pool.
    #[test]
    fn growable_pool_with_capacity() {
        let pool = Pool::with_capacity(5, String::new, String::clear);
        assert_eq!(pool.available(), 5);

        let _first = pool.try_acquire().unwrap();
        let _second = pool.try_acquire().unwrap();
        assert_eq!(pool.available(), 3);
    }

    /// A guard stays usable after its growable pool has been dropped.
    #[test]
    fn growable_pool_outlives_guard() {
        let guard;
        {
            let pool = Pool::new(|| String::from("test"), String::clear);
            guard = pool.acquire();
        }
        // The pool handle is gone; the guard's Rc keeps Inner alive.
        assert_eq!(&*guard, "test");
        // Value goes back into the orphaned Inner, which is then released
        // along with it.
        drop(guard);
    }

    /// A zero-capacity bounded pool is rejected at construction.
    #[test]
    #[should_panic(expected = "capacity must be non-zero")]
    fn bounded_pool_zero_capacity_panics() {
        let _ = BoundedPool::new(0, || (), |()| {});
    }

    /// Manual take/put round trip resets the value.
    #[test]
    fn take_put_basic() {
        let pool = Pool::new(|| Vec::<u8>::with_capacity(16), Vec::clear);

        let mut scratch = pool.take();
        scratch.extend_from_slice(b"hello");
        assert_eq!(&scratch, b"hello");

        pool.put(scratch);
        assert_eq!(pool.available(), 1);

        let reused = pool.take();
        assert!(reused.is_empty()); // reset ran inside put()
    }

    /// `try_take` mirrors `try_acquire`: no creation on an empty pool.
    #[test]
    fn try_take_empty_returns_none() {
        let pool = Pool::new(|| 0u32, |_| {});

        assert!(pool.try_take().is_none());

        let created = pool.take(); // built by the factory
        pool.put(created);

        assert!(pool.try_take().is_some());
    }

    /// Reset runs once per `put` and never on `take`.
    #[test]
    fn take_put_reset_called() {
        let resets = StdRc::new(Cell::new(0));
        let resets_in_closure = resets.clone();

        let pool = Pool::new(
            || 0u32,
            move |_| {
                resets_in_closure.set(resets_in_closure.get() + 1);
            },
        );

        let value = pool.take();
        assert_eq!(resets.get(), 0);

        pool.put(value);
        assert_eq!(resets.get(), 1);

        let value = pool.take();
        pool.put(value);
        assert_eq!(resets.get(), 2);
    }

    /// Pre-populated objects can be taken untouched and put back.
    #[test]
    fn take_put_with_capacity() {
        let pool = Pool::with_capacity(5, || String::from("init"), String::clear);
        assert_eq!(pool.available(), 5);

        let taken = pool.try_take().unwrap();
        assert_eq!(taken, "init");
        assert_eq!(pool.available(), 4);

        pool.put(taken);
        assert_eq!(pool.available(), 5);
    }

    /// Guard-based and manual checkouts share the same stack.
    #[test]
    fn mix_raii_and_manual() {
        let pool = Pool::with_capacity(3, Vec::<u8>::new, Vec::clear);

        // One object leaves without a guard...
        let mut manual = pool.take();
        manual.push(1);

        // ...and one leaves behind a guard.
        let mut guard = pool.acquire();
        guard.push(2);

        assert_eq!(pool.available(), 1);

        // Manual return.
        pool.put(manual);
        assert_eq!(pool.available(), 2);

        // Guard return.
        drop(guard);
        assert_eq!(pool.available(), 3);
    }
}