nexus_pool/sync.rs
1//! Single-acquirer pool: one thread acquires, any thread can return.
2//!
//! Items are acquired from a single point (the `Pool` handle) and can be
//! returned from any thread via `Drop` on `Pooled`.
5//!
6//! Uses LIFO ordering for cache locality.
7
8use std::cell::UnsafeCell;
9use std::mem::{ManuallyDrop, MaybeUninit};
10use std::ops::{Deref, DerefMut};
11use std::sync::atomic::{AtomicUsize, Ordering};
12use std::sync::{Arc, Weak};
13
14const NONE: usize = usize::MAX;
15
16// =============================================================================
17// Slot - individual pool entry
18// =============================================================================
19
/// One pool entry: storage for a value plus its intrusive free-list link.
struct Slot<T> {
    /// Storage for the pooled value. Holds an initialized `T` while the slot
    /// is on the free list; while the slot is checked out the value has been
    /// moved out and the bytes must be treated as uninitialized.
    value: UnsafeCell<MaybeUninit<T>>,
    /// Index of the next slot on the free list, or `NONE` at the tail.
    next: AtomicUsize,
}

// SAFETY: `value` is only touched by whoever currently owns the slot (it has
// been popped off the free list), so moving a `Slot` to another thread never
// creates concurrent access to the `UnsafeCell`. `next` is an atomic.
unsafe impl<T> Send for Slot<T> where T: Send {}
// SAFETY: shared references never race on `value` for the same reason: the
// `UnsafeCell` is accessed only while the slot is owned (off the free list).
// `next` is an `AtomicUsize`, which is already `Sync`.
unsafe impl<T> Sync for Slot<T> where T: Send + Sync {}
31
32// =============================================================================
33// Inner - shared pool state
34// =============================================================================
35
36struct Inner<T> {
37 slots: Box<[Slot<T>]>,
38 free_head: AtomicUsize,
39 free_count: AtomicUsize,
40 reset: Box<dyn Fn(&mut T) + Send + Sync>,
41}
42
43impl<T> Inner<T> {
44 /// Push a slot back onto the free list. Called from any thread.
45 fn push(&self, idx: usize, mut value: T) {
46 // Reset the value
47 (self.reset)(&mut value);
48
49 // SAFETY: We own this slot (it was popped from the free list via CAS).
50 // No other thread can access it until we push it back. MaybeUninit::write
51 // initializes the slot for the next acquirer.
52 unsafe {
53 (*self.slots[idx].value.get()).write(value);
54 }
55
56 // Link into free list with CAS loop
57 loop {
58 let head = self.free_head.load(Ordering::Relaxed);
59 self.slots[idx].next.store(head, Ordering::Relaxed);
60
61 match self.free_head.compare_exchange_weak(
62 head,
63 idx,
64 Ordering::Release, // Publishes value write + next write
65 Ordering::Relaxed, // Failure just retries
66 ) {
67 Ok(_) => {
68 self.free_count.fetch_add(1, Ordering::Relaxed);
69 return;
70 }
71 Err(_) => std::hint::spin_loop(),
72 }
73 }
74 }
75
76 /// Pop a slot from the free list. Called only from Acquirer thread.
77 fn pop(&self) -> Option<usize> {
78 loop {
79 let head = self.free_head.load(Ordering::Acquire);
80 if head == NONE {
81 return None;
82 }
83
84 // Read next - safe because we Acquired head, syncs with pusher's Release
85 let next = self.slots[head].next.load(Ordering::Relaxed);
86
87 match self.free_head.compare_exchange_weak(
88 head,
89 next,
90 Ordering::Acquire, // Syncs with pusher's Release
91 Ordering::Acquire, // On fail, need to see new head
92 ) {
93 Ok(_) => {
94 self.free_count.fetch_sub(1, Ordering::Relaxed);
95 return Some(head);
96 }
97 Err(_) => {
98 // Pusher added something newer - retry for hotter item
99 std::hint::spin_loop();
100 }
101 }
102 }
103 }
104
105 /// Get reference to value at index.
106 ///
107 /// # Safety
108 ///
109 /// Caller must own the slot (have popped it) and slot must contain valid value.
110 unsafe fn read_value(&self, idx: usize) -> T {
111 // SAFETY: Caller guarantees slot was popped (owned) and contains a valid value
112 // written by new() or push(). assume_init_read moves the value out without
113 // dropping the MaybeUninit, which is correct since the slot will be rewritten
114 // on the next push.
115 unsafe { (*self.slots[idx].value.get()).assume_init_read() }
116 }
117}
118
119impl<T> Drop for Inner<T> {
120 fn drop(&mut self) {
121 // Only drop values that are currently in the free list.
122 // Values that are "out" (held by Pooled) have been moved
123 // out of the slot, and the guard's Drop impl will handle them
124 // (either returning to pool, or dropping directly if pool is gone).
125 let mut idx = *self.free_head.get_mut();
126 while idx != NONE {
127 // SAFETY: Slots in the free list contain valid values (written by new()
128 // or push()). Slots NOT in the free list have been moved out via
129 // assume_init_read in pop and are handled by Pooled's Drop. get_mut is
130 // safe because we have &mut self (exclusive access during drop).
131 unsafe {
132 (*self.slots[idx].value.get()).assume_init_drop();
133 }
134 idx = *self.slots[idx].next.get_mut();
135 }
136 // MaybeUninit doesn't drop contents, so Box<[Slot<T>]> will just
137 // deallocate memory without double-dropping.
138 }
139}
140
141// =============================================================================
142// Pool - the pool and acquire handle combined
143// =============================================================================
144
145/// A bounded pool where one thread acquires and any thread can return.
146///
147/// Only one `Pool` exists per pool. It cannot be cloned or shared
148/// across threads (it is `Send` but not `Sync` or `Clone`).
149///
150/// When the `Pool` is dropped, outstanding `Pooled` guards
151/// will drop their values directly instead of returning them to the pool.
152///
153/// # Example
154///
155/// ```
156/// use nexus_pool::sync::Pool;
157///
158/// let acquirer = Pool::new(
159/// 100,
160/// || Vec::<u8>::with_capacity(1024),
161/// |v| v.clear(),
162/// );
163///
164/// // Acquirer thread
165/// let mut buf = acquirer.try_acquire().unwrap();
166/// buf.extend_from_slice(b"hello");
167///
168/// // Can send buf to another thread
169/// std::thread::spawn(move || {
170/// println!("{:?}", &*buf);
171/// // buf returns to pool on drop
172/// }).join().unwrap();
173/// ```
174pub struct Pool<T> {
175 inner: Arc<Inner<T>>,
176}
177
178// SAFETY: Pool is Send (can be moved to another thread) but not Sync (not shared).
179// Inner uses atomics for the free list. Values in UnsafeCell are only accessed
180// when a slot is owned (popped via CAS). T: Send ensures values can cross threads.
181// Pool is not Clone — single acquirer enforced at the type level.
182#[allow(clippy::non_send_fields_in_send_ty)]
183unsafe impl<T: Send> Send for Pool<T> {}
184
185impl<T> Pool<T> {
186 /// Creates a pool with `capacity` pre-initialized objects.
187 ///
188 /// # Arguments
189 ///
190 /// * `capacity` - Number of objects to pre-allocate
191 /// * `init` - Factory function to create each object
192 /// * `reset` - Called when object returns to pool (e.g., `Vec::clear`)
193 ///
194 /// # Panics
195 ///
196 /// Panics if capacity is zero or exceeds `usize::MAX - 1`.
197 ///
198 /// The `reset` closure must not panic. If it does, the value is leaked
199 /// and the pool slot is not returned. Use simple operations like
200 /// `Vec::clear()` or field resets.
201 pub fn new<I, R>(capacity: usize, mut init: I, reset: R) -> Self
202 where
203 I: FnMut() -> T,
204 R: Fn(&mut T) + Send + Sync + 'static,
205 {
206 assert!(capacity > 0, "capacity must be non-zero");
207 assert!(
208 capacity < NONE,
209 "capacity must be less than {}",
210 NONE
211 );
212
213 // Build slots with linked free list: 0 -> 1 -> 2 -> ... -> NONE
214 let slots: Box<[Slot<T>]> = (0..capacity)
215 .map(|i| Slot {
216 value: UnsafeCell::new(MaybeUninit::new(init())),
217 next: AtomicUsize::new(if i + 1 < capacity { i + 1 } else { NONE }),
218 })
219 .collect();
220
221 Self {
222 inner: Arc::new(Inner {
223 slots,
224 free_head: AtomicUsize::new(0), // Head of free list
225 free_count: AtomicUsize::new(capacity),
226 reset: Box::new(reset),
227 }),
228 }
229 }
230
231 /// Attempts to acquire an object from the pool.
232 ///
233 /// Returns `None` if all objects are currently in use.
234 pub fn try_acquire(&self) -> Option<Pooled<T>> {
235 self.inner.pop().map(|idx| {
236 // SAFETY: We just popped this slot via CAS (exclusive ownership).
237 // The slot contains a valid value written by new() or a prior push().
238 let value = unsafe { self.inner.read_value(idx) };
239 Pooled {
240 value: ManuallyDrop::new(value),
241 idx,
242 inner: Arc::downgrade(&self.inner),
243 }
244 })
245 }
246
247 /// Returns the number of available objects.
248 ///
249 /// O(1) — backed by an atomic counter. This is a snapshot and may
250 /// be immediately outdated if other threads are returning objects
251 /// concurrently.
252 pub fn available(&self) -> usize {
253 self.inner.free_count.load(Ordering::Relaxed)
254 }
255}
256
257// =============================================================================
258// Pooled - RAII guard
259// =============================================================================
260
261/// RAII guard that returns the object to the pool on drop.
262///
263/// This guard can be sent to other threads. When dropped, the object
264/// is automatically returned to the pool (if the pool still exists).
265pub struct Pooled<T> {
266 value: ManuallyDrop<T>,
267 idx: usize,
268 inner: Weak<Inner<T>>,
269}
270
271// SAFETY: Pooled owns its value exclusively (ManuallyDrop<T>). The Weak<Inner<T>>
272// is only used during drop to push the slot back via atomic CAS. T: Send ensures
273// the value can cross threads.
274#[allow(clippy::non_send_fields_in_send_ty)]
275unsafe impl<T: Send> Send for Pooled<T> {}
276// SAFETY: Pooled's &self access goes through Deref to &T, which requires T: Sync.
277// The Weak and idx fields are not mutated through &self.
278unsafe impl<T: Send + Sync> Sync for Pooled<T> {}
279
280impl<T> Deref for Pooled<T> {
281 type Target = T;
282
283 #[inline]
284 fn deref(&self) -> &T {
285 &self.value
286 }
287}
288
289impl<T> DerefMut for Pooled<T> {
290 #[inline]
291 fn deref_mut(&mut self) -> &mut T {
292 &mut self.value
293 }
294}
295
296impl<T> Drop for Pooled<T> {
297 fn drop(&mut self) {
298 if let Some(inner) = self.inner.upgrade() {
299 // SAFETY: Value is valid (ManuallyDrop preserves it until explicit take/drop).
300 // After take, self.value is consumed; inner.push writes it back to the slot.
301 let value = unsafe { ManuallyDrop::take(&mut self.value) };
302 inner.push(self.idx, value);
303 } else {
304 // SAFETY: Pool is gone. Value is valid and must be dropped to avoid a leak.
305 // After drop, we never touch self.value again.
306 unsafe { ManuallyDrop::drop(&mut self.value) };
307 }
308 }
309}
310
311// =============================================================================
312// Tests
313// =============================================================================
314
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicUsize;
    use std::thread;

    /// Acquire until exhausted, return one, and observe that it was reset.
    #[test]
    fn basic_acquire_release() {
        let acquirer = Pool::new(3, || Vec::<u8>::with_capacity(16), Vec::clear);

        let mut a = acquirer.try_acquire().unwrap();
        a.extend_from_slice(b"hello");
        assert_eq!(&*a, b"hello");

        let _b = acquirer.try_acquire().unwrap();
        let _c = acquirer.try_acquire().unwrap();

        // Pool exhausted
        assert!(acquirer.try_acquire().is_none());

        // Return one
        drop(a);

        // Can acquire again - and it's been cleared
        let d = acquirer.try_acquire().unwrap();
        assert!(d.is_empty());
    }

    /// A guard dropped on another thread returns its object to the pool.
    #[test]
    fn cross_thread_return() {
        let acquirer = Pool::new(2, || 42u32, |_| {});

        let item = acquirer.try_acquire().unwrap();
        assert_eq!(*item, 42);

        // Send to another thread to drop
        thread::spawn(move || {
            assert_eq!(*item, 42);
            drop(item);
        })
        .join()
        .unwrap();

        // Should be back in pool
        let item2 = acquirer.try_acquire().unwrap();
        assert_eq!(*item2, 42);
    }

    /// A guard that outlives its pool stays usable and drops cleanly.
    #[test]
    fn acquirer_dropped_first() {
        let item;
        {
            let acquirer = Pool::new(1, || String::from("test"), String::clear);
            item = acquirer.try_acquire().unwrap();
            // acquirer drops here
        }
        // item still valid - we can access it
        assert_eq!(&*item, "test");
        // item drops here - should not panic
    }

    /// `reset` runs exactly once per returned object, never on acquire.
    #[test]
    fn reset_called_on_return() {
        let reset_count = Arc::new(AtomicUsize::new(0));
        let reset_count_clone = Arc::clone(&reset_count);

        let acquirer = Pool::new(
            2,
            || 0u32,
            move |_| {
                reset_count_clone.fetch_add(1, Ordering::Relaxed);
            },
        );

        let a = acquirer.try_acquire().unwrap();
        assert_eq!(reset_count.load(Ordering::Relaxed), 0);

        drop(a);
        assert_eq!(reset_count.load(Ordering::Relaxed), 1);

        let b = acquirer.try_acquire().unwrap();
        let c = acquirer.try_acquire().unwrap();
        drop(b);
        drop(c);
        assert_eq!(reset_count.load(Ordering::Relaxed), 3);
    }

    /// Objects come back out in the reverse of the order they were returned.
    #[test]
    fn lifo_ordering() {
        let acquirer = Pool::new(3, Vec::<u8>::new, Vec::clear);

        let mut guard_a = acquirer.try_acquire().unwrap();
        let mut guard_b = acquirer.try_acquire().unwrap();
        let mut guard_c = acquirer.try_acquire().unwrap();

        guard_a.push(1);
        guard_b.push(2);
        guard_c.push(3);

        // `Vec::clear` keeps the heap allocation and moving a `Vec` does not
        // relocate its buffer, so the buffer address identifies each object
        // across a round trip through the pool. All three buffers are live at
        // once here, so the addresses are distinct.
        let ptr_a = guard_a.as_ptr();
        let ptr_b = guard_b.as_ptr();
        let ptr_c = guard_c.as_ptr();

        // Return in order: a, b, c
        drop(guard_a);
        drop(guard_b);
        drop(guard_c);

        // Should get back in LIFO order: c, b, a
        let reacquired_1 = acquirer.try_acquire().unwrap();
        assert!(reacquired_1.is_empty()); // reset was called
        assert_eq!(reacquired_1.as_ptr(), ptr_c);

        let reacquired_2 = acquirer.try_acquire().unwrap();
        assert!(reacquired_2.is_empty());
        assert_eq!(reacquired_2.as_ptr(), ptr_b);

        let reacquired_3 = acquirer.try_acquire().unwrap();
        assert!(reacquired_3.is_empty());
        assert_eq!(reacquired_3.as_ptr(), ptr_a);
    }

    /// `available` follows acquires and returns on a single thread.
    #[test]
    fn available_tracks_outstanding_items() {
        let acquirer = Pool::new(2, || 0u8, |_| {});
        assert_eq!(acquirer.available(), 2);

        let first = acquirer.try_acquire().unwrap();
        assert_eq!(acquirer.available(), 1);

        let second = acquirer.try_acquire().unwrap();
        assert_eq!(acquirer.available(), 0);

        drop(first);
        assert_eq!(acquirer.available(), 1);

        drop(second);
        assert_eq!(acquirer.available(), 2);
    }

    #[test]
    #[should_panic(expected = "capacity must be non-zero")]
    fn zero_capacity_panics() {
        let _ = Pool::new(0, || (), |()| {});
    }

    // =========================================================================
    // Stress tests
    // =========================================================================

    /// Many acquire/return rounds on one thread leave the pool full.
    #[test]
    fn stress_single_thread() {
        let acquirer = Pool::new(100, || Vec::<u8>::with_capacity(64), Vec::clear);

        for _ in 0..10_000 {
            let mut items: Vec<_> = (0..50).filter_map(|_| acquirer.try_acquire()).collect();

            for item in &mut items {
                item.extend_from_slice(b"data");
            }

            drop(items);
        }

        // All items should be back
        let count = acquirer.available();
        assert_eq!(count, 100);
    }

    /// One thread acquires while a worker thread returns concurrently.
    #[test]
    fn stress_multi_thread_return() {
        let acquirer = Pool::new(
            100,
            || AtomicUsize::new(0),
            |v| {
                v.store(0, Ordering::Relaxed);
            },
        );

        let returned = Arc::new(AtomicUsize::new(0));

        thread::scope(|s| {
            let (tx, rx) = std::sync::mpsc::channel();
            let returned_clone = Arc::clone(&returned);

            // Single worker thread receives and returns items
            s.spawn(move || {
                while let Ok(item) = rx.recv() {
                    let _item: Pooled<AtomicUsize> = item;
                    returned_clone.fetch_add(1, Ordering::Relaxed);
                    // item drops here, returns to pool
                }
            });

            // Main thread acquires and sends to worker
            let mut sent = 0;
            while sent < 1000 {
                if let Some(item) = acquirer.try_acquire() {
                    tx.send(item).unwrap();
                    sent += 1;
                } else {
                    // Pool exhausted, wait a bit for returns
                    thread::yield_now();
                }
            }
            // tx drops here, worker sees disconnect
        });

        assert_eq!(returned.load(Ordering::Relaxed), 1000);
    }

    /// Several threads return their share of the objects at the same time.
    #[test]
    fn stress_concurrent_return() {
        // Multiple threads returning simultaneously
        let acquirer = Pool::new(1000, || 0u64, |_| {});

        // Acquire all items
        let items: Vec<_> = (0..1000).filter_map(|_| acquirer.try_acquire()).collect();
        assert_eq!(items.len(), 1000);

        // Split items across threads and return concurrently
        let items_per_thread = 250;
        let mut item_chunks: Vec<Vec<_>> = Vec::new();
        let mut iter = items.into_iter();
        for _ in 0..4 {
            item_chunks.push(iter.by_ref().take(items_per_thread).collect());
        }

        thread::scope(|s| {
            for chunk in item_chunks {
                s.spawn(move || {
                    for item in chunk {
                        drop(item);
                    }
                });
            }
        });

        // All items should be back
        let count = acquirer.available();
        assert_eq!(count, 1000);
    }
}