nexus_pool/local.rs
1//! Single-threaded object pools.
2//!
3//! Two variants:
4//! - [`BoundedPool`]: Fixed capacity, pre-initialized objects
5//! - [`Pool`]: Growable, creates objects on demand via factory
6//!
7//! Both use LIFO ordering for cache locality.
8
9use std::cell::UnsafeCell;
10use std::mem::{ManuallyDrop, MaybeUninit};
11use std::ops::{Deref, DerefMut};
12use std::rc::{Rc, Weak};
13
14// =============================================================================
15// Inner - shared storage for both pool types
16// =============================================================================
17
18#[repr(C)]
19struct Inner<T> {
20 /// Stack of available objects (LIFO)
21 data: UnsafeCell<Vec<T>>,
22
23 /// Reset function - called when object returns to pool
24 #[allow(clippy::type_complexity)]
25 reset: UnsafeCell<Box<dyn FnMut(&mut T)>>,
26
27 /// Factory function - only initialized for Pool, not BoundedPool
28 #[allow(clippy::type_complexity)]
29 factory: UnsafeCell<MaybeUninit<Box<dyn FnMut() -> T>>>,
30}
31
32impl<T> Inner<T> {
33 /// Create inner for BoundedPool - factory is NOT initialized
34 fn new_bounded<R>(data: Vec<T>, reset: R) -> Self
35 where
36 R: FnMut(&mut T) + 'static,
37 {
38 Self {
39 data: UnsafeCell::new(data),
40 reset: UnsafeCell::new(Box::new(reset)),
41 factory: UnsafeCell::new(MaybeUninit::uninit()),
42 }
43 }
44
45 /// Create inner for Pool - factory IS initialized
46 fn new_growable<F, R>(data: Vec<T>, factory: F, reset: R) -> Self
47 where
48 F: FnMut() -> T + 'static,
49 R: FnMut(&mut T) + 'static,
50 {
51 Self {
52 data: UnsafeCell::new(data),
53 reset: UnsafeCell::new(Box::new(reset)),
54 factory: UnsafeCell::new(MaybeUninit::new(Box::new(factory))),
55 }
56 }
57
58 /// Try to pop from available stack. Used by both pool types.
59 fn try_pop(&self) -> Option<T> {
60 // SAFETY: Single-threaded access enforced by !Sync on BoundedPool/Pool
61 // (both use Rc<Inner<T>> which is !Send + !Sync). No concurrent mutation.
62 let data = unsafe { &mut *self.data.get() };
63 data.pop()
64 }
65
66 /// Pop or create via factory.
67 ///
68 /// # Safety
69 ///
70 /// Caller must ensure factory was initialized (i.e., this is Pool, not BoundedPool)
71 #[allow(clippy::option_if_let_else)]
72 unsafe fn pop_or_create(&self) -> T {
73 // SAFETY: Single-threaded access enforced by !Sync on Pool (Rc-based).
74 // Caller guarantees factory is initialized (only called from Pool, not BoundedPool).
75 // assume_init_mut is sound because new_growable writes MaybeUninit::new(factory).
76 unsafe {
77 let data = &mut *self.data.get();
78 if let Some(value) = data.pop() {
79 value
80 } else {
81 let factory = &mut *self.factory.get();
82 (factory.assume_init_mut())()
83 }
84 }
85 }
86
87 /// Reset and return value to available stack
88 fn return_value(&self, value: &mut T) {
89 // SAFETY: Single-threaded access enforced by !Sync on BoundedPool/Pool (Rc-based).
90 // No concurrent mutation of the reset closure.
91 let reset = unsafe { &mut *self.reset.get() };
92 reset(value);
93 }
94
95 /// Push value back to available stack
96 fn push(&self, value: T) {
97 // SAFETY: Single-threaded access enforced by !Sync on BoundedPool/Pool (Rc-based).
98 // No concurrent mutation of the data vec.
99 let data = unsafe { &mut *self.data.get() };
100 data.push(value);
101 }
102
103 fn available(&self) -> usize {
104 // SAFETY: Single-threaded access enforced by !Sync (Rc-based). Reading len
105 // while no concurrent mutation is possible.
106 unsafe { (*self.data.get()).len() }
107 }
108
109 fn is_empty(&self) -> bool {
110 self.available() == 0
111 }
112}
113
114// =============================================================================
115// BoundedPool - fixed capacity, pre-initialized
116// =============================================================================
117
118/// Fixed-capacity object pool with LIFO reuse.
119///
120/// All objects are pre-initialized at construction. When all objects are
121/// acquired, `try_acquire()` returns `None`.
122///
123/// # Example
124///
125/// ```
126/// use nexus_pool::local::BoundedPool;
127///
128/// let pool = BoundedPool::new(
129/// 100,
130/// || Vec::<u8>::with_capacity(1024),
131/// |v| v.clear(),
132/// );
133///
134/// let mut buf = pool.try_acquire().unwrap();
135/// buf.extend_from_slice(b"hello");
136/// // buf auto-returns to pool on drop, clear() is called
137/// ```
138pub struct BoundedPool<T> {
139 inner: Rc<Inner<T>>,
140}
141
142impl<T> BoundedPool<T> {
143 /// Creates a pool with `capacity` pre-initialized objects.
144 ///
145 /// # Arguments
146 ///
147 /// * `capacity` - Number of objects to pre-allocate
148 /// * `init` - Factory function to create each object
149 /// * `reset` - Called when object returns to pool (e.g., `Vec::clear`)
150 ///
151 /// # Panics
152 ///
153 /// Panics if capacity is zero.
154 pub fn new<I, R>(capacity: usize, mut init: I, reset: R) -> Self
155 where
156 I: FnMut() -> T,
157 R: FnMut(&mut T) + 'static,
158 {
159 assert!(capacity > 0, "capacity must be non-zero");
160
161 let mut data = Vec::with_capacity(capacity);
162 for _ in 0..capacity {
163 data.push(init());
164 }
165
166 Self {
167 inner: Rc::new(Inner::new_bounded(data, reset)),
168 }
169 }
170
171 /// Attempts to acquire an object from the pool.
172 ///
173 /// Returns `None` if all objects are currently in use.
174 pub fn try_acquire(&self) -> Option<Pooled<T>> {
175 self.inner.try_pop().map(|value| Pooled {
176 value: ManuallyDrop::new(value),
177 inner: Rc::downgrade(&self.inner),
178 })
179 }
180
181 /// Returns the number of available objects.
182 pub fn available(&self) -> usize {
183 self.inner.available()
184 }
185
186 /// Returns true if there are no more available objects.
187 pub fn is_empty(&self) -> bool {
188 self.inner.is_empty()
189 }
190}
191
192// =============================================================================
193// Pool - growable, creates on demand
194// =============================================================================
195
196/// Growable object pool with LIFO reuse.
197///
198/// Objects are created on demand via the factory function when the pool
199/// is empty. Use `try_acquire()` for the fast path that only returns
200/// pooled objects, or `acquire()` which may create new objects.
201///
202/// # Example
203///
204/// ```
205/// use nexus_pool::local::Pool;
206///
207/// let pool = Pool::new(
208/// || Vec::<u8>::with_capacity(1024),
209/// |v| v.clear(),
210/// );
211///
212/// let mut buf = pool.acquire(); // Creates new object
213/// buf.extend_from_slice(b"hello");
214/// drop(buf); // Returns to pool, clear() is called
215///
216/// let buf2 = pool.acquire(); // Reuses existing (now empty) object
217/// ```
218pub struct Pool<T> {
219 inner: Rc<Inner<T>>,
220}
221
222impl<T> Pool<T> {
223 /// Creates an empty pool with the given factory and reset functions.
224 ///
225 /// # Arguments
226 ///
227 /// * `factory` - Creates new objects when pool is empty
228 /// * `reset` - Called when object returns to pool (e.g., `Vec::clear`)
229 pub fn new<F, R>(factory: F, reset: R) -> Self
230 where
231 F: FnMut() -> T + 'static,
232 R: FnMut(&mut T) + 'static,
233 {
234 Self {
235 inner: Rc::new(Inner::new_growable(Vec::new(), factory, reset)),
236 }
237 }
238
239 /// Creates a pool pre-populated with `capacity` objects.
240 pub fn with_capacity<F, R>(capacity: usize, mut factory: F, reset: R) -> Self
241 where
242 F: FnMut() -> T + 'static,
243 R: FnMut(&mut T) + 'static,
244 {
245 let mut data = Vec::with_capacity(capacity);
246 for _ in 0..capacity {
247 data.push(factory());
248 }
249
250 Self {
251 inner: Rc::new(Inner::new_growable(data, factory, reset)),
252 }
253 }
254
255 /// Acquires an object from the pool, creating one if necessary.
256 ///
257 /// This always succeeds but may allocate if the pool is empty.
258 pub fn acquire(&self) -> Pooled<T> {
259 // SAFETY: Pool::new/with_capacity always calls new_growable, which
260 // initializes the factory via MaybeUninit::new. pop_or_create's
261 // precondition (factory initialized) is satisfied.
262 let value = unsafe { self.inner.pop_or_create() };
263 Pooled {
264 value: ManuallyDrop::new(value),
265 inner: Rc::downgrade(&self.inner),
266 }
267 }
268
269 /// Attempts to acquire an object from the pool without creating.
270 ///
271 /// Returns `None` if the pool is empty. This is the fast path.
272 pub fn try_acquire(&self) -> Option<Pooled<T>> {
273 self.inner.try_pop().map(|value| Pooled {
274 value: ManuallyDrop::new(value),
275 inner: Rc::downgrade(&self.inner),
276 })
277 }
278
279 /// Takes an object from the pool without an RAII guard, creating one
280 /// via the factory if the pool is empty.
281 ///
282 /// The caller is responsible for returning the object via [`put()`](Pool::put).
283 ///
284 /// # Example
285 ///
286 /// ```
287 /// use nexus_pool::local::Pool;
288 ///
289 /// let pool = Pool::new(
290 /// || Vec::<u8>::with_capacity(1024),
291 /// |v| v.clear(),
292 /// );
293 ///
294 /// let mut buf = pool.take();
295 /// buf.extend_from_slice(b"hello");
296 /// pool.put(buf); // manual return, reset is called
297 /// ```
298 pub fn take(&self) -> T {
299 // SAFETY: Pool::new/with_capacity always calls new_growable, which
300 // initializes the factory via MaybeUninit::new. pop_or_create's
301 // precondition (factory initialized) is satisfied.
302 unsafe { self.inner.pop_or_create() }
303 }
304
305 /// Takes an object from the pool if one is available, without creating.
306 ///
307 /// Returns `None` if the pool is empty. The caller is responsible for
308 /// returning the object via [`put()`](Pool::put).
309 pub fn try_take(&self) -> Option<T> {
310 self.inner.try_pop()
311 }
312
313 /// Returns an object to the pool.
314 ///
315 /// Calls the reset function, then pushes the value back onto the
316 /// available stack for reuse.
317 ///
318 /// # Panics
319 ///
320 /// If the reset closure panics, the value is leaked and the pool slot
321 /// is not returned. The panic propagates normally. Reset closures must
322 /// not panic — use simple operations like `Vec::clear()` or field resets.
323 pub fn put(&self, mut value: T) {
324 self.inner.return_value(&mut value);
325 self.inner.push(value);
326 }
327
328 /// Returns the number of available objects.
329 pub fn available(&self) -> usize {
330 self.inner.available()
331 }
332}
333
334impl<T> Drop for Pool<T> {
335 fn drop(&mut self) {
336 // SAFETY: Pool::new/with_capacity always calls new_growable, which
337 // initializes the factory via MaybeUninit::new. We must drop it here
338 // before Rc drops Inner, because Inner's Drop doesn't know whether
339 // factory was initialized (BoundedPool leaves it uninit).
340 unsafe {
341 let factory = &mut *self.inner.factory.get();
342 factory.assume_init_drop();
343 }
344 }
345}
346
347// =============================================================================
348// Pooled - RAII guard
349// =============================================================================
350
351/// RAII guard that returns the object to the pool on drop.
352///
353/// The object is always returned to the pool when the guard is dropped.
354/// There is no way to "take" the object out permanently.
355pub struct Pooled<T> {
356 value: ManuallyDrop<T>,
357 inner: Weak<Inner<T>>,
358}
359
360impl<T> Deref for Pooled<T> {
361 type Target = T;
362
363 #[inline]
364 fn deref(&self) -> &T {
365 &self.value
366 }
367}
368
369impl<T> DerefMut for Pooled<T> {
370 #[inline]
371 fn deref_mut(&mut self) -> &mut T {
372 &mut self.value
373 }
374}
375
376impl<T> Drop for Pooled<T> {
377 fn drop(&mut self) {
378 if let Some(inner) = self.inner.upgrade() {
379 // Reset and return to pool
380 inner.return_value(&mut self.value);
381 // SAFETY: value is valid (ManuallyDrop preserves it until explicit take/drop).
382 // After take, self.value is consumed and we never touch it again.
383 let value = unsafe { ManuallyDrop::take(&mut self.value) };
384 inner.push(value);
385 } else {
386 // SAFETY: Pool is gone. Value is valid (ManuallyDrop preserves it) and must
387 // be dropped to avoid a leak. After drop, we never touch self.value again.
388 unsafe { ManuallyDrop::drop(&mut self.value) };
389 }
390 }
391}
392
393// =============================================================================
394// Tests
395// =============================================================================
396
397#[cfg(test)]
398mod tests {
399 use super::*;
400 use std::cell::Cell;
401 use std::rc::Rc as StdRc;
402
403 #[test]
404 fn bounded_pool_basic() {
405 let pool = BoundedPool::new(3, || Vec::<u8>::with_capacity(16), Vec::clear);
406
407 assert_eq!(pool.available(), 3);
408
409 let mut a = pool.try_acquire().unwrap();
410 assert_eq!(pool.available(), 2);
411
412 a.extend_from_slice(b"hello");
413 assert_eq!(&*a, b"hello");
414
415 let _b = pool.try_acquire().unwrap();
416 let _c = pool.try_acquire().unwrap();
417
418 assert_eq!(pool.available(), 0);
419
420 // Pool exhausted
421 assert!(pool.try_acquire().is_none());
422
423 drop(a);
424 assert_eq!(pool.available(), 1);
425
426 // Can acquire again - and it's been cleared
427 let d = pool.try_acquire().unwrap();
428 assert!(d.is_empty()); // reset was called
429 }
430
431 #[test]
432 fn bounded_pool_reset_called() {
433 let reset_count = StdRc::new(Cell::new(0));
434 let reset_count_clone = reset_count.clone();
435
436 let pool = BoundedPool::new(
437 2,
438 || 0u32,
439 move |_| {
440 reset_count_clone.set(reset_count_clone.get() + 1);
441 },
442 );
443
444 let a = pool.try_acquire().unwrap();
445 assert_eq!(reset_count.get(), 0);
446
447 drop(a);
448 assert_eq!(reset_count.get(), 1);
449
450 let b = pool.try_acquire().unwrap();
451 let c = pool.try_acquire().unwrap();
452 drop(b);
453 drop(c);
454 assert_eq!(reset_count.get(), 3);
455 }
456
457 #[test]
458 fn bounded_pool_outlives_guard() {
459 let guard;
460 {
461 let pool = BoundedPool::new(1, || String::from("test"), String::clear);
462 guard = pool.try_acquire().unwrap();
463 }
464 // Pool dropped, guard still valid
465 assert_eq!(&*guard, "test");
466 // Drop guard - value is dropped, not returned (pool is gone)
467 drop(guard);
468 }
469
470 #[test]
471 fn growable_pool_basic() {
472 let pool = Pool::new(|| Vec::<u8>::with_capacity(16), Vec::clear);
473
474 assert_eq!(pool.available(), 0);
475
476 // acquire creates new object
477 let mut a = pool.acquire();
478 a.extend_from_slice(b"hello");
479
480 drop(a);
481 assert_eq!(pool.available(), 1);
482
483 // acquire reuses - and it's been cleared
484 let b = pool.acquire();
485 assert!(b.is_empty()); // reset was called
486 assert_eq!(pool.available(), 0);
487 }
488
489 #[test]
490 fn growable_pool_try_acquire() {
491 let pool = Pool::new(|| 42u32, |_| {});
492
493 // Empty pool, try_acquire returns None
494 assert!(pool.try_acquire().is_none());
495
496 // acquire creates
497 let a = pool.acquire();
498 drop(a);
499
500 // Now try_acquire succeeds
501 let b = pool.try_acquire().unwrap();
502 assert_eq!(*b, 42);
503 }
504
505 #[test]
506 fn growable_pool_with_capacity() {
507 let pool = Pool::with_capacity(5, String::new, String::clear);
508
509 assert_eq!(pool.available(), 5);
510
511 let _a = pool.try_acquire().unwrap();
512 let _b = pool.try_acquire().unwrap();
513 assert_eq!(pool.available(), 3);
514 }
515
516 #[test]
517 fn growable_pool_outlives_guard() {
518 let guard;
519 {
520 let pool = Pool::new(|| String::from("test"), String::clear);
521 guard = pool.acquire();
522 }
523 // Pool dropped, guard still valid
524 assert_eq!(&*guard, "test");
525 drop(guard);
526 }
527
528 #[test]
529 #[should_panic(expected = "capacity must be non-zero")]
530 fn bounded_pool_zero_capacity_panics() {
531 let _ = BoundedPool::new(0, || (), |()| {});
532 }
533
534 #[test]
535 fn take_put_basic() {
536 let pool = Pool::new(|| Vec::<u8>::with_capacity(16), Vec::clear);
537
538 let mut buf = pool.take();
539 buf.extend_from_slice(b"hello");
540 assert_eq!(&buf, b"hello");
541
542 pool.put(buf);
543 assert_eq!(pool.available(), 1);
544
545 let reused = pool.take();
546 assert!(reused.is_empty()); // reset was called
547 }
548
549 #[test]
550 fn try_take_empty_returns_none() {
551 let pool = Pool::new(|| 0u32, |_| {});
552
553 assert!(pool.try_take().is_none());
554
555 let v = pool.take(); // creates via factory
556 pool.put(v);
557
558 assert!(pool.try_take().is_some());
559 }
560
561 #[test]
562 fn take_put_reset_called() {
563 let reset_count = StdRc::new(Cell::new(0));
564 let rc = reset_count.clone();
565
566 let pool = Pool::new(
567 || 0u32,
568 move |_| {
569 rc.set(rc.get() + 1);
570 },
571 );
572
573 let v = pool.take();
574 assert_eq!(reset_count.get(), 0);
575
576 pool.put(v);
577 assert_eq!(reset_count.get(), 1);
578
579 let v = pool.take();
580 pool.put(v);
581 assert_eq!(reset_count.get(), 2);
582 }
583
584 #[test]
585 fn take_put_with_capacity() {
586 let pool = Pool::with_capacity(5, || String::from("init"), String::clear);
587 assert_eq!(pool.available(), 5);
588
589 let s = pool.try_take().unwrap();
590 assert_eq!(s, "init");
591 assert_eq!(pool.available(), 4);
592
593 pool.put(s);
594 assert_eq!(pool.available(), 5);
595 }
596
597 #[test]
598 fn mix_raii_and_manual() {
599 let pool = Pool::with_capacity(3, Vec::<u8>::new, Vec::clear);
600
601 // Take one manually
602 let mut manual = pool.take();
603 manual.push(1);
604
605 // Acquire one via RAII
606 let mut guard = pool.acquire();
607 guard.push(2);
608
609 assert_eq!(pool.available(), 1);
610
611 // Return manual
612 pool.put(manual);
613 assert_eq!(pool.available(), 2);
614
615 // Drop guard
616 drop(guard);
617 assert_eq!(pool.available(), 3);
618 }
619}