// nexus_pool/sync.rs
1//! Single-acquirer pool: one thread acquires, any thread can return.
2//!
3//! Items are acquired from a single point (the `Acquirer`) and can be
4//! returned from any thread via `Drop` on `Pooled`.
5//!
6//! Uses LIFO ordering for cache locality.
7
8use std::cell::UnsafeCell;
9use std::mem::{ManuallyDrop, MaybeUninit};
10use std::ops::{Deref, DerefMut};
11use std::sync::atomic::{AtomicU32, Ordering};
12use std::sync::{Arc, Weak};
13
/// Sentinel index meaning "no slot": marks the end of the free list and an
/// empty pool. Valid slot indices are therefore restricted to < u32::MAX.
const NONE: u32 = u32::MAX;
15
16// =============================================================================
17// Slot - individual pool entry
18// =============================================================================
19
/// One pool entry: storage for a value plus an intrusive free-list link.
struct Slot<T> {
    // Storage for the pooled value. Holds an initialized value while the
    // slot sits on the free list; logically uninitialized while the slot is
    // "out" (its value moved into a `Pooled` guard).
    value: UnsafeCell<MaybeUninit<T>>,
    // Index of the next free slot, or `NONE`. Only meaningful while this
    // slot is linked into the free list.
    next: AtomicU32,
}

// Safety: Slot is Send+Sync because:
// - value is only accessed when slot is "owned" (popped from free list)
// - next is atomic
unsafe impl<T: Send> Send for Slot<T> {}
unsafe impl<T: Send + Sync> Sync for Slot<T> {}
30
31// =============================================================================
32// Inner - shared pool state
33// =============================================================================
34
/// Shared pool state: the slot storage plus the head of a CAS-based LIFO
/// free list. Links are u32 indices into `slots` rather than pointers, so a
/// single 32-bit CAS on `free_head` suffices.
struct Inner<T> {
    // All slots; free-list links are indices into this array.
    slots: Box<[Slot<T>]>,
    // Index of the first free slot, or `NONE` when the pool is exhausted.
    free_head: AtomicU32,
    // User-supplied hook run on every value as it is returned to the pool.
    reset: Box<dyn Fn(&mut T) + Send + Sync>,
}
40
impl<T> Inner<T> {
    /// Push a slot back onto the free list. Called from any thread.
    ///
    /// Runs `reset` on the value, writes it back into its slot, then links
    /// the slot at the head of the free list.
    fn push(&self, idx: u32, mut value: T) {
        // Reset the value before it becomes acquirable again
        (self.reset)(&mut value);

        // Write value back to slot
        // Safety: we own this slot (it was popped), no one else can access it
        unsafe {
            (*self.slots[idx as usize].value.get()).write(value);
        }

        // Link into free list with CAS loop
        loop {
            let head = self.free_head.load(Ordering::Relaxed);
            self.slots[idx as usize].next.store(head, Ordering::Relaxed);

            match self.free_head.compare_exchange_weak(
                head,
                idx,
                Ordering::Release, // Publishes value write + next write
                Ordering::Relaxed, // Failure just retries
            ) {
                Ok(_) => return,
                Err(_) => std::hint::spin_loop(),
            }
        }
    }

    /// Pop a slot from the free list. Called only from Acquirer thread.
    ///
    /// The single-popper invariant is what makes the classic Treiber-stack
    /// ABA hazard impossible here: pushers only ever *prepend* nodes, so if
    /// `free_head` still equals `head` at CAS time, `head`'s `next` link is
    /// still the one we read. (Relies on `Pool` being the only popper —
    /// it is `Send` but not `Sync`/`Clone`.)
    fn pop(&self) -> Option<u32> {
        loop {
            let head = self.free_head.load(Ordering::Acquire);
            if head == NONE {
                return None;
            }

            // Read next - safe because we Acquired head, syncs with pusher's Release
            let next = self.slots[head as usize].next.load(Ordering::Relaxed);

            match self.free_head.compare_exchange_weak(
                head,
                next,
                Ordering::Acquire, // Syncs with pusher's Release
                Ordering::Acquire, // On fail, need to see new head
            ) {
                Ok(_) => return Some(head),
                Err(_) => {
                    // Pusher added something newer - retry for hotter item
                    std::hint::spin_loop();
                }
            }
        }
    }

    /// Move the value out of the slot at `idx`, leaving the slot's storage
    /// logically uninitialized until `push` writes it back.
    ///
    /// (Note: despite the `read_` prefix this is a move, not a borrow —
    /// it uses `assume_init_read`, so the caller becomes the owner.)
    ///
    /// # Safety
    ///
    /// Caller must own the slot (have popped it) and slot must contain valid value.
    unsafe fn read_value(&self, idx: u32) -> T {
        unsafe { (*self.slots[idx as usize].value.get()).assume_init_read() }
    }
}
105
106impl<T> Drop for Inner<T> {
107 fn drop(&mut self) {
108 // Only drop values that are currently in the free list.
109 // Values that are "out" (held by Pooled) have been moved
110 // out of the slot, and the guard's Drop impl will handle them
111 // (either returning to pool, or dropping directly if pool is gone).
112 let mut idx = *self.free_head.get_mut();
113 while idx != NONE {
114 unsafe {
115 (*self.slots[idx as usize].value.get()).assume_init_drop();
116 }
117 idx = *self.slots[idx as usize].next.get_mut();
118 }
119 // MaybeUninit doesn't drop contents, so Box<[Slot<T>]> will just
120 // deallocate memory without double-dropping.
121 }
122}
123
124// =============================================================================
125// Pool - the pool and acquire handle combined
126// =============================================================================
127
/// A bounded pool where one thread acquires and any thread can return.
///
/// Exactly one `Pool` handle exists per pool. It cannot be cloned or shared
/// across threads (it is `Send` but not `Sync` or `Clone`); this
/// single-acquirer invariant is what `Inner::pop` relies on.
///
/// When the `Pool` is dropped, outstanding `Pooled` guards
/// will drop their values directly instead of returning them to the pool.
///
/// # Example
///
/// ```
/// use nexus_pool::sync::Pool;
///
/// let acquirer = Pool::new(
///     100,
///     || Vec::<u8>::with_capacity(1024),
///     |v| v.clear(),
/// );
///
/// // Acquirer thread
/// let mut buf = acquirer.try_acquire().unwrap();
/// buf.extend_from_slice(b"hello");
///
/// // Can send buf to another thread
/// std::thread::spawn(move || {
///     println!("{:?}", &*buf);
///     // buf returns to pool on drop
/// }).join().unwrap();
/// ```
pub struct Pool<T> {
    inner: Arc<Inner<T>>,
}

// Pool is Send (can be moved to another thread) but not Sync (not shared).
// Not Clone - only one acquirer exists.
// Safety: Inner uses atomics for the free list and values are only accessed
// when a slot is owned (popped). T: Send ensures values can cross threads.
// The manual impl is needed because the auto traits on `Arc<Inner<T>>`
// would otherwise require `T: Sync` as well.
#[allow(clippy::non_send_fields_in_send_ty)]
unsafe impl<T: Send> Send for Pool<T> {}
167
168impl<T> Pool<T> {
169 /// Creates a pool with `capacity` pre-initialized objects.
170 ///
171 /// # Arguments
172 ///
173 /// * `capacity` - Number of objects to pre-allocate
174 /// * `init` - Factory function to create each object
175 /// * `reset` - Called when object returns to pool (e.g., `Vec::clear`)
176 ///
177 /// # Panics
178 ///
179 /// Panics if capacity is zero or exceeds `u32::MAX - 1`.
180 pub fn new<I, R>(capacity: usize, mut init: I, reset: R) -> Self
181 where
182 I: FnMut() -> T,
183 R: Fn(&mut T) + Send + Sync + 'static,
184 {
185 assert!(capacity > 0, "capacity must be non-zero");
186 assert!(
187 capacity < NONE as usize,
188 "capacity must be less than {}",
189 NONE
190 );
191
192 // Build slots with linked free list: 0 -> 1 -> 2 -> ... -> NONE
193 let slots: Box<[Slot<T>]> = (0..capacity)
194 .map(|i| Slot {
195 value: UnsafeCell::new(MaybeUninit::new(init())),
196 next: AtomicU32::new(if i + 1 < capacity {
197 (i + 1) as u32
198 } else {
199 NONE
200 }),
201 })
202 .collect();
203
204 Self {
205 inner: Arc::new(Inner {
206 slots,
207 free_head: AtomicU32::new(0), // Head of free list
208 reset: Box::new(reset),
209 }),
210 }
211 }
212
213 /// Attempts to acquire an object from the pool.
214 ///
215 /// Returns `None` if all objects are currently in use.
216 pub fn try_acquire(&self) -> Option<Pooled<T>> {
217 self.inner.pop().map(|idx| {
218 // Take value from slot
219 // Safety: we just popped this slot, we own it, it contains valid value
220 let value = unsafe { self.inner.read_value(idx) };
221 Pooled {
222 value: ManuallyDrop::new(value),
223 idx,
224 inner: Arc::downgrade(&self.inner),
225 }
226 })
227 }
228
229 /// Returns the number of available objects.
230 ///
231 /// Note: This is a snapshot and may be immediately outdated if other
232 /// threads are returning objects concurrently.
233 pub fn available(&self) -> usize {
234 let mut count = 0;
235 let mut idx = self.inner.free_head.load(Ordering::Relaxed);
236 while idx != NONE {
237 count += 1;
238 idx = self.inner.slots[idx as usize].next.load(Ordering::Relaxed);
239 }
240 count
241 }
242}
243
244// =============================================================================
245// Pooled - RAII guard
246// =============================================================================
247
/// RAII guard that returns the object to the pool on drop.
///
/// This guard can be sent to other threads. When dropped, the object
/// is automatically returned to the pool (if the pool still exists);
/// if the pool has already been dropped, the value is dropped in place.
pub struct Pooled<T> {
    // The pooled value. ManuallyDrop lets `Drop` either move it back into
    // the pool or drop it explicitly — exactly one of the two.
    value: ManuallyDrop<T>,
    // Index of the slot this value was taken from.
    idx: u32,
    // Weak handle to the pool; `upgrade()` fails once the pool is gone.
    inner: Weak<Inner<T>>,
}

// Pooled is Send + Sync - can be sent anywhere, dropped from anywhere
// Safety: Pooled owns its value (ManuallyDrop<T>). The Weak<Inner<T>> is only
// used during drop to push the slot back via atomic CAS. T: Send ensures the
// value can cross threads; T: Sync ensures shared references are safe.
#[allow(clippy::non_send_fields_in_send_ty)]
unsafe impl<T: Send> Send for Pooled<T> {}
unsafe impl<T: Send + Sync> Sync for Pooled<T> {}
265
266impl<T> Deref for Pooled<T> {
267 type Target = T;
268
269 #[inline]
270 fn deref(&self) -> &T {
271 &self.value
272 }
273}
274
275impl<T> DerefMut for Pooled<T> {
276 #[inline]
277 fn deref_mut(&mut self) -> &mut T {
278 &mut self.value
279 }
280}
281
282impl<T> Drop for Pooled<T> {
283 fn drop(&mut self) {
284 if let Some(inner) = self.inner.upgrade() {
285 // Pool still alive - return value
286 let value = unsafe { ManuallyDrop::take(&mut self.value) };
287 inner.push(self.idx, value);
288 } else {
289 // Pool is gone - just drop the value
290 unsafe { ManuallyDrop::drop(&mut self.value) };
291 }
292 }
293}
294
295// =============================================================================
296// Tests
297// =============================================================================
298
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicUsize;
    use std::thread;

    #[test]
    fn basic_acquire_release() {
        let acquirer = Pool::new(3, || Vec::<u8>::with_capacity(16), |v| v.clear());

        let mut a = acquirer.try_acquire().unwrap();
        a.extend_from_slice(b"hello");
        assert_eq!(&*a, b"hello");

        let _b = acquirer.try_acquire().unwrap();
        let _c = acquirer.try_acquire().unwrap();

        // Pool exhausted
        assert!(acquirer.try_acquire().is_none());

        // Return one
        drop(a);

        // Can acquire again - and it's been cleared
        let d = acquirer.try_acquire().unwrap();
        assert!(d.is_empty());
    }

    #[test]
    fn cross_thread_return() {
        let acquirer = Pool::new(2, || 42u32, |_| {});

        let item = acquirer.try_acquire().unwrap();
        assert_eq!(*item, 42);

        // Send to another thread to drop
        thread::spawn(move || {
            assert_eq!(*item, 42);
            drop(item);
        })
        .join()
        .unwrap();

        // Should be back in pool
        let item2 = acquirer.try_acquire().unwrap();
        assert_eq!(*item2, 42);
    }

    #[test]
    fn acquirer_dropped_first() {
        let item;
        {
            let acquirer = Pool::new(1, || String::from("test"), |s| s.clear());
            item = acquirer.try_acquire().unwrap();
            // acquirer drops here
        }
        // item still valid - we can access it
        assert_eq!(&*item, "test");
        // item drops here - should not panic
    }

    #[test]
    fn reset_called_on_return() {
        let reset_count = Arc::new(AtomicUsize::new(0));
        let reset_count_clone = Arc::clone(&reset_count);

        let acquirer = Pool::new(
            2,
            || 0u32,
            move |_| {
                reset_count_clone.fetch_add(1, Ordering::Relaxed);
            },
        );

        let a = acquirer.try_acquire().unwrap();
        assert_eq!(reset_count.load(Ordering::Relaxed), 0);

        drop(a);
        assert_eq!(reset_count.load(Ordering::Relaxed), 1);

        let b = acquirer.try_acquire().unwrap();
        let c = acquirer.try_acquire().unwrap();
        drop(b);
        drop(c);
        assert_eq!(reset_count.load(Ordering::Relaxed), 3);
    }

    #[test]
    fn lifo_ordering() {
        // Use a no-op reset so contents survive the round trip and we can
        // observe which buffer comes back first. (The previous version of
        // this test used a clearing reset and then asserted `is_empty()`
        // three times, which passes under ANY ordering and verified
        // nothing about LIFO behavior.)
        let acquirer = Pool::new(3, Vec::<u8>::new, |_| {});

        let mut a = acquirer.try_acquire().unwrap();
        let mut b = acquirer.try_acquire().unwrap();
        let mut c = acquirer.try_acquire().unwrap();

        a.push(1);
        b.push(2);
        c.push(3);

        // Return in order: a, b, c
        drop(a);
        drop(b);
        drop(c);

        // Free list is a LIFO stack, so they come back as: c, b, a
        let x = acquirer.try_acquire().unwrap();
        assert_eq!(*x, vec![3]); // this was 'c'

        let y = acquirer.try_acquire().unwrap();
        assert_eq!(*y, vec![2]); // this was 'b'

        let z = acquirer.try_acquire().unwrap();
        assert_eq!(*z, vec![1]); // this was 'a'
    }

    #[test]
    #[should_panic(expected = "capacity must be non-zero")]
    fn zero_capacity_panics() {
        let _ = Pool::new(0, || (), |_| {});
    }

    // =========================================================================
    // Stress tests
    // =========================================================================

    #[test]
    fn stress_single_thread() {
        let acquirer = Pool::new(100, || Vec::<u8>::with_capacity(64), |v| v.clear());

        for _ in 0..10_000 {
            let mut items: Vec<_> = (0..50).filter_map(|_| acquirer.try_acquire()).collect();

            for item in &mut items {
                item.extend_from_slice(b"data");
            }

            drop(items);
        }

        // All items should be back
        let count = acquirer.available();
        assert_eq!(count, 100);
    }

    #[test]
    fn stress_multi_thread_return() {
        let acquirer = Pool::new(
            100,
            || AtomicUsize::new(0),
            |v| {
                v.store(0, Ordering::Relaxed);
            },
        );

        let returned = Arc::new(AtomicUsize::new(0));

        thread::scope(|s| {
            let (tx, rx) = std::sync::mpsc::channel();
            let returned_clone = Arc::clone(&returned);

            // Single worker thread receives and returns items
            s.spawn(move || {
                while let Ok(item) = rx.recv() {
                    let _item: Pooled<AtomicUsize> = item;
                    returned_clone.fetch_add(1, Ordering::Relaxed);
                    // item drops here, returns to pool
                }
            });

            // Main thread acquires and sends to worker
            let mut sent = 0;
            while sent < 1000 {
                if let Some(item) = acquirer.try_acquire() {
                    tx.send(item).unwrap();
                    sent += 1;
                } else {
                    // Pool exhausted, wait a bit for returns
                    thread::yield_now();
                }
            }
            // tx drops here, worker sees disconnect
        });

        assert_eq!(returned.load(Ordering::Relaxed), 1000);
    }

    #[test]
    fn stress_concurrent_return() {
        // Multiple threads returning simultaneously
        let acquirer = Pool::new(1000, || 0u64, |_| {});

        // Acquire all items
        let items: Vec<_> = (0..1000).filter_map(|_| acquirer.try_acquire()).collect();
        assert_eq!(items.len(), 1000);

        // Split items across threads and return concurrently
        let items_per_thread = 250;
        let mut item_chunks: Vec<Vec<_>> = Vec::new();
        let mut iter = items.into_iter();
        for _ in 0..4 {
            item_chunks.push(iter.by_ref().take(items_per_thread).collect());
        }

        thread::scope(|s| {
            for chunk in item_chunks {
                s.spawn(move || {
                    for item in chunk {
                        drop(item);
                    }
                });
            }
        });

        // All items should be back
        let count = acquirer.available();
        assert_eq!(count, 1000);
    }
}