Skip to main content

nexus_pool/
sync.rs

//! Single-acquirer pool: one thread acquires, any thread can return.
//!
//! Items are acquired from a single point (the [`Pool`] handle) and can be
//! returned from any thread via `Drop` on [`Pooled`].
//!
//! The free list is LIFO, so the most recently returned item is handed out
//! first (for cache locality).
7
8use std::cell::UnsafeCell;
9use std::mem::{ManuallyDrop, MaybeUninit};
10use std::ops::{Deref, DerefMut};
11use std::sync::atomic::{AtomicUsize, Ordering};
12use std::sync::{Arc, Weak};
13
/// Sentinel slot index meaning "no slot": marks an empty free list and the
/// end of the `next` chain. Never a valid index, because `Pool::new` requires
/// `capacity < NONE`.
const NONE: usize = usize::MAX;
15
16// =============================================================================
17// Slot - individual pool entry
18// =============================================================================
19
/// One pool entry: storage for a single `T` plus its intrusive free-list link.
struct Slot<T> {
    /// The pooled value. Initialized while the slot sits on the free list;
    /// logically empty while the value is checked out (it is moved out on
    /// acquire and written back on return).
    value: UnsafeCell<MaybeUninit<T>>,
    /// Index of the slot below this one on the free list, or `NONE` when this
    /// slot is the last entry.
    next: AtomicUsize,
}

// SAFETY: `value` is only touched by whoever currently owns the slot (i.e.
// after it has been popped from the free list), so moving a `Slot` to another
// thread never creates concurrent access. `next` is an `AtomicUsize`.
unsafe impl<T> Send for Slot<T> where T: Send {}

// SAFETY: shared references never race on `value`: the `UnsafeCell` is only
// accessed while the slot is owned (off the free list), and ownership is
// handed over through the atomic free-list head. `next` is atomic and `Sync`.
unsafe impl<T> Sync for Slot<T> where T: Send + Sync {}
31
32// =============================================================================
33// Inner - shared pool state
34// =============================================================================
35
36struct Inner<T> {
37    slots: Box<[Slot<T>]>,
38    free_head: AtomicUsize,
39    free_count: AtomicUsize,
40    reset: Box<dyn Fn(&mut T) + Send + Sync>,
41}
42
43impl<T> Inner<T> {
44    /// Push a slot back onto the free list. Called from any thread.
45    fn push(&self, idx: usize, mut value: T) {
46        // Reset the value
47        (self.reset)(&mut value);
48
49        // SAFETY: We own this slot (it was popped from the free list via CAS).
50        // No other thread can access it until we push it back. MaybeUninit::write
51        // initializes the slot for the next acquirer.
52        unsafe {
53            (*self.slots[idx].value.get()).write(value);
54        }
55
56        // Link into free list with CAS loop
57        loop {
58            let head = self.free_head.load(Ordering::Relaxed);
59            self.slots[idx].next.store(head, Ordering::Relaxed);
60
61            match self.free_head.compare_exchange_weak(
62                head,
63                idx,
64                Ordering::Release, // Publishes value write + next write
65                Ordering::Relaxed, // Failure just retries
66            ) {
67                Ok(_) => {
68                    self.free_count.fetch_add(1, Ordering::Relaxed);
69                    return;
70                }
71                Err(_) => std::hint::spin_loop(),
72            }
73        }
74    }
75
76    /// Pop a slot from the free list. Called only from Acquirer thread.
77    fn pop(&self) -> Option<usize> {
78        loop {
79            let head = self.free_head.load(Ordering::Acquire);
80            if head == NONE {
81                return None;
82            }
83
84            // Read next - safe because we Acquired head, syncs with pusher's Release
85            let next = self.slots[head].next.load(Ordering::Relaxed);
86
87            match self.free_head.compare_exchange_weak(
88                head,
89                next,
90                Ordering::Acquire, // Syncs with pusher's Release
91                Ordering::Acquire, // On fail, need to see new head
92            ) {
93                Ok(_) => {
94                    self.free_count.fetch_sub(1, Ordering::Relaxed);
95                    return Some(head);
96                }
97                Err(_) => {
98                    // Pusher added something newer - retry for hotter item
99                    std::hint::spin_loop();
100                }
101            }
102        }
103    }
104
105    /// Get reference to value at index.
106    ///
107    /// # Safety
108    ///
109    /// Caller must own the slot (have popped it) and slot must contain valid value.
110    unsafe fn read_value(&self, idx: usize) -> T {
111        // SAFETY: Caller guarantees slot was popped (owned) and contains a valid value
112        // written by new() or push(). assume_init_read moves the value out without
113        // dropping the MaybeUninit, which is correct since the slot will be rewritten
114        // on the next push.
115        unsafe { (*self.slots[idx].value.get()).assume_init_read() }
116    }
117}
118
119impl<T> Drop for Inner<T> {
120    fn drop(&mut self) {
121        // Only drop values that are currently in the free list.
122        // Values that are "out" (held by Pooled) have been moved
123        // out of the slot, and the guard's Drop impl will handle them
124        // (either returning to pool, or dropping directly if pool is gone).
125        let mut idx = *self.free_head.get_mut();
126        while idx != NONE {
127            // SAFETY: Slots in the free list contain valid values (written by new()
128            // or push()). Slots NOT in the free list have been moved out via
129            // assume_init_read in pop and are handled by Pooled's Drop. get_mut is
130            // safe because we have &mut self (exclusive access during drop).
131            unsafe {
132                (*self.slots[idx].value.get()).assume_init_drop();
133            }
134            idx = *self.slots[idx].next.get_mut();
135        }
136        // MaybeUninit doesn't drop contents, so Box<[Slot<T>]> will just
137        // deallocate memory without double-dropping.
138    }
139}
140
141// =============================================================================
142// Pool - the pool and acquire handle combined
143// =============================================================================
144
145/// A bounded pool where one thread acquires and any thread can return.
146///
147/// Only one `Pool` exists per pool. It cannot be cloned or shared
148/// across threads (it is `Send` but not `Sync` or `Clone`).
149///
150/// When the `Pool` is dropped, outstanding `Pooled` guards
151/// will drop their values directly instead of returning them to the pool.
152///
153/// # Example
154///
155/// ```
156/// use nexus_pool::sync::Pool;
157///
158/// let acquirer = Pool::new(
159///     100,
160///     || Vec::<u8>::with_capacity(1024),
161///     |v| v.clear(),
162/// );
163///
164/// // Acquirer thread
165/// let mut buf = acquirer.try_acquire().unwrap();
166/// buf.extend_from_slice(b"hello");
167///
168/// // Can send buf to another thread
169/// std::thread::spawn(move || {
170///     println!("{:?}", &*buf);
171///     // buf returns to pool on drop
172/// }).join().unwrap();
173/// ```
174pub struct Pool<T> {
175    inner: Arc<Inner<T>>,
176}
177
178// SAFETY: Pool is Send (can be moved to another thread) but not Sync (not shared).
179// Inner uses atomics for the free list. Values in UnsafeCell are only accessed
180// when a slot is owned (popped via CAS). T: Send ensures values can cross threads.
181// Pool is not Clone — single acquirer enforced at the type level.
182#[allow(clippy::non_send_fields_in_send_ty)]
183unsafe impl<T: Send> Send for Pool<T> {}
184
185impl<T> Pool<T> {
186    /// Creates a pool with `capacity` pre-initialized objects.
187    ///
188    /// # Arguments
189    ///
190    /// * `capacity` - Number of objects to pre-allocate
191    /// * `init` - Factory function to create each object
192    /// * `reset` - Called when object returns to pool (e.g., `Vec::clear`)
193    ///
194    /// # Panics
195    ///
196    /// Panics if capacity is zero or exceeds `usize::MAX - 1`.
197    ///
198    /// The `reset` closure must not panic. If it does, the value is leaked
199    /// and the pool slot is not returned. Use simple operations like
200    /// `Vec::clear()` or field resets.
201    pub fn new<I, R>(capacity: usize, mut init: I, reset: R) -> Self
202    where
203        I: FnMut() -> T,
204        R: Fn(&mut T) + Send + Sync + 'static,
205    {
206        assert!(capacity > 0, "capacity must be non-zero");
207        assert!(
208            capacity < NONE,
209            "capacity must be less than {}",
210            NONE
211        );
212
213        // Build slots with linked free list: 0 -> 1 -> 2 -> ... -> NONE
214        let slots: Box<[Slot<T>]> = (0..capacity)
215            .map(|i| Slot {
216                value: UnsafeCell::new(MaybeUninit::new(init())),
217                next: AtomicUsize::new(if i + 1 < capacity { i + 1 } else { NONE }),
218            })
219            .collect();
220
221        Self {
222            inner: Arc::new(Inner {
223                slots,
224                free_head: AtomicUsize::new(0), // Head of free list
225                free_count: AtomicUsize::new(capacity),
226                reset: Box::new(reset),
227            }),
228        }
229    }
230
231    /// Attempts to acquire an object from the pool.
232    ///
233    /// Returns `None` if all objects are currently in use.
234    pub fn try_acquire(&self) -> Option<Pooled<T>> {
235        self.inner.pop().map(|idx| {
236            // SAFETY: We just popped this slot via CAS (exclusive ownership).
237            // The slot contains a valid value written by new() or a prior push().
238            let value = unsafe { self.inner.read_value(idx) };
239            Pooled {
240                value: ManuallyDrop::new(value),
241                idx,
242                inner: Arc::downgrade(&self.inner),
243            }
244        })
245    }
246
247    /// Returns the number of available objects.
248    ///
249    /// O(1) — backed by an atomic counter. This is a snapshot and may
250    /// be immediately outdated if other threads are returning objects
251    /// concurrently.
252    pub fn available(&self) -> usize {
253        self.inner.free_count.load(Ordering::Relaxed)
254    }
255}
256
257// =============================================================================
258// Pooled - RAII guard
259// =============================================================================
260
261/// RAII guard that returns the object to the pool on drop.
262///
263/// This guard can be sent to other threads. When dropped, the object
264/// is automatically returned to the pool (if the pool still exists).
265pub struct Pooled<T> {
266    value: ManuallyDrop<T>,
267    idx: usize,
268    inner: Weak<Inner<T>>,
269}
270
271// SAFETY: Pooled owns its value exclusively (ManuallyDrop<T>). The Weak<Inner<T>>
272// is only used during drop to push the slot back via atomic CAS. T: Send ensures
273// the value can cross threads.
274#[allow(clippy::non_send_fields_in_send_ty)]
275unsafe impl<T: Send> Send for Pooled<T> {}
276// SAFETY: Pooled's &self access goes through Deref to &T, which requires T: Sync.
277// The Weak and idx fields are not mutated through &self.
278unsafe impl<T: Send + Sync> Sync for Pooled<T> {}
279
280impl<T> Deref for Pooled<T> {
281    type Target = T;
282
283    #[inline]
284    fn deref(&self) -> &T {
285        &self.value
286    }
287}
288
289impl<T> DerefMut for Pooled<T> {
290    #[inline]
291    fn deref_mut(&mut self) -> &mut T {
292        &mut self.value
293    }
294}
295
296impl<T> Drop for Pooled<T> {
297    fn drop(&mut self) {
298        if let Some(inner) = self.inner.upgrade() {
299            // SAFETY: Value is valid (ManuallyDrop preserves it until explicit take/drop).
300            // After take, self.value is consumed; inner.push writes it back to the slot.
301            let value = unsafe { ManuallyDrop::take(&mut self.value) };
302            inner.push(self.idx, value);
303        } else {
304            // SAFETY: Pool is gone. Value is valid and must be dropped to avoid a leak.
305            // After drop, we never touch self.value again.
306            unsafe { ManuallyDrop::drop(&mut self.value) };
307        }
308    }
309}
310
311// =============================================================================
312// Tests
313// =============================================================================
314
#[cfg(test)]
mod tests {
    //! Unit and stress tests for the single-acquirer pool.

    use super::*;
    use std::sync::atomic::AtomicUsize;
    use std::thread;

    /// Acquire until exhausted, return one item, and get it back reset.
    #[test]
    fn basic_acquire_release() {
        let acquirer = Pool::new(3, || Vec::<u8>::with_capacity(16), Vec::clear);

        let mut a = acquirer.try_acquire().unwrap();
        a.extend_from_slice(b"hello");
        assert_eq!(&*a, b"hello");

        let _b = acquirer.try_acquire().unwrap();
        let _c = acquirer.try_acquire().unwrap();

        // Pool exhausted
        assert!(acquirer.try_acquire().is_none());

        // Return one
        drop(a);

        // Can acquire again - and it's been cleared
        let d = acquirer.try_acquire().unwrap();
        assert!(d.is_empty());
    }

    /// A guard dropped on another thread ends up back in the pool.
    #[test]
    fn cross_thread_return() {
        let acquirer = Pool::new(2, || 42u32, |_| {});

        let item = acquirer.try_acquire().unwrap();
        assert_eq!(*item, 42);

        // Send to another thread to drop
        thread::spawn(move || {
            assert_eq!(*item, 42);
            drop(item);
        })
        .join()
        .unwrap();

        // Should be back in pool
        let item2 = acquirer.try_acquire().unwrap();
        assert_eq!(*item2, 42);
    }

    /// A guard stays usable, and drops cleanly, after its pool is gone.
    #[test]
    fn acquirer_dropped_first() {
        let item;
        {
            let acquirer = Pool::new(1, || String::from("test"), String::clear);
            item = acquirer.try_acquire().unwrap();
            // acquirer drops here
        }
        // item still valid - we can access it
        assert_eq!(&*item, "test");
        // item drops here - should not panic
    }

    /// `reset` runs exactly once per returned guard and never on acquire.
    #[test]
    fn reset_called_on_return() {
        let reset_count = Arc::new(AtomicUsize::new(0));
        let reset_count_clone = Arc::clone(&reset_count);

        let acquirer = Pool::new(
            2,
            || 0u32,
            move |_| {
                reset_count_clone.fetch_add(1, Ordering::Relaxed);
            },
        );

        let a = acquirer.try_acquire().unwrap();
        assert_eq!(reset_count.load(Ordering::Relaxed), 0);

        drop(a);
        assert_eq!(reset_count.load(Ordering::Relaxed), 1);

        let b = acquirer.try_acquire().unwrap();
        let c = acquirer.try_acquire().unwrap();
        drop(b);
        drop(c);
        assert_eq!(reset_count.load(Ordering::Relaxed), 3);
    }

    /// The most recently returned item is the first one handed out again.
    ///
    /// Each item carries a distinct id and `reset` is a no-op, so the ids
    /// reveal which item comes back. Checking a reset `Vec` for emptiness
    /// cannot tell the items apart.
    #[test]
    fn lifo_ordering() {
        let mut next_id = 0u32;
        let acquirer = Pool::new(
            3,
            move || {
                let id = next_id;
                next_id += 1;
                id
            },
            |_| {},
        );

        // Initial free list is 0 -> 1 -> 2, so ids come out in that order.
        let guard_a = acquirer.try_acquire().unwrap();
        let guard_b = acquirer.try_acquire().unwrap();
        let guard_c = acquirer.try_acquire().unwrap();
        assert_eq!((*guard_a, *guard_b, *guard_c), (0, 1, 2));

        // Return in order: a, b, c
        drop(guard_a);
        drop(guard_b);
        drop(guard_c);

        // Should get back in LIFO order: c, b, a. The guards are kept alive
        // so nothing is returned in between.
        let reacquired_1 = acquirer.try_acquire().unwrap();
        assert_eq!(*reacquired_1, 2); // this was 'c'

        let reacquired_2 = acquirer.try_acquire().unwrap();
        assert_eq!(*reacquired_2, 1); // this was 'b'

        let reacquired_3 = acquirer.try_acquire().unwrap();
        assert_eq!(*reacquired_3, 0); // this was 'a'

        assert!(acquirer.try_acquire().is_none());
    }

    #[test]
    #[should_panic(expected = "capacity must be non-zero")]
    fn zero_capacity_panics() {
        let _ = Pool::new(0, || (), |()| {});
    }

    // =========================================================================
    // Stress tests
    // =========================================================================

    /// Repeated acquire/return cycles on one thread leave the pool full.
    #[test]
    fn stress_single_thread() {
        let acquirer = Pool::new(100, || Vec::<u8>::with_capacity(64), Vec::clear);

        for _ in 0..10_000 {
            let mut items: Vec<_> = (0..50).filter_map(|_| acquirer.try_acquire()).collect();

            for item in &mut items {
                item.extend_from_slice(b"data");
            }

            drop(items);
        }

        // All items should be back
        let count = acquirer.available();
        assert_eq!(count, 100);
    }

    /// One thread acquires while a worker thread returns every item.
    #[test]
    fn stress_multi_thread_return() {
        let acquirer = Pool::new(
            100,
            || AtomicUsize::new(0),
            |v| {
                v.store(0, Ordering::Relaxed);
            },
        );

        let returned = Arc::new(AtomicUsize::new(0));

        thread::scope(|s| {
            let (tx, rx) = std::sync::mpsc::channel();
            let returned_clone = Arc::clone(&returned);

            // Single worker thread receives and returns items
            s.spawn(move || {
                while let Ok(item) = rx.recv() {
                    let _item: Pooled<AtomicUsize> = item;
                    returned_clone.fetch_add(1, Ordering::Relaxed);
                    // item drops here, returns to pool
                }
            });

            // Main thread acquires and sends to worker
            let mut sent = 0;
            while sent < 1000 {
                if let Some(item) = acquirer.try_acquire() {
                    tx.send(item).unwrap();
                    sent += 1;
                } else {
                    // Pool exhausted, wait a bit for returns
                    thread::yield_now();
                }
            }
            // tx drops here, worker sees disconnect
        });

        assert_eq!(returned.load(Ordering::Relaxed), 1000);
    }

    /// Four threads return disjoint batches at the same time; none are lost.
    #[test]
    fn stress_concurrent_return() {
        // Multiple threads returning simultaneously
        let acquirer = Pool::new(1000, || 0u64, |_| {});

        // Acquire all items
        let items: Vec<_> = (0..1000).filter_map(|_| acquirer.try_acquire()).collect();
        assert_eq!(items.len(), 1000);

        // Split items across threads and return concurrently
        let items_per_thread = 250;
        let mut item_chunks: Vec<Vec<_>> = Vec::new();
        let mut iter = items.into_iter();
        for _ in 0..4 {
            item_chunks.push(iter.by_ref().take(items_per_thread).collect());
        }

        thread::scope(|s| {
            for chunk in item_chunks {
                s.spawn(move || {
                    for item in chunk {
                        drop(item);
                    }
                });
            }
        });

        // All items should be back
        let count = acquirer.available();
        assert_eq!(count, 1000);
    }
}