Skip to main content

nexus_pool/
sync.rs

//! Single-acquirer pool: one thread acquires, any thread can return.
//!
//! Items are acquired from a single point (the `Pool`) and can be
//! returned from any thread via `Drop` on `Pooled`.
//!
//! Uses LIFO ordering for cache locality.
7
8use std::cell::UnsafeCell;
9use std::mem::{ManuallyDrop, MaybeUninit};
10use std::ops::{Deref, DerefMut};
11use std::sync::atomic::{AtomicU32, Ordering};
12use std::sync::{Arc, Weak};
13
/// Sentinel index meaning "no slot".
///
/// It terminates the free list (the last slot's `next`) and is the value of
/// `free_head` when the pool is exhausted. Real slot indices are always
/// strictly smaller, because the pool capacity is asserted to be below it.
const NONE: u32 = u32::MAX;
15
16// =============================================================================
17// Slot - individual pool entry
18// =============================================================================
19
/// One pool entry: storage for a value plus its intrusive free-list link.
struct Slot<T> {
    /// Storage for the pooled value. It holds an initialized `T` while the
    /// slot sits on the free list; once the value has been read out by the
    /// acquirer the storage is treated as empty until the value is written
    /// back.
    value: UnsafeCell<MaybeUninit<T>>,
    /// Index of the next slot on the free list, or `NONE` at the tail.
    next: AtomicU32,
}

// SAFETY: a slot may be moved to, and referenced from, other threads because
// - `value` is only touched by whoever currently owns the slot, i.e. after it
//   has been popped from the free list and before it is pushed back;
// - `next` is an atomic and therefore safe to access concurrently.
unsafe impl<T> Send for Slot<T> where T: Send {}
unsafe impl<T> Sync for Slot<T> where T: Send + Sync {}
30
31// =============================================================================
32// Inner - shared pool state
33// =============================================================================
34
35struct Inner<T> {
36    slots: Box<[Slot<T>]>,
37    free_head: AtomicU32,
38    reset: Box<dyn Fn(&mut T) + Send + Sync>,
39}
40
41impl<T> Inner<T> {
42    /// Push a slot back onto the free list. Called from any thread.
43    fn push(&self, idx: u32, mut value: T) {
44        // Reset the value
45        (self.reset)(&mut value);
46
47        // Write value back to slot
48        // Safety: we own this slot (it was popped), no one else can access it
49        unsafe {
50            (*self.slots[idx as usize].value.get()).write(value);
51        }
52
53        // Link into free list with CAS loop
54        loop {
55            let head = self.free_head.load(Ordering::Relaxed);
56            self.slots[idx as usize].next.store(head, Ordering::Relaxed);
57
58            match self.free_head.compare_exchange_weak(
59                head,
60                idx,
61                Ordering::Release, // Publishes value write + next write
62                Ordering::Relaxed, // Failure just retries
63            ) {
64                Ok(_) => return,
65                Err(_) => std::hint::spin_loop(),
66            }
67        }
68    }
69
70    /// Pop a slot from the free list. Called only from Acquirer thread.
71    fn pop(&self) -> Option<u32> {
72        loop {
73            let head = self.free_head.load(Ordering::Acquire);
74            if head == NONE {
75                return None;
76            }
77
78            // Read next - safe because we Acquired head, syncs with pusher's Release
79            let next = self.slots[head as usize].next.load(Ordering::Relaxed);
80
81            match self.free_head.compare_exchange_weak(
82                head,
83                next,
84                Ordering::Acquire, // Syncs with pusher's Release
85                Ordering::Acquire, // On fail, need to see new head
86            ) {
87                Ok(_) => return Some(head),
88                Err(_) => {
89                    // Pusher added something newer - retry for hotter item
90                    std::hint::spin_loop();
91                }
92            }
93        }
94    }
95
96    /// Get reference to value at index.
97    ///
98    /// # Safety
99    ///
100    /// Caller must own the slot (have popped it) and slot must contain valid value.
101    unsafe fn read_value(&self, idx: u32) -> T {
102        unsafe { (*self.slots[idx as usize].value.get()).assume_init_read() }
103    }
104}
105
106impl<T> Drop for Inner<T> {
107    fn drop(&mut self) {
108        // Only drop values that are currently in the free list.
109        // Values that are "out" (held by Pooled) have been moved
110        // out of the slot, and the guard's Drop impl will handle them
111        // (either returning to pool, or dropping directly if pool is gone).
112        let mut idx = *self.free_head.get_mut();
113        while idx != NONE {
114            unsafe {
115                (*self.slots[idx as usize].value.get()).assume_init_drop();
116            }
117            idx = *self.slots[idx as usize].next.get_mut();
118        }
119        // MaybeUninit doesn't drop contents, so Box<[Slot<T>]> will just
120        // deallocate memory without double-dropping.
121    }
122}
123
124// =============================================================================
125// Pool - the pool and acquire handle combined
126// =============================================================================
127
128/// A bounded pool where one thread acquires and any thread can return.
129///
130/// Only one `Pool` exists per pool. It cannot be cloned or shared
131/// across threads (it is `Send` but not `Sync` or `Clone`).
132///
133/// When the `Pool` is dropped, outstanding `Pooled` guards
134/// will drop their values directly instead of returning them to the pool.
135///
136/// # Example
137///
138/// ```
139/// use nexus_pool::sync::Pool;
140///
141/// let acquirer = Pool::new(
142///     100,
143///     || Vec::<u8>::with_capacity(1024),
144///     |v| v.clear(),
145/// );
146///
147/// // Acquirer thread
148/// let mut buf = acquirer.try_acquire().unwrap();
149/// buf.extend_from_slice(b"hello");
150///
151/// // Can send buf to another thread
152/// std::thread::spawn(move || {
153///     println!("{:?}", &*buf);
154///     // buf returns to pool on drop
155/// }).join().unwrap();
156/// ```
157pub struct Pool<T> {
158    inner: Arc<Inner<T>>,
159}
160
161// Pool is Send (can be moved to another thread) but not Sync (not shared)
162// Not Clone - only one acquirer exists
163// Safety: Inner uses atomics for the free list and values are only accessed
164// when a slot is owned (popped). T: Send ensures values can cross threads.
165#[allow(clippy::non_send_fields_in_send_ty)]
166unsafe impl<T: Send> Send for Pool<T> {}
167
168impl<T> Pool<T> {
169    /// Creates a pool with `capacity` pre-initialized objects.
170    ///
171    /// # Arguments
172    ///
173    /// * `capacity` - Number of objects to pre-allocate
174    /// * `init` - Factory function to create each object
175    /// * `reset` - Called when object returns to pool (e.g., `Vec::clear`)
176    ///
177    /// # Panics
178    ///
179    /// Panics if capacity is zero or exceeds `u32::MAX - 1`.
180    pub fn new<I, R>(capacity: usize, mut init: I, reset: R) -> Self
181    where
182        I: FnMut() -> T,
183        R: Fn(&mut T) + Send + Sync + 'static,
184    {
185        assert!(capacity > 0, "capacity must be non-zero");
186        assert!(
187            capacity < NONE as usize,
188            "capacity must be less than {}",
189            NONE
190        );
191
192        // Build slots with linked free list: 0 -> 1 -> 2 -> ... -> NONE
193        let slots: Box<[Slot<T>]> = (0..capacity)
194            .map(|i| Slot {
195                value: UnsafeCell::new(MaybeUninit::new(init())),
196                next: AtomicU32::new(if i + 1 < capacity {
197                    (i + 1) as u32
198                } else {
199                    NONE
200                }),
201            })
202            .collect();
203
204        Self {
205            inner: Arc::new(Inner {
206                slots,
207                free_head: AtomicU32::new(0), // Head of free list
208                reset: Box::new(reset),
209            }),
210        }
211    }
212
213    /// Attempts to acquire an object from the pool.
214    ///
215    /// Returns `None` if all objects are currently in use.
216    pub fn try_acquire(&self) -> Option<Pooled<T>> {
217        self.inner.pop().map(|idx| {
218            // Take value from slot
219            // Safety: we just popped this slot, we own it, it contains valid value
220            let value = unsafe { self.inner.read_value(idx) };
221            Pooled {
222                value: ManuallyDrop::new(value),
223                idx,
224                inner: Arc::downgrade(&self.inner),
225            }
226        })
227    }
228
229    /// Returns the number of available objects.
230    ///
231    /// Note: This is a snapshot and may be immediately outdated if other
232    /// threads are returning objects concurrently.
233    pub fn available(&self) -> usize {
234        let mut count = 0;
235        let mut idx = self.inner.free_head.load(Ordering::Relaxed);
236        while idx != NONE {
237            count += 1;
238            idx = self.inner.slots[idx as usize].next.load(Ordering::Relaxed);
239        }
240        count
241    }
242}
243
244// =============================================================================
245// Pooled - RAII guard
246// =============================================================================
247
248/// RAII guard that returns the object to the pool on drop.
249///
250/// This guard can be sent to other threads. When dropped, the object
251/// is automatically returned to the pool (if the pool still exists).
252pub struct Pooled<T> {
253    value: ManuallyDrop<T>,
254    idx: u32,
255    inner: Weak<Inner<T>>,
256}
257
258// Pooled is Send + Sync - can be sent anywhere, dropped from anywhere
259// Safety: Pooled owns its value (ManuallyDrop<T>). The Weak<Inner<T>> is only
260// used during drop to push the slot back via atomic CAS. T: Send ensures the
261// value can cross threads; T: Sync ensures shared references are safe.
262#[allow(clippy::non_send_fields_in_send_ty)]
263unsafe impl<T: Send> Send for Pooled<T> {}
264unsafe impl<T: Send + Sync> Sync for Pooled<T> {}
265
266impl<T> Deref for Pooled<T> {
267    type Target = T;
268
269    #[inline]
270    fn deref(&self) -> &T {
271        &self.value
272    }
273}
274
275impl<T> DerefMut for Pooled<T> {
276    #[inline]
277    fn deref_mut(&mut self) -> &mut T {
278        &mut self.value
279    }
280}
281
282impl<T> Drop for Pooled<T> {
283    fn drop(&mut self) {
284        if let Some(inner) = self.inner.upgrade() {
285            // Pool still alive - return value
286            let value = unsafe { ManuallyDrop::take(&mut self.value) };
287            inner.push(self.idx, value);
288        } else {
289            // Pool is gone - just drop the value
290            unsafe { ManuallyDrop::drop(&mut self.value) };
291        }
292    }
293}
294
295// =============================================================================
296// Tests
297// =============================================================================
298
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicUsize;
    use std::thread;

    /// Acquire until exhausted, return one, and check it comes back reset.
    #[test]
    fn basic_acquire_release() {
        let acquirer = Pool::new(3, || Vec::<u8>::with_capacity(16), |v| v.clear());

        let mut a = acquirer.try_acquire().unwrap();
        a.extend_from_slice(b"hello");
        assert_eq!(&*a, b"hello");

        let _b = acquirer.try_acquire().unwrap();
        let _c = acquirer.try_acquire().unwrap();

        // Pool exhausted
        assert!(acquirer.try_acquire().is_none());

        // Return one
        drop(a);

        // Can acquire again - and it's been cleared
        let d = acquirer.try_acquire().unwrap();
        assert!(d.is_empty());
    }

    /// A guard dropped on another thread makes its slot available again.
    #[test]
    fn cross_thread_return() {
        let acquirer = Pool::new(2, || 42u32, |_| {});

        let item = acquirer.try_acquire().unwrap();
        assert_eq!(*item, 42);

        // Send to another thread to drop
        thread::spawn(move || {
            assert_eq!(*item, 42);
            drop(item);
        })
        .join()
        .unwrap();

        // Should be back in pool
        let item2 = acquirer.try_acquire().unwrap();
        assert_eq!(*item2, 42);
    }

    /// A guard that outlives its pool stays usable and drops cleanly.
    #[test]
    fn acquirer_dropped_first() {
        let item;
        {
            let acquirer = Pool::new(1, || String::from("test"), |s| s.clear());
            item = acquirer.try_acquire().unwrap();
            // acquirer drops here
        }
        // item still valid - we can access it
        assert_eq!(&*item, "test");
        // item drops here - should not panic
    }

    /// `reset` runs once per returned guard and never on acquire.
    #[test]
    fn reset_called_on_return() {
        let reset_count = Arc::new(AtomicUsize::new(0));
        let reset_count_clone = Arc::clone(&reset_count);

        let acquirer = Pool::new(
            2,
            || 0u32,
            move |_| {
                reset_count_clone.fetch_add(1, Ordering::Relaxed);
            },
        );

        let a = acquirer.try_acquire().unwrap();
        assert_eq!(reset_count.load(Ordering::Relaxed), 0);

        drop(a);
        assert_eq!(reset_count.load(Ordering::Relaxed), 1);

        let b = acquirer.try_acquire().unwrap();
        let c = acquirer.try_acquire().unwrap();
        drop(b);
        drop(c);
        assert_eq!(reset_count.load(Ordering::Relaxed), 3);
    }

    /// The most recently returned value is the first one handed out again.
    #[test]
    fn lifo_ordering() {
        // The reset is a no-op so each value keeps the tag written through
        // its guard; that tag is what lets the test observe the order.
        let acquirer = Pool::new(3, || 0u8, |_| {});

        let mut a = acquirer.try_acquire().unwrap();
        let mut b = acquirer.try_acquire().unwrap();
        let mut c = acquirer.try_acquire().unwrap();

        *a = 1;
        *b = 2;
        *c = 3;

        // Return in order: a, b, c
        drop(a);
        drop(b);
        drop(c);

        // Should get back in LIFO order: c, b, a. The guards are kept alive
        // so an early return cannot put a value back on top of the list.
        let x = acquirer.try_acquire().unwrap();
        assert_eq!(*x, 3); // this was 'c'

        let y = acquirer.try_acquire().unwrap();
        assert_eq!(*y, 2); // this was 'b'

        let z = acquirer.try_acquire().unwrap();
        assert_eq!(*z, 1); // this was 'a'

        assert!(acquirer.try_acquire().is_none());
    }

    #[test]
    #[should_panic(expected = "capacity must be non-zero")]
    fn zero_capacity_panics() {
        let _ = Pool::new(0, || (), |_| {});
    }

    // =========================================================================
    // Stress tests
    // =========================================================================

    /// Repeated acquire/return cycles on one thread lose no slots.
    #[test]
    fn stress_single_thread() {
        let acquirer = Pool::new(100, || Vec::<u8>::with_capacity(64), |v| v.clear());

        for _ in 0..10_000 {
            let mut items: Vec<_> = (0..50).filter_map(|_| acquirer.try_acquire()).collect();

            for item in &mut items {
                item.extend_from_slice(b"data");
            }

            drop(items);
        }

        // All items should be back
        let count = acquirer.available();
        assert_eq!(count, 100);
    }

    /// One thread acquires while a worker thread returns, 1000 hand-offs.
    #[test]
    fn stress_multi_thread_return() {
        let acquirer = Pool::new(
            100,
            || AtomicUsize::new(0),
            |v| {
                v.store(0, Ordering::Relaxed);
            },
        );

        let returned = Arc::new(AtomicUsize::new(0));

        thread::scope(|s| {
            let (tx, rx) = std::sync::mpsc::channel();
            let returned_clone = Arc::clone(&returned);

            // Single worker thread receives and returns items
            s.spawn(move || {
                while let Ok(item) = rx.recv() {
                    let _item: Pooled<AtomicUsize> = item;
                    returned_clone.fetch_add(1, Ordering::Relaxed);
                    // item drops here, returns to pool
                }
            });

            // Main thread acquires and sends to worker
            let mut sent = 0;
            while sent < 1000 {
                if let Some(item) = acquirer.try_acquire() {
                    tx.send(item).unwrap();
                    sent += 1;
                } else {
                    // Pool exhausted, wait a bit for returns
                    thread::yield_now();
                }
            }
            // tx drops here, worker sees disconnect
        });

        assert_eq!(returned.load(Ordering::Relaxed), 1000);
    }

    /// Four threads return guards at the same time; every slot comes back.
    #[test]
    fn stress_concurrent_return() {
        // Multiple threads returning simultaneously
        let acquirer = Pool::new(1000, || 0u64, |_| {});

        // Acquire all items
        let items: Vec<_> = (0..1000).filter_map(|_| acquirer.try_acquire()).collect();
        assert_eq!(items.len(), 1000);

        // Split items across threads and return concurrently
        let items_per_thread = 250;
        let mut item_chunks: Vec<Vec<_>> = Vec::new();
        let mut iter = items.into_iter();
        for _ in 0..4 {
            item_chunks.push(iter.by_ref().take(items_per_thread).collect());
        }

        thread::scope(|s| {
            for chunk in item_chunks {
                s.spawn(move || {
                    for item in chunk {
                        drop(item);
                    }
                });
            }
        });

        // All items should be back
        let count = acquirer.available();
        assert_eq!(count, 1000);
    }
}
516}