nexus_pool/sync.rs
1//! Single-acquirer pool: one thread acquires, any thread can return.
2//!
3//! Items are acquired from a single point (the `Acquirer`) and can be
4//! returned from any thread via `Drop` on `Pooled`.
5//!
6//! Uses LIFO ordering for cache locality.
7
8use std::cell::UnsafeCell;
9use std::mem::{ManuallyDrop, MaybeUninit};
10use std::ops::{Deref, DerefMut};
11use std::sync::Arc;
12use std::sync::atomic::{AtomicUsize, Ordering};
13
14const NONE: usize = usize::MAX;
15
16// =============================================================================
17// Slot - individual pool entry
18// =============================================================================
19
/// One pool entry: storage for a value plus its intrusive free-list link.
///
/// While the slot sits in the free list, `value` holds an initialized `T`.
/// While the value is checked out, the storage is treated as uninitialized.
struct Slot<T> {
    /// Storage for the pooled value; touched only by the slot's current owner.
    value: UnsafeCell<MaybeUninit<T>>,
    /// Index of the next free slot, or `NONE` at the end of the list.
    next: AtomicUsize,
}

// SAFETY: `value` is only read or written by whoever currently owns the slot
// (i.e. after it has been popped from the free list), so moving a `Slot` to
// another thread only requires that `T` itself may move. `next` is atomic.
unsafe impl<T> Send for Slot<T> where T: Send {}

// SAFETY: shared access never races on `value`: the `UnsafeCell` is only
// dereferenced while the slot is owned (not reachable through the free
// list). The only field touched concurrently is `next`, which is atomic.
unsafe impl<T> Sync for Slot<T> where T: Send + Sync {}
32// =============================================================================
33// Inner - shared pool state
34// =============================================================================
35
36struct Inner<T> {
37 slots: Box<[Slot<T>]>,
38 free_head: AtomicUsize,
39 free_count: AtomicUsize,
40 reset: Box<dyn Fn(&mut T) + Send + Sync>,
41}
42
43impl<T> Inner<T> {
44 /// Push a slot back onto the free list. Called from any thread.
45 fn push(&self, idx: usize, mut value: T) {
46 // Reset the value
47 (self.reset)(&mut value);
48
49 // SAFETY: We own this slot (it was popped from the free list via CAS).
50 // No other thread can access it until we push it back. MaybeUninit::write
51 // initializes the slot for the next acquirer.
52 unsafe {
53 (*self.slots[idx].value.get()).write(value);
54 }
55
56 // Link into free list with CAS loop
57 loop {
58 let head = self.free_head.load(Ordering::Relaxed);
59 self.slots[idx].next.store(head, Ordering::Relaxed);
60
61 match self.free_head.compare_exchange_weak(
62 head,
63 idx,
64 Ordering::Release, // Publishes value write + next write
65 Ordering::Relaxed, // Failure just retries
66 ) {
67 Ok(_) => {
68 self.free_count.fetch_add(1, Ordering::Relaxed);
69 return;
70 }
71 Err(_) => std::hint::spin_loop(),
72 }
73 }
74 }
75
76 /// Pop a slot from the free list. Called only from Acquirer thread.
77 fn pop(&self) -> Option<usize> {
78 loop {
79 let head = self.free_head.load(Ordering::Acquire);
80 if head == NONE {
81 return None;
82 }
83
84 // Read next - safe because we Acquired head, syncs with pusher's Release
85 let next = self.slots[head].next.load(Ordering::Relaxed);
86
87 match self.free_head.compare_exchange_weak(
88 head,
89 next,
90 Ordering::Acquire, // Syncs with pusher's Release
91 Ordering::Acquire, // On fail, need to see new head
92 ) {
93 Ok(_) => {
94 self.free_count.fetch_sub(1, Ordering::Relaxed);
95 return Some(head);
96 }
97 Err(_) => {
98 // Pusher added something newer - retry for hotter item
99 std::hint::spin_loop();
100 }
101 }
102 }
103 }
104
105 /// Get reference to value at index.
106 ///
107 /// # Safety
108 ///
109 /// Caller must own the slot (have popped it) and slot must contain valid value.
110 unsafe fn read_value(&self, idx: usize) -> T {
111 // SAFETY: Caller guarantees slot was popped (owned) and contains a valid value
112 // written by new() or push(). assume_init_read moves the value out without
113 // dropping the MaybeUninit, which is correct since the slot will be rewritten
114 // on the next push.
115 unsafe { (*self.slots[idx].value.get()).assume_init_read() }
116 }
117}
118
119impl<T> Drop for Inner<T> {
120 fn drop(&mut self) {
121 // Inner::drop runs only when the last Arc dies — which means
122 // no `Pooled` guards are alive (each guard holds a strong
123 // Arc). So every slot that was ever acquired has been pushed
124 // back via `Pooled::drop`, and the free list now contains
125 // every slot we own. Drop them all here. The only way a slot
126 // can be missing from the free list at this point is if a
127 // reset closure panicked during a guard's drop — that path
128 // leaks the value (documented in caveats.md §1) and is the
129 // same as the 1.0.x behavior.
130 let mut idx = *self.free_head.get_mut();
131 while idx != NONE {
132 // SAFETY: Slots in the free list contain valid values (written by new()
133 // or push()). Slots NOT in the free list have been moved out via
134 // assume_init_read in pop and are handled by Pooled's Drop. get_mut is
135 // safe because we have &mut self (exclusive access during drop).
136 unsafe {
137 (*self.slots[idx].value.get()).assume_init_drop();
138 }
139 idx = *self.slots[idx].next.get_mut();
140 }
141 // MaybeUninit doesn't drop contents, so Box<[Slot<T>]> will just
142 // deallocate memory without double-dropping.
143 }
144}
145
146// =============================================================================
147// Pool - the pool and acquire handle combined
148// =============================================================================
149
150/// A bounded pool where one thread acquires and any thread can return.
151///
152/// Only one `Pool` exists per pool. It cannot be cloned or shared
153/// across threads (it is `Send` but not `Sync` or `Clone`).
154///
155/// When the `Pool` is dropped while `Pooled` guards are still alive,
156/// the guards keep `Inner` alive via a strong `Arc`. Each guard, on
157/// drop, returns its value to the (now-orphaned) free list; the last
158/// guard to drop releases `Inner`, which drops every in-pool slot.
159/// See `docs/caveats.md` §2.
160///
161/// # Example
162///
163/// ```
164/// use nexus_pool::sync::Pool;
165///
166/// let acquirer = Pool::new(
167/// 100,
168/// || Vec::<u8>::with_capacity(1024),
169/// |v| v.clear(),
170/// );
171///
172/// // Acquirer thread
173/// let mut buf = acquirer.try_acquire().unwrap();
174/// buf.extend_from_slice(b"hello");
175///
176/// // Can send buf to another thread
177/// std::thread::spawn(move || {
178/// println!("{:?}", &*buf);
179/// // buf returns to pool on drop
180/// }).join().unwrap();
181/// ```
182pub struct Pool<T> {
183 inner: Arc<Inner<T>>,
184}
185
186// SAFETY: Pool is Send (can be moved to another thread) but not Sync (not shared).
187// Inner uses atomics for the free list. Values in UnsafeCell are only accessed
188// when a slot is owned (popped via CAS). T: Send ensures values can cross threads.
189// Pool is not Clone — single acquirer enforced at the type level.
190#[allow(clippy::non_send_fields_in_send_ty)]
191unsafe impl<T: Send> Send for Pool<T> {}
192
193impl<T> Pool<T> {
194 /// Creates a pool with `capacity` pre-initialized objects.
195 ///
196 /// # Arguments
197 ///
198 /// * `capacity` - Number of objects to pre-allocate
199 /// * `init` - Factory function to create each object
200 /// * `reset` - Called when object returns to pool (e.g., `Vec::clear`)
201 ///
202 /// # Panics
203 ///
204 /// Panics if capacity is zero or exceeds `usize::MAX - 1`.
205 ///
206 /// The `reset` closure must not panic. If it does, the value is leaked
207 /// and the pool slot is not returned. Use simple operations like
208 /// `Vec::clear()` or field resets.
209 pub fn new<I, R>(capacity: usize, mut init: I, reset: R) -> Self
210 where
211 I: FnMut() -> T,
212 R: Fn(&mut T) + Send + Sync + 'static,
213 {
214 assert!(capacity > 0, "capacity must be non-zero");
215 assert!(capacity < NONE, "capacity must be less than {}", NONE);
216
217 // Build slots with linked free list: 0 -> 1 -> 2 -> ... -> NONE
218 let slots: Box<[Slot<T>]> = (0..capacity)
219 .map(|i| Slot {
220 value: UnsafeCell::new(MaybeUninit::new(init())),
221 next: AtomicUsize::new(if i + 1 < capacity { i + 1 } else { NONE }),
222 })
223 .collect();
224
225 Self {
226 inner: Arc::new(Inner {
227 slots,
228 free_head: AtomicUsize::new(0), // Head of free list
229 free_count: AtomicUsize::new(capacity),
230 reset: Box::new(reset),
231 }),
232 }
233 }
234
235 /// Attempts to acquire an object from the pool.
236 ///
237 /// Returns `None` if all objects are currently in use.
238 #[inline]
239 pub fn try_acquire(&self) -> Option<Pooled<T>> {
240 self.inner.pop().map(|idx| {
241 // SAFETY: We just popped this slot via CAS (exclusive ownership).
242 // The slot contains a valid value written by new() or a prior push().
243 let value = unsafe { self.inner.read_value(idx) };
244 Pooled {
245 value: ManuallyDrop::new(value),
246 idx,
247 inner: Arc::clone(&self.inner),
248 }
249 })
250 }
251
252 /// Returns the number of available objects.
253 ///
254 /// O(1) — backed by an atomic counter. This is a snapshot and may
255 /// be immediately outdated if other threads are returning objects
256 /// concurrently.
257 #[inline]
258 pub fn available(&self) -> usize {
259 self.inner.free_count.load(Ordering::Relaxed)
260 }
261}
262
263// =============================================================================
264// Pooled - RAII guard
265// =============================================================================
266
267/// RAII guard that returns the object to the pool on drop.
268///
269/// This guard can be sent to other threads. When dropped, the object
270/// is automatically returned to the pool's storage. If the pool was
271/// already dropped, the value goes back into the orphaned `Inner`,
272/// which finally dies (along with every in-pool value) when the last
273/// `Pooled` exits. See `docs/caveats.md` §2.
274#[must_use = "dropping the guard immediately returns the object to the pool"]
275pub struct Pooled<T> {
276 value: ManuallyDrop<T>,
277 idx: usize,
278 inner: Arc<Inner<T>>,
279}
280
281// SAFETY: Pooled owns its value exclusively (ManuallyDrop<T>). The Arc<Inner<T>>
282// is only used during drop to push the slot back via atomic CAS. T: Send ensures
283// the value can cross threads. Arc<T: Send> is itself Send (same as Weak was).
284#[allow(clippy::non_send_fields_in_send_ty)]
285unsafe impl<T: Send> Send for Pooled<T> {}
286// SAFETY: Pooled's &self access goes through Deref to &T, which requires T: Sync.
287// The Arc and idx fields are not mutated through &self.
288unsafe impl<T: Send + Sync> Sync for Pooled<T> {}
289
290impl<T> Deref for Pooled<T> {
291 type Target = T;
292
293 #[inline]
294 fn deref(&self) -> &T {
295 &self.value
296 }
297}
298
299impl<T> DerefMut for Pooled<T> {
300 #[inline]
301 fn deref_mut(&mut self) -> &mut T {
302 &mut self.value
303 }
304}
305
306impl<T> Drop for Pooled<T> {
307 fn drop(&mut self) {
308 // SAFETY: self.inner is a strong Arc — Inner is alive by
309 // construction. Value is valid (ManuallyDrop preserves it
310 // until explicit take/drop). After take, self.value is
311 // consumed; inner.push writes it back to the slot.
312 let value = unsafe { ManuallyDrop::take(&mut self.value) };
313 self.inner.push(self.idx, value);
314 // self.inner Arc drops here: one strong fetch_sub. If we were
315 // the last reference, Inner::drop walks the free list and
316 // assume_init_drops every in-pool slot (including the one we
317 // just pushed). Arc's fetch_sub(Release) + last-drop
318 // fence(Acquire) is the canonical "single-writer drops after
319 // all readers" pattern — sufficient for the refcount alone;
320 // the value transfer is independently ordered by the Release
321 // CAS in `Inner::push`.
322 }
323}
324
325// =============================================================================
326// Tests
327// =============================================================================
328
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicUsize;
    use std::thread;

    /// Acquire until exhausted, return one, and get a reset object back.
    #[test]
    fn basic_acquire_release() {
        let acquirer = Pool::new(3, || Vec::<u8>::with_capacity(16), Vec::clear);
        assert_eq!(acquirer.available(), 3);

        let mut a = acquirer.try_acquire().unwrap();
        a.extend_from_slice(b"hello");
        assert_eq!(&*a, b"hello");

        let _b = acquirer.try_acquire().unwrap();
        let _c = acquirer.try_acquire().unwrap();

        // Pool exhausted
        assert!(acquirer.try_acquire().is_none());
        assert_eq!(acquirer.available(), 0);

        // Return one
        drop(a);
        assert_eq!(acquirer.available(), 1);

        // Can acquire again - and it's been cleared
        let d = acquirer.try_acquire().unwrap();
        assert!(d.is_empty());
    }

    /// A guard dropped on another thread returns its object to the pool.
    #[test]
    fn cross_thread_return() {
        let acquirer = Pool::new(2, || 42u32, |_| {});

        let item = acquirer.try_acquire().unwrap();
        assert_eq!(*item, 42);

        // Send to another thread to drop
        thread::spawn(move || {
            assert_eq!(*item, 42);
            drop(item);
        })
        .join()
        .unwrap();

        // Should be back in pool
        let item2 = acquirer.try_acquire().unwrap();
        assert_eq!(*item2, 42);
    }

    /// A guard outliving its `Pool` stays valid and cleans up on drop.
    #[test]
    fn acquirer_dropped_first() {
        let item;
        {
            let acquirer = Pool::new(1, || String::from("test"), String::clear);
            item = acquirer.try_acquire().unwrap();
            // acquirer drops here — its Arc decrements but the item
            // still holds a strong Arc, so Inner survives.
        }
        // item still valid — we can access it.
        assert_eq!(&*item, "test");
        // item drops here: returns slot to the orphan free list, Arc
        // hits zero, Inner::drop walks the free list and drops every
        // in-pool slot. No leak, no UAF.
    }

    /// `reset` runs exactly once per returned guard, never on acquire.
    #[test]
    fn reset_called_on_return() {
        let reset_count = Arc::new(AtomicUsize::new(0));
        let reset_count_clone = Arc::clone(&reset_count);

        let acquirer = Pool::new(
            2,
            || 0u32,
            move |_| {
                reset_count_clone.fetch_add(1, Ordering::Relaxed);
            },
        );

        let a = acquirer.try_acquire().unwrap();
        assert_eq!(reset_count.load(Ordering::Relaxed), 0);

        drop(a);
        assert_eq!(reset_count.load(Ordering::Relaxed), 1);

        let b = acquirer.try_acquire().unwrap();
        let c = acquirer.try_acquire().unwrap();
        drop(b);
        drop(c);
        assert_eq!(reset_count.load(Ordering::Relaxed), 3);
    }

    /// Objects come back in reverse order of their return.
    #[test]
    fn lifo_ordering() {
        let acquirer = Pool::new(3, Vec::<u8>::new, Vec::clear);

        let mut guard_a = acquirer.try_acquire().unwrap();
        let mut guard_b = acquirer.try_acquire().unwrap();
        let mut guard_c = acquirer.try_acquire().unwrap();

        guard_a.push(1);
        guard_b.push(2);
        guard_c.push(3);

        // After the pushes each vector owns its own heap buffer, and the
        // three buffers are live at the same time, so their addresses are
        // distinct. `Vec::clear` keeps the allocation, so the address still
        // identifies the vector after it has been reset and re-acquired.
        let ptr_a = guard_a.as_ptr();
        let ptr_b = guard_b.as_ptr();
        let ptr_c = guard_c.as_ptr();

        // Return in order: a, b, c
        drop(guard_a);
        drop(guard_b);
        drop(guard_c);

        // Should get back in LIFO order: c, b, a
        let reacquired_1 = acquirer.try_acquire().unwrap();
        assert!(reacquired_1.is_empty()); // reset was called
        assert_eq!(reacquired_1.as_ptr(), ptr_c);

        let reacquired_2 = acquirer.try_acquire().unwrap();
        assert!(reacquired_2.is_empty());
        assert_eq!(reacquired_2.as_ptr(), ptr_b);

        let reacquired_3 = acquirer.try_acquire().unwrap();
        assert!(reacquired_3.is_empty());
        assert_eq!(reacquired_3.as_ptr(), ptr_a);
    }

    #[test]
    #[should_panic(expected = "capacity must be non-zero")]
    fn zero_capacity_panics() {
        let _ = Pool::new(0, || (), |()| {});
    }

    // =========================================================================
    // Stress tests
    // =========================================================================

    /// Repeated acquire/return on one thread never loses an object.
    #[test]
    fn stress_single_thread() {
        let acquirer = Pool::new(100, || Vec::<u8>::with_capacity(64), Vec::clear);

        for _ in 0..10_000 {
            let mut items: Vec<_> = (0..50).filter_map(|_| acquirer.try_acquire()).collect();

            for item in &mut items {
                item.extend_from_slice(b"data");
            }

            drop(items);
        }

        // All items should be back
        let count = acquirer.available();
        assert_eq!(count, 100);
    }

    /// One thread acquires while a second thread concurrently returns.
    #[test]
    fn stress_multi_thread_return() {
        let acquirer = Pool::new(
            100,
            || AtomicUsize::new(0),
            |v| {
                v.store(0, Ordering::Relaxed);
            },
        );

        let returned = Arc::new(AtomicUsize::new(0));

        thread::scope(|s| {
            let (tx, rx) = std::sync::mpsc::channel();
            let returned_clone = Arc::clone(&returned);

            // Single worker thread receives and returns items
            s.spawn(move || {
                while let Ok(item) = rx.recv() {
                    let _item: Pooled<AtomicUsize> = item;
                    returned_clone.fetch_add(1, Ordering::Relaxed);
                    // item drops here, returns to pool
                }
            });

            // Main thread acquires and sends to worker
            let mut sent = 0;
            while sent < 1000 {
                if let Some(item) = acquirer.try_acquire() {
                    tx.send(item).unwrap();
                    sent += 1;
                } else {
                    // Pool exhausted, wait a bit for returns
                    thread::yield_now();
                }
            }
            // tx drops here, worker sees disconnect
        });

        assert_eq!(returned.load(Ordering::Relaxed), 1000);
    }

    /// Several threads returning at once leave the free list intact.
    #[test]
    fn stress_concurrent_return() {
        let acquirer = Pool::new(1000, || 0u64, |_| {});

        // Acquire all items
        let items: Vec<_> = (0..1000).filter_map(|_| acquirer.try_acquire()).collect();
        assert_eq!(items.len(), 1000);

        // Split items across threads and return concurrently
        let items_per_thread = 250;
        let mut item_chunks: Vec<Vec<_>> = Vec::new();
        let mut iter = items.into_iter();
        for _ in 0..4 {
            item_chunks.push(iter.by_ref().take(items_per_thread).collect());
        }

        thread::scope(|s| {
            for chunk in item_chunks {
                s.spawn(move || {
                    for item in chunk {
                        drop(item);
                    }
                });
            }
        });

        // All items should be back
        let count = acquirer.available();
        assert_eq!(count, 1000);
    }
}
549}