Skip to main content

nexus_pool/
sync.rs

1//! Single-acquirer pool: one thread acquires, any thread can return.
2//!
3//! Items are acquired from a single point (the `Acquirer`) and can be
4//! returned from any thread via `Drop` on `Pooled`.
5//!
6//! Uses LIFO ordering for cache locality.
7
8use std::cell::UnsafeCell;
9use std::mem::{ManuallyDrop, MaybeUninit};
10use std::ops::{Deref, DerefMut};
11use std::sync::Arc;
12use std::sync::atomic::{AtomicUsize, Ordering};
13
14const NONE: usize = usize::MAX;
15
16// =============================================================================
17// Slot - individual pool entry
18// =============================================================================
19
/// One pool entry: storage for a single `T` plus an intrusive free-list link.
struct Slot<T> {
    /// Storage for the pooled value. It holds an initialised `T` while the
    /// slot is linked into the free list; once the value has been moved out
    /// to a guard the bytes are logically uninitialised until the next push.
    value: UnsafeCell<MaybeUninit<T>>,
    /// Index of the next slot in the free list, or `NONE` at the tail.
    next: AtomicUsize,
}

// SAFETY: `value` is only touched by whoever currently owns the slot (i.e.
// after it has been popped from the free list), so moving a slot between
// threads only ever moves a `T`, which requires `T: Send`. `next` is an
// `AtomicUsize` and is `Send` on its own.
unsafe impl<T> Send for Slot<T> where T: Send {}
// SAFETY: Shared references to a slot never race on `value`: the `UnsafeCell`
// is only accessed by the slot's current owner, never while the slot is in
// the free list. `next` is an `AtomicUsize`, which is already `Sync`.
unsafe impl<T> Sync for Slot<T> where T: Send + Sync {}
31
32// =============================================================================
33// Inner - shared pool state
34// =============================================================================
35
36struct Inner<T> {
37    slots: Box<[Slot<T>]>,
38    free_head: AtomicUsize,
39    free_count: AtomicUsize,
40    reset: Box<dyn Fn(&mut T) + Send + Sync>,
41}
42
43impl<T> Inner<T> {
44    /// Push a slot back onto the free list. Called from any thread.
45    fn push(&self, idx: usize, mut value: T) {
46        // Reset the value
47        (self.reset)(&mut value);
48
49        // SAFETY: We own this slot (it was popped from the free list via CAS).
50        // No other thread can access it until we push it back. MaybeUninit::write
51        // initializes the slot for the next acquirer.
52        unsafe {
53            (*self.slots[idx].value.get()).write(value);
54        }
55
56        // Link into free list with CAS loop
57        loop {
58            let head = self.free_head.load(Ordering::Relaxed);
59            self.slots[idx].next.store(head, Ordering::Relaxed);
60
61            match self.free_head.compare_exchange_weak(
62                head,
63                idx,
64                Ordering::Release, // Publishes value write + next write
65                Ordering::Relaxed, // Failure just retries
66            ) {
67                Ok(_) => {
68                    self.free_count.fetch_add(1, Ordering::Relaxed);
69                    return;
70                }
71                Err(_) => std::hint::spin_loop(),
72            }
73        }
74    }
75
76    /// Pop a slot from the free list. Called only from Acquirer thread.
77    fn pop(&self) -> Option<usize> {
78        loop {
79            let head = self.free_head.load(Ordering::Acquire);
80            if head == NONE {
81                return None;
82            }
83
84            // Read next - safe because we Acquired head, syncs with pusher's Release
85            let next = self.slots[head].next.load(Ordering::Relaxed);
86
87            match self.free_head.compare_exchange_weak(
88                head,
89                next,
90                Ordering::Acquire, // Syncs with pusher's Release
91                Ordering::Acquire, // On fail, need to see new head
92            ) {
93                Ok(_) => {
94                    self.free_count.fetch_sub(1, Ordering::Relaxed);
95                    return Some(head);
96                }
97                Err(_) => {
98                    // Pusher added something newer - retry for hotter item
99                    std::hint::spin_loop();
100                }
101            }
102        }
103    }
104
105    /// Get reference to value at index.
106    ///
107    /// # Safety
108    ///
109    /// Caller must own the slot (have popped it) and slot must contain valid value.
110    unsafe fn read_value(&self, idx: usize) -> T {
111        // SAFETY: Caller guarantees slot was popped (owned) and contains a valid value
112        // written by new() or push(). assume_init_read moves the value out without
113        // dropping the MaybeUninit, which is correct since the slot will be rewritten
114        // on the next push.
115        unsafe { (*self.slots[idx].value.get()).assume_init_read() }
116    }
117}
118
119impl<T> Drop for Inner<T> {
120    fn drop(&mut self) {
121        // Inner::drop runs only when the last Arc dies — which means
122        // no `Pooled` guards are alive (each guard holds a strong
123        // Arc). So every slot that was ever acquired has been pushed
124        // back via `Pooled::drop`, and the free list now contains
125        // every slot we own. Drop them all here. The only way a slot
126        // can be missing from the free list at this point is if a
127        // reset closure panicked during a guard's drop — that path
128        // leaks the value (documented in caveats.md §1) and is the
129        // same as the 1.0.x behavior.
130        let mut idx = *self.free_head.get_mut();
131        while idx != NONE {
132            // SAFETY: Slots in the free list contain valid values (written by new()
133            // or push()). Slots NOT in the free list have been moved out via
134            // assume_init_read in pop and are handled by Pooled's Drop. get_mut is
135            // safe because we have &mut self (exclusive access during drop).
136            unsafe {
137                (*self.slots[idx].value.get()).assume_init_drop();
138            }
139            idx = *self.slots[idx].next.get_mut();
140        }
141        // MaybeUninit doesn't drop contents, so Box<[Slot<T>]> will just
142        // deallocate memory without double-dropping.
143    }
144}
145
146// =============================================================================
147// Pool - the pool and acquire handle combined
148// =============================================================================
149
150/// A bounded pool where one thread acquires and any thread can return.
151///
152/// Only one `Pool` exists per pool. It cannot be cloned or shared
153/// across threads (it is `Send` but not `Sync` or `Clone`).
154///
155/// When the `Pool` is dropped while `Pooled` guards are still alive,
156/// the guards keep `Inner` alive via a strong `Arc`. Each guard, on
157/// drop, returns its value to the (now-orphaned) free list; the last
158/// guard to drop releases `Inner`, which drops every in-pool slot.
159/// See `docs/caveats.md` §2.
160///
161/// # Example
162///
163/// ```
164/// use nexus_pool::sync::Pool;
165///
166/// let acquirer = Pool::new(
167///     100,
168///     || Vec::<u8>::with_capacity(1024),
169///     |v| v.clear(),
170/// );
171///
172/// // Acquirer thread
173/// let mut buf = acquirer.try_acquire().unwrap();
174/// buf.extend_from_slice(b"hello");
175///
176/// // Can send buf to another thread
177/// std::thread::spawn(move || {
178///     println!("{:?}", &*buf);
179///     // buf returns to pool on drop
180/// }).join().unwrap();
181/// ```
182pub struct Pool<T> {
183    inner: Arc<Inner<T>>,
184}
185
186// SAFETY: Pool is Send (can be moved to another thread) but not Sync (not shared).
187// Inner uses atomics for the free list. Values in UnsafeCell are only accessed
188// when a slot is owned (popped via CAS). T: Send ensures values can cross threads.
189// Pool is not Clone — single acquirer enforced at the type level.
190#[allow(clippy::non_send_fields_in_send_ty)]
191unsafe impl<T: Send> Send for Pool<T> {}
192
193impl<T> Pool<T> {
194    /// Creates a pool with `capacity` pre-initialized objects.
195    ///
196    /// # Arguments
197    ///
198    /// * `capacity` - Number of objects to pre-allocate
199    /// * `init` - Factory function to create each object
200    /// * `reset` - Called when object returns to pool (e.g., `Vec::clear`)
201    ///
202    /// # Panics
203    ///
204    /// Panics if capacity is zero or exceeds `usize::MAX - 1`.
205    ///
206    /// The `reset` closure must not panic. If it does, the value is leaked
207    /// and the pool slot is not returned. Use simple operations like
208    /// `Vec::clear()` or field resets.
209    pub fn new<I, R>(capacity: usize, mut init: I, reset: R) -> Self
210    where
211        I: FnMut() -> T,
212        R: Fn(&mut T) + Send + Sync + 'static,
213    {
214        assert!(capacity > 0, "capacity must be non-zero");
215        assert!(capacity < NONE, "capacity must be less than {}", NONE);
216
217        // Build slots with linked free list: 0 -> 1 -> 2 -> ... -> NONE
218        let slots: Box<[Slot<T>]> = (0..capacity)
219            .map(|i| Slot {
220                value: UnsafeCell::new(MaybeUninit::new(init())),
221                next: AtomicUsize::new(if i + 1 < capacity { i + 1 } else { NONE }),
222            })
223            .collect();
224
225        Self {
226            inner: Arc::new(Inner {
227                slots,
228                free_head: AtomicUsize::new(0), // Head of free list
229                free_count: AtomicUsize::new(capacity),
230                reset: Box::new(reset),
231            }),
232        }
233    }
234
235    /// Attempts to acquire an object from the pool.
236    ///
237    /// Returns `None` if all objects are currently in use.
238    #[inline]
239    pub fn try_acquire(&self) -> Option<Pooled<T>> {
240        self.inner.pop().map(|idx| {
241            // SAFETY: We just popped this slot via CAS (exclusive ownership).
242            // The slot contains a valid value written by new() or a prior push().
243            let value = unsafe { self.inner.read_value(idx) };
244            Pooled {
245                value: ManuallyDrop::new(value),
246                idx,
247                inner: Arc::clone(&self.inner),
248            }
249        })
250    }
251
252    /// Returns the number of available objects.
253    ///
254    /// O(1) — backed by an atomic counter. This is a snapshot and may
255    /// be immediately outdated if other threads are returning objects
256    /// concurrently.
257    #[inline]
258    pub fn available(&self) -> usize {
259        self.inner.free_count.load(Ordering::Relaxed)
260    }
261}
262
263// =============================================================================
264// Pooled - RAII guard
265// =============================================================================
266
267/// RAII guard that returns the object to the pool on drop.
268///
269/// This guard can be sent to other threads. When dropped, the object
270/// is automatically returned to the pool's storage. If the pool was
271/// already dropped, the value goes back into the orphaned `Inner`,
272/// which finally dies (along with every in-pool value) when the last
273/// `Pooled` exits. See `docs/caveats.md` §2.
274#[must_use = "dropping the guard immediately returns the object to the pool"]
275pub struct Pooled<T> {
276    value: ManuallyDrop<T>,
277    idx: usize,
278    inner: Arc<Inner<T>>,
279}
280
281// SAFETY: Pooled owns its value exclusively (ManuallyDrop<T>). The Arc<Inner<T>>
282// is only used during drop to push the slot back via atomic CAS. T: Send ensures
283// the value can cross threads. Arc<T: Send> is itself Send (same as Weak was).
284#[allow(clippy::non_send_fields_in_send_ty)]
285unsafe impl<T: Send> Send for Pooled<T> {}
286// SAFETY: Pooled's &self access goes through Deref to &T, which requires T: Sync.
287// The Arc and idx fields are not mutated through &self.
288unsafe impl<T: Send + Sync> Sync for Pooled<T> {}
289
290impl<T> Deref for Pooled<T> {
291    type Target = T;
292
293    #[inline]
294    fn deref(&self) -> &T {
295        &self.value
296    }
297}
298
299impl<T> DerefMut for Pooled<T> {
300    #[inline]
301    fn deref_mut(&mut self) -> &mut T {
302        &mut self.value
303    }
304}
305
306impl<T> Drop for Pooled<T> {
307    fn drop(&mut self) {
308        // SAFETY: self.inner is a strong Arc — Inner is alive by
309        // construction. Value is valid (ManuallyDrop preserves it
310        // until explicit take/drop). After take, self.value is
311        // consumed; inner.push writes it back to the slot.
312        let value = unsafe { ManuallyDrop::take(&mut self.value) };
313        self.inner.push(self.idx, value);
314        // self.inner Arc drops here: one strong fetch_sub. If we were
315        // the last reference, Inner::drop walks the free list and
316        // assume_init_drops every in-pool slot (including the one we
317        // just pushed). Arc's fetch_sub(Release) + last-drop
318        // fence(Acquire) is the canonical "single-writer drops after
319        // all readers" pattern — sufficient for the refcount alone;
320        // the value transfer is independently ordered by the Release
321        // CAS in `Inner::push`.
322    }
323}
324
325// =============================================================================
326// Tests
327// =============================================================================
328
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicUsize;
    use std::thread;

    /// Acquire until exhausted, return one, and check it comes back reset.
    #[test]
    fn basic_acquire_release() {
        let acquirer = Pool::new(3, || Vec::<u8>::with_capacity(16), Vec::clear);

        let mut a = acquirer.try_acquire().unwrap();
        a.extend_from_slice(b"hello");
        assert_eq!(&*a, b"hello");

        let _b = acquirer.try_acquire().unwrap();
        let _c = acquirer.try_acquire().unwrap();

        // Pool exhausted
        assert!(acquirer.try_acquire().is_none());

        // Return one
        drop(a);

        // Can acquire again - and it's been cleared
        let d = acquirer.try_acquire().unwrap();
        assert!(d.is_empty());
    }

    /// A guard dropped on another thread returns its object to the pool.
    #[test]
    fn cross_thread_return() {
        let acquirer = Pool::new(2, || 42u32, |_| {});

        let item = acquirer.try_acquire().unwrap();
        assert_eq!(*item, 42);

        // Send to another thread to drop
        thread::spawn(move || {
            assert_eq!(*item, 42);
            drop(item);
        })
        .join()
        .unwrap();

        // Should be back in pool
        let item2 = acquirer.try_acquire().unwrap();
        assert_eq!(*item2, 42);
    }

    /// A guard stays usable after the pool handle has been dropped.
    #[test]
    fn acquirer_dropped_first() {
        let item;
        {
            let acquirer = Pool::new(1, || String::from("test"), String::clear);
            item = acquirer.try_acquire().unwrap();
            // acquirer drops here — its Arc decrements but the item
            // still holds a strong Arc, so Inner survives.
        }
        // item still valid — we can access it.
        assert_eq!(&*item, "test");
        // item drops here: returns slot to the orphan free list, Arc
        // hits zero, Inner::drop walks the free list and drops every
        // in-pool slot. No leak, no UAF.
    }

    /// Every pooled value is dropped exactly once, even when the pool handle
    /// goes away before the last guard does.
    #[test]
    fn values_dropped_exactly_once() {
        struct Tracked(Arc<AtomicUsize>);

        impl Drop for Tracked {
            fn drop(&mut self) {
                self.0.fetch_add(1, Ordering::Relaxed);
            }
        }

        let drops = Arc::new(AtomicUsize::new(0));
        let guard;
        {
            let drops_for_init = Arc::clone(&drops);
            let acquirer = Pool::new(3, move || Tracked(Arc::clone(&drops_for_init)), |_| {});
            guard = acquirer.try_acquire().unwrap();
            // Pool handle drops here; the guard keeps the storage alive.
        }
        assert_eq!(drops.load(Ordering::Relaxed), 0);

        drop(guard);
        assert_eq!(drops.load(Ordering::Relaxed), 3);
    }

    /// The reset hook runs once per returned guard and never on acquire.
    #[test]
    fn reset_called_on_return() {
        let reset_count = Arc::new(AtomicUsize::new(0));
        let reset_count_clone = Arc::clone(&reset_count);

        let acquirer = Pool::new(
            2,
            || 0u32,
            move |_| {
                reset_count_clone.fetch_add(1, Ordering::Relaxed);
            },
        );

        let a = acquirer.try_acquire().unwrap();
        assert_eq!(reset_count.load(Ordering::Relaxed), 0);

        drop(a);
        assert_eq!(reset_count.load(Ordering::Relaxed), 1);

        let b = acquirer.try_acquire().unwrap();
        let c = acquirer.try_acquire().unwrap();
        drop(b);
        drop(c);
        assert_eq!(reset_count.load(Ordering::Relaxed), 3);
    }

    /// Objects come back in reverse order of return.
    #[test]
    fn lifo_ordering() {
        // No-op reset: each buffer keeps the tag written below, so the
        // assertions can tell which object was handed out.
        let acquirer = Pool::new(3, Vec::<u8>::new, |_| {});

        let mut guard_a = acquirer.try_acquire().unwrap();
        let mut guard_b = acquirer.try_acquire().unwrap();
        let mut guard_c = acquirer.try_acquire().unwrap();

        guard_a.push(1);
        guard_b.push(2);
        guard_c.push(3);

        // Return in order: a, b, c
        drop(guard_a);
        drop(guard_b);
        drop(guard_c);

        // Should get back in LIFO order: c, b, a
        let reacquired_1 = acquirer.try_acquire().unwrap();
        assert_eq!(*reacquired_1, vec![3u8]);

        let reacquired_2 = acquirer.try_acquire().unwrap();
        assert_eq!(*reacquired_2, vec![2u8]);

        let reacquired_3 = acquirer.try_acquire().unwrap();
        assert_eq!(*reacquired_3, vec![1u8]);
    }

    /// `available()` follows the number of outstanding guards.
    #[test]
    fn available_tracks_outstanding_guards() {
        let acquirer = Pool::new(2, || 0u8, |_| {});
        assert_eq!(acquirer.available(), 2);

        let first = acquirer.try_acquire().unwrap();
        assert_eq!(acquirer.available(), 1);

        let second = acquirer.try_acquire().unwrap();
        assert_eq!(acquirer.available(), 0);
        assert!(acquirer.try_acquire().is_none());
        assert_eq!(acquirer.available(), 0);

        drop(first);
        assert_eq!(acquirer.available(), 1);
        drop(second);
        assert_eq!(acquirer.available(), 2);
    }

    #[test]
    #[should_panic(expected = "capacity must be non-zero")]
    fn zero_capacity_panics() {
        let _ = Pool::new(0, || (), |()| {});
    }

    // =========================================================================
    // Stress tests
    // =========================================================================

    /// Repeated acquire/release cycles on one thread leave the pool full.
    #[test]
    fn stress_single_thread() {
        let acquirer = Pool::new(100, || Vec::<u8>::with_capacity(64), Vec::clear);

        for _ in 0..10_000 {
            let mut items: Vec<_> = (0..50).filter_map(|_| acquirer.try_acquire()).collect();

            for item in &mut items {
                item.extend_from_slice(b"data");
            }

            drop(items);
        }

        // All items should be back
        let count = acquirer.available();
        assert_eq!(count, 100);
    }

    /// One thread acquires while a worker thread returns.
    #[test]
    fn stress_multi_thread_return() {
        let acquirer = Pool::new(
            100,
            || AtomicUsize::new(0),
            |v| {
                v.store(0, Ordering::Relaxed);
            },
        );

        let returned = Arc::new(AtomicUsize::new(0));

        thread::scope(|s| {
            let (tx, rx) = std::sync::mpsc::channel();
            let returned_clone = Arc::clone(&returned);

            // Single worker thread receives and returns items
            s.spawn(move || {
                while let Ok(item) = rx.recv() {
                    let _item: Pooled<AtomicUsize> = item;
                    returned_clone.fetch_add(1, Ordering::Relaxed);
                    // item drops here, returns to pool
                }
            });

            // Main thread acquires and sends to worker
            let mut sent = 0;
            while sent < 1000 {
                if let Some(item) = acquirer.try_acquire() {
                    tx.send(item).unwrap();
                    sent += 1;
                } else {
                    // Pool exhausted, wait a bit for returns
                    thread::yield_now();
                }
            }
            // tx drops here, worker sees disconnect
        });

        assert_eq!(returned.load(Ordering::Relaxed), 1000);
    }

    /// Several threads return their guards at the same time.
    #[test]
    fn stress_concurrent_return() {
        let acquirer = Pool::new(1000, || 0u64, |_| {});

        // Acquire all items
        let items: Vec<_> = (0..1000).filter_map(|_| acquirer.try_acquire()).collect();
        assert_eq!(items.len(), 1000);

        // Split items across threads and return concurrently
        let items_per_thread = 250;
        let mut item_chunks: Vec<Vec<_>> = Vec::new();
        let mut iter = items.into_iter();
        for _ in 0..4 {
            item_chunks.push(iter.by_ref().take(items_per_thread).collect());
        }

        thread::scope(|s| {
            for chunk in item_chunks {
                s.spawn(move || {
                    for item in chunk {
                        drop(item);
                    }
                });
            }
        });

        // All items should be back
        let count = acquirer.available();
        assert_eq!(count, 1000);
    }
}
549}