// nexus_pool/sync.rs

//! Single-acquirer pool: one thread acquires, any thread can return.
//!
//! Items are acquired from a single point (the `Pool` handle) and can be
//! returned from any thread via `Drop` on `Pooled`.
//!
//! Uses LIFO ordering for cache locality.

use std::cell::{Cell, UnsafeCell};
use std::marker::PhantomData;
use std::mem::{ManuallyDrop, MaybeUninit};
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Weak};

/// Sentinel index marking the end of the free list.
const NONE: u32 = u32::MAX;
// =============================================================================
// Slot - individual pool entry
// =============================================================================

struct Slot<T> {
    value: UnsafeCell<MaybeUninit<T>>,
    next: AtomicU32,
}

// Safety: Slot is Send+Sync because:
// - value is only accessed while the slot is "owned" (popped from the free list)
// - next is atomic
unsafe impl<T: Send> Send for Slot<T> {}
unsafe impl<T: Send + Sync> Sync for Slot<T> {}
// =============================================================================
// Inner - shared pool state
// =============================================================================

struct Inner<T> {
    slots: Box<[Slot<T>]>,
    free_head: AtomicU32,
    reset: Box<dyn Fn(&mut T) + Send + Sync>,
}
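
// A worked sketch of the free list (a Treiber stack of indices into `slots`),
// assuming capacity 4 with slots 1 and 3 currently checked out:
//
//   free_head -> 2 -> 0 -> NONE      (slots[2].next == 0, slots[0].next == NONE)
//
// `pop` swings free_head from 2 to slots[2].next, handing out slot 2.
// `push(1, v)` resets v, writes it into slots[1], sets slots[1].next = 2, then
// CASes free_head from 2 to 1, giving: free_head -> 1 -> 2 -> 0 -> NONE.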

impl<T> Inner<T> {
    /// Push a slot back onto the free list. Called from any thread.
    #[inline]
    fn push(&self, idx: u32, mut value: T) {
        // Reset the value
        (self.reset)(&mut value);

        // Write value back to slot
        // Safety: we own this slot (it was popped), no one else can access it
        unsafe {
            (*self.slots[idx as usize].value.get()).write(value);
        }

        // Link into free list with CAS loop
        loop {
            let head = self.free_head.load(Ordering::Relaxed);
            self.slots[idx as usize].next.store(head, Ordering::Relaxed);

            match self.free_head.compare_exchange_weak(
                head,
                idx,
                Ordering::Release, // Publishes value write + next write
                Ordering::Relaxed, // Failure just retries
            ) {
                Ok(_) => return,
                Err(_) => std::hint::spin_loop(),
            }
        }
    }

    /// Pop a slot from the free list. Called only from the single acquirer
    /// thread. Having exactly one popper is what makes this Treiber stack
    /// immune to the ABA problem: an index can only reappear at the head
    /// after *we* popped it and it was pushed back, and we never pop
    /// concurrently with ourselves.
    #[inline]
    fn pop(&self) -> Option<u32> {
        loop {
            let head = self.free_head.load(Ordering::Acquire);
            if head == NONE {
                return None;
            }

            // Read next - safe because we Acquired head, syncs with pusher's Release
            let next = self.slots[head as usize].next.load(Ordering::Relaxed);

            match self.free_head.compare_exchange_weak(
                head,
                next,
                Ordering::Acquire, // Syncs with pusher's Release
                Ordering::Acquire, // On fail, need to see new head
            ) {
                Ok(_) => return Some(head),
                Err(_) => {
                    // Pusher added something newer - retry for hotter item
                    std::hint::spin_loop();
                }
            }
        }
    }

    /// Moves the value out of the slot at `idx`.
    ///
    /// # Safety
    ///
    /// Caller must own the slot (have popped it) and the slot must contain a
    /// valid value. Afterwards the slot is logically uninitialized until
    /// `push` writes a value back.
    #[inline]
    unsafe fn read_value(&self, idx: u32) -> T {
        unsafe { (*self.slots[idx as usize].value.get()).assume_init_read() }
    }
}

impl<T> Drop for Inner<T> {
    fn drop(&mut self) {
        // Only drop values that are currently in the free list.
        // Values that are "out" (held by Pooled) have been moved
        // out of the slot, and the guard's Drop impl will handle them
        // (either returning to pool, or dropping directly if pool is gone).
        let mut idx = *self.free_head.get_mut();
        while idx != NONE {
            unsafe {
                (*self.slots[idx as usize].value.get()).assume_init_drop();
            }
            idx = *self.slots[idx as usize].next.get_mut();
        }
        // MaybeUninit doesn't drop contents, so Box<[Slot<T>]> will just
        // deallocate memory without double-dropping.
    }
}

// =============================================================================
// Pool - the pool and acquire handle combined
// =============================================================================

/// A bounded pool where one thread acquires and any thread can return.
///
/// Only one `Pool` handle exists per underlying pool. It cannot be cloned or
/// shared across threads (it is `Send` but neither `Sync` nor `Clone`), so at
/// most one thread acquires at a time.
///
/// When the `Pool` is dropped, outstanding `Pooled` guards
/// will drop their values directly instead of returning them to the pool.
///
/// # Example
///
/// ```
/// use nexus_pool::sync::Pool;
///
/// let acquirer = Pool::new(
///     100,
///     || Vec::<u8>::with_capacity(1024),
///     |v| v.clear(),
/// );
///
/// // Acquirer thread
/// let mut buf = acquirer.try_acquire().unwrap();
/// buf.extend_from_slice(b"hello");
///
/// // Can send buf to another thread
/// std::thread::spawn(move || {
///     println!("{:?}", &*buf);
///     // buf returns to pool on drop
/// }).join().unwrap();
/// ```
pub struct Pool<T> {
    inner: Arc<Inner<T>>,
    // Cell<()> is Send but !Sync, so this marker suppresses the auto-Sync
    // impl while keeping Pool movable across threads. A Sync Pool would let
    // two threads call `try_acquire` through a shared reference, breaking
    // the single-consumer invariant that `pop` relies on.
    _not_sync: PhantomData<Cell<()>>,
}

// Pool is Send (can be moved to another thread) but not Sync (the PhantomData
// marker above suppresses auto-Sync). Not Clone - only one acquirer exists.
unsafe impl<T: Send> Send for Pool<T> {}

impl<T> Pool<T> {
    /// Creates a pool with `capacity` pre-initialized objects.
    ///
    /// # Arguments
    ///
    /// * `capacity` - Number of objects to pre-allocate
    /// * `init` - Factory function to create each object
    /// * `reset` - Called when object returns to pool (e.g., `Vec::clear`)
    ///
    /// # Panics
    ///
    /// Panics if capacity is zero or exceeds `u32::MAX - 1`.
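    ///
    /// # Example
    ///
    /// A minimal sketch: a small pool of reusable byte buffers.
    ///
    /// ```
    /// use nexus_pool::sync::Pool;
    ///
    /// let pool = Pool::new(4, || Vec::<u8>::with_capacity(256), |v| v.clear());
    /// assert_eq!(pool.available(), 4);
    /// ```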
    pub fn new<I, R>(capacity: usize, mut init: I, reset: R) -> Self
    where
        I: FnMut() -> T,
        R: Fn(&mut T) + Send + Sync + 'static,
    {
        assert!(capacity > 0, "capacity must be non-zero");
        assert!(
            capacity < NONE as usize,
            "capacity must be less than {}",
            NONE
        );

        // Build slots with linked free list: 0 -> 1 -> 2 -> ... -> NONE
        let slots: Box<[Slot<T>]> = (0..capacity)
            .map(|i| Slot {
                value: UnsafeCell::new(MaybeUninit::new(init())),
                next: AtomicU32::new(if i + 1 < capacity {
                    (i + 1) as u32
                } else {
                    NONE
                }),
            })
            .collect();

        Self {
            inner: Arc::new(Inner {
                slots,
                free_head: AtomicU32::new(0), // Head of free list
                reset: Box::new(reset),
            }),
            _not_sync: PhantomData,
        }
    }

    /// Attempts to acquire an object from the pool.
    ///
    /// Returns `None` if all objects are currently in use.
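    ///
    /// # Example
    ///
    /// A small sketch of exhaustion: a one-slot pool yields `Some` once,
    /// then `None` until the guard is dropped.
    ///
    /// ```
    /// use nexus_pool::sync::Pool;
    ///
    /// let pool = Pool::new(1, || 0u32, |_| {});
    /// let item = pool.try_acquire().unwrap();
    /// assert!(pool.try_acquire().is_none()); // exhausted
    /// drop(item);                            // returns to the pool
    /// assert!(pool.try_acquire().is_some());
    /// ```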
    #[inline]
    pub fn try_acquire(&self) -> Option<Pooled<T>> {
        self.inner.pop().map(|idx| {
            // Take value from slot
            // Safety: we just popped this slot, we own it, it contains valid value
            let value = unsafe { self.inner.read_value(idx) };
            Pooled {
                value: ManuallyDrop::new(value),
                idx,
                inner: Arc::downgrade(&self.inner),
            }
        })
    }

    /// Returns the number of available objects.
    ///
    /// Note: This is a snapshot and may be immediately outdated if other
    /// threads are returning objects concurrently.
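    ///
    /// # Example
    ///
    /// A quick sketch of the count moving as guards are taken and returned.
    ///
    /// ```
    /// use nexus_pool::sync::Pool;
    ///
    /// let pool = Pool::new(2, || 0u8, |_| {});
    /// let a = pool.try_acquire().unwrap();
    /// assert_eq!(pool.available(), 1);
    /// drop(a);
    /// assert_eq!(pool.available(), 2);
    /// ```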
    pub fn available(&self) -> usize {
        let mut count = 0;
        // Acquire pairs with the pushers' Release CAS so each published
        // node's `next` write is visible before we follow it (the same
        // protocol `pop` uses; a Relaxed load here could see a stale chain).
        let mut idx = self.inner.free_head.load(Ordering::Acquire);
        while idx != NONE {
            count += 1;
            idx = self.inner.slots[idx as usize].next.load(Ordering::Relaxed);
        }
        count
    }
}

// =============================================================================
// Pooled - RAII guard
// =============================================================================

/// RAII guard that returns the object to the pool on drop.
///
/// This guard can be sent to other threads. When dropped, the object
/// is automatically returned to the pool (if the pool still exists).
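///
/// # Example
///
/// A brief sketch: the guard derefs to the pooled value and hands it back on
/// drop.
///
/// ```
/// use nexus_pool::sync::Pool;
///
/// let pool = Pool::new(1, || String::new(), |s| s.clear());
/// let mut s = pool.try_acquire().unwrap();
/// s.push_str("scratch");
/// assert_eq!(s.as_str(), "scratch");
/// drop(s); // cleared by `reset` and returned
/// assert_eq!(pool.available(), 1);
/// ```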
pub struct Pooled<T> {
    value: ManuallyDrop<T>,
    idx: u32,
    inner: Weak<Inner<T>>,
}

// Pooled is Send + Sync - can be sent anywhere, dropped from anywhere
unsafe impl<T: Send> Send for Pooled<T> {}
unsafe impl<T: Send + Sync> Sync for Pooled<T> {}

impl<T> Deref for Pooled<T> {
    type Target = T;

    #[inline]
    fn deref(&self) -> &T {
        &self.value
    }
}

impl<T> DerefMut for Pooled<T> {
    #[inline]
    fn deref_mut(&mut self) -> &mut T {
        &mut self.value
    }
}

impl<T> Drop for Pooled<T> {
    fn drop(&mut self) {
        if let Some(inner) = self.inner.upgrade() {
            // Pool still alive - return value
            let value = unsafe { ManuallyDrop::take(&mut self.value) };
            inner.push(self.idx, value);
        } else {
            // Pool is gone - just drop the value
            unsafe { ManuallyDrop::drop(&mut self.value) };
        }
    }
}

// =============================================================================
// Tests
// =============================================================================

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicUsize;
    use std::thread;

    #[test]
    fn basic_acquire_release() {
        let acquirer = Pool::new(3, || Vec::<u8>::with_capacity(16), |v| v.clear());

        let mut a = acquirer.try_acquire().unwrap();
        a.extend_from_slice(b"hello");
        assert_eq!(&*a, b"hello");

        let _b = acquirer.try_acquire().unwrap();
        let _c = acquirer.try_acquire().unwrap();

        // Pool exhausted
        assert!(acquirer.try_acquire().is_none());

        // Return one
        drop(a);

        // Can acquire again - and it's been cleared
        let d = acquirer.try_acquire().unwrap();
        assert!(d.is_empty());
    }

    #[test]
    fn cross_thread_return() {
        let acquirer = Pool::new(2, || 42u32, |_| {});

        let item = acquirer.try_acquire().unwrap();
        assert_eq!(*item, 42);

        // Send to another thread to drop
        thread::spawn(move || {
            assert_eq!(*item, 42);
            drop(item);
        })
        .join()
        .unwrap();

        // Should be back in pool
        let item2 = acquirer.try_acquire().unwrap();
        assert_eq!(*item2, 42);
    }

    #[test]
    fn acquirer_dropped_first() {
        let item;
        {
            let acquirer = Pool::new(1, || String::from("test"), |s| s.clear());
            item = acquirer.try_acquire().unwrap();
            // acquirer drops here
        }
        // item still valid - we can access it
        assert_eq!(&*item, "test");
        // item drops here - should not panic
    }

    #[test]
    fn reset_called_on_return() {
        let reset_count = Arc::new(AtomicUsize::new(0));
        let reset_count_clone = Arc::clone(&reset_count);

        let acquirer = Pool::new(
            2,
            || 0u32,
            move |_| {
                reset_count_clone.fetch_add(1, Ordering::Relaxed);
            },
        );

        let a = acquirer.try_acquire().unwrap();
        assert_eq!(reset_count.load(Ordering::Relaxed), 0);

        drop(a);
        assert_eq!(reset_count.load(Ordering::Relaxed), 1);

        let b = acquirer.try_acquire().unwrap();
        let c = acquirer.try_acquire().unwrap();
        drop(b);
        drop(c);
        assert_eq!(reset_count.load(Ordering::Relaxed), 3);
    }

    #[test]
    fn lifo_ordering() {
        // Use a no-op reset so the surviving contents reveal which slot
        // each acquire handed back.
        let acquirer = Pool::new(3, || Vec::<u8>::new(), |_| {});

        let mut a = acquirer.try_acquire().unwrap();
        let mut b = acquirer.try_acquire().unwrap();
        let mut c = acquirer.try_acquire().unwrap();

        a.push(1);
        b.push(2);
        c.push(3);

        // Return in order: a, b, c
        drop(a);
        drop(b);
        drop(c);

        // Should get back in LIFO order: c, b, a
        let x = acquirer.try_acquire().unwrap();
        let y = acquirer.try_acquire().unwrap();
        let z = acquirer.try_acquire().unwrap();
        assert_eq!(*x, [3]); // this was 'c'
        assert_eq!(*y, [2]); // this was 'b'
        assert_eq!(*z, [1]); // this was 'a'
    }

    #[test]
    #[should_panic(expected = "capacity must be non-zero")]
    fn zero_capacity_panics() {
        let _ = Pool::new(0, || (), |_| {});
    }

    // =========================================================================
    // Stress tests
    // =========================================================================

    #[test]
    fn stress_single_thread() {
        let acquirer = Pool::new(100, || Vec::<u8>::with_capacity(64), |v| v.clear());

        for _ in 0..10_000 {
            let mut items: Vec<_> = (0..50).filter_map(|_| acquirer.try_acquire()).collect();

            for item in &mut items {
                item.extend_from_slice(b"data");
            }

            drop(items);
        }

        // All items should be back
        let count = acquirer.available();
        assert_eq!(count, 100);
    }

    #[test]
    fn stress_multi_thread_return() {
        let acquirer = Pool::new(
            100,
            || AtomicUsize::new(0),
            |v| {
                v.store(0, Ordering::Relaxed);
            },
        );

        let returned = Arc::new(AtomicUsize::new(0));

        thread::scope(|s| {
            let (tx, rx) = std::sync::mpsc::channel();
            let returned_clone = Arc::clone(&returned);

            // Single worker thread receives and returns items
            s.spawn(move || {
                while let Ok(item) = rx.recv() {
                    let _item: Pooled<AtomicUsize> = item;
                    returned_clone.fetch_add(1, Ordering::Relaxed);
                    // item drops here, returns to pool
                }
            });

            // Main thread acquires and sends to worker
            let mut sent = 0;
            while sent < 1000 {
                if let Some(item) = acquirer.try_acquire() {
                    tx.send(item).unwrap();
                    sent += 1;
                } else {
                    // Pool exhausted, wait a bit for returns
                    thread::yield_now();
                }
            }
            // tx drops here, worker sees disconnect
        });

        assert_eq!(returned.load(Ordering::Relaxed), 1000);
    }

    #[test]
    fn stress_concurrent_return() {
        // Multiple threads returning simultaneously
        let acquirer = Pool::new(1000, || 0u64, |_| {});

        // Acquire all items
        let items: Vec<_> = (0..1000).filter_map(|_| acquirer.try_acquire()).collect();
        assert_eq!(items.len(), 1000);

        // Split items across threads and return concurrently
        let items_per_thread = 250;
        let mut item_chunks: Vec<Vec<_>> = Vec::new();
        let mut iter = items.into_iter();
        for _ in 0..4 {
            item_chunks.push(iter.by_ref().take(items_per_thread).collect());
        }

        thread::scope(|s| {
            for chunk in item_chunks {
                s.spawn(move || {
                    for item in chunk {
                        drop(item);
                    }
                });
            }
        });

        // All items should be back
        let count = acquirer.available();
        assert_eq!(count, 1000);
    }
}