nexus_pool/sync.rs
1//! Single-acquirer pool: one thread acquires, any thread can return.
2//!
//! Items are acquired from a single point (the [`Pool`] handle) and can be
//! returned from any thread via `Drop` on [`Pooled`].
5//!
6//! Uses LIFO ordering for cache locality.
7
8use std::cell::UnsafeCell;
9use std::mem::{ManuallyDrop, MaybeUninit};
10use std::ops::{Deref, DerefMut};
11use std::sync::atomic::{AtomicU32, Ordering};
12use std::sync::{Arc, Weak};
13
14const NONE: u32 = u32::MAX;
15
16// =============================================================================
17// Slot - individual pool entry
18// =============================================================================
19
/// One pool entry: storage for a value plus its link in the free list.
struct Slot<T> {
    /// Storage for the pooled value.
    ///
    /// Initialized in `Pool::new` and re-written by `Inner::push`; the value is
    /// moved out by `Inner::read_value` when the slot is popped, after which the
    /// bytes left behind must be treated as uninitialized.
    value: UnsafeCell<MaybeUninit<T>>,
    /// Index of the next slot in the free list, or `NONE` at the end of the list.
    next: AtomicU32,
}

// SAFETY: Slot is Send+Sync because:
// - value is only accessed when slot is "owned" (popped from free list),
//   so there is never more than one accessor of the `UnsafeCell` at a time
// - next is atomic
unsafe impl<T: Send> Send for Slot<T> {}
unsafe impl<T: Send + Sync> Sync for Slot<T> {}
30
31// =============================================================================
32// Inner - shared pool state
33// =============================================================================
34
/// State shared between the `Pool` handle (strong reference) and every
/// outstanding `Pooled` guard (weak reference).
struct Inner<T> {
    /// All slots, allocated once in `Pool::new`. Slot indices used by the free
    /// list are positions in this slice.
    slots: Box<[Slot<T>]>,
    /// Index of the first free slot, or `NONE` when no slot is available.
    free_head: AtomicU32,
    /// Callback run on a value in `push` before it is stored back into its slot.
    reset: Box<dyn Fn(&mut T) + Send + Sync>,
}
40
impl<T> Inner<T> {
    /// Push a slot back onto the free list. Called from any thread.
    ///
    /// Runs `reset` on `value`, stores it into slot `idx`, then links that slot
    /// in as the new head of the free list.
    ///
    /// The caller must be the current owner of slot `idx` (it was popped and not
    /// yet pushed back); this is not checked here.
    #[inline]
    fn push(&self, idx: u32, mut value: T) {
        // Reset the value before it becomes visible to the next acquirer.
        // NOTE(review): if `reset` panics, `value` is dropped during unwinding and
        // slot `idx` is never re-linked, so the pool permanently loses one slot.
        (self.reset)(&mut value);

        // Write value back to slot
        // SAFETY: we own this slot (it was popped), no one else can access it
        unsafe {
            (*self.slots[idx as usize].value.get()).write(value);
        }

        // Link into free list with CAS loop
        loop {
            // Relaxed is enough here: the CAS below re-validates `head`.
            let head = self.free_head.load(Ordering::Relaxed);
            self.slots[idx as usize].next.store(head, Ordering::Relaxed);

            match self.free_head.compare_exchange_weak(
                head,
                idx,
                Ordering::Release, // Publishes value write + next write
                Ordering::Relaxed, // Failure just retries
            ) {
                Ok(_) => return,
                // Head moved (or the weak CAS failed spuriously): reload and retry.
                Err(_) => std::hint::spin_loop(),
            }
        }
    }

    /// Pop a slot from the free list. Called only from the acquiring thread.
    ///
    /// Returns the index of the popped slot, or `None` if the free list is empty.
    ///
    /// NOTE(review): the loop reads `next` of the current head and then swaps the
    /// head to it. That is only free of ABA if no other thread can pop
    /// concurrently; nothing in this method enforces that, so it relies on the
    /// caller being the single popper.
    #[inline]
    fn pop(&self) -> Option<u32> {
        loop {
            let head = self.free_head.load(Ordering::Acquire);
            if head == NONE {
                return None;
            }

            // Read next - safe because we Acquired head, syncs with pusher's Release
            let next = self.slots[head as usize].next.load(Ordering::Relaxed);

            match self.free_head.compare_exchange_weak(
                head,
                next,
                Ordering::Acquire, // Syncs with pusher's Release
                Ordering::Acquire, // On fail, need to see new head
            ) {
                Ok(_) => return Some(head),
                Err(_) => {
                    // Either a pusher installed a newer head (retry picks up that
                    // hotter item) or the weak CAS failed spuriously.
                    std::hint::spin_loop();
                }
            }
        }
    }

    /// Moves the value out of the slot at `idx` and returns it.
    ///
    /// This is a bitwise read: the slot's storage is left untouched, but it must
    /// be treated as uninitialized until `push` writes a value into it again.
    ///
    /// # Safety
    ///
    /// Caller must own the slot (have popped it) and slot must contain valid value.
    /// The value must not be read out of the same slot again before it has been
    /// re-written, otherwise two owners of the same `T` would exist.
    #[inline]
    unsafe fn read_value(&self, idx: u32) -> T {
        unsafe { (*self.slots[idx as usize].value.get()).assume_init_read() }
    }
}
108
109impl<T> Drop for Inner<T> {
110 fn drop(&mut self) {
111 // Only drop values that are currently in the free list.
112 // Values that are "out" (held by Pooled) have been moved
113 // out of the slot, and the guard's Drop impl will handle them
114 // (either returning to pool, or dropping directly if pool is gone).
115 let mut idx = *self.free_head.get_mut();
116 while idx != NONE {
117 unsafe {
118 (*self.slots[idx as usize].value.get()).assume_init_drop();
119 }
120 idx = *self.slots[idx as usize].next.get_mut();
121 }
122 // MaybeUninit doesn't drop contents, so Box<[Slot<T>]> will just
123 // deallocate memory without double-dropping.
124 }
125}
126
127// =============================================================================
128// Pool - the pool and acquire handle combined
129// =============================================================================
130
131/// A bounded pool where one thread acquires and any thread can return.
132///
133/// Only one `Pool` exists per pool. It cannot be cloned or shared
134/// across threads (it is `Send` but not `Sync` or `Clone`).
135///
136/// When the `Pool` is dropped, outstanding `Pooled` guards
137/// will drop their values directly instead of returning them to the pool.
138///
139/// # Example
140///
141/// ```
142/// use nexus_pool::sync::Pool;
143///
144/// let acquirer = Pool::new(
145/// 100,
146/// || Vec::<u8>::with_capacity(1024),
147/// |v| v.clear(),
148/// );
149///
150/// // Acquirer thread
151/// let mut buf = acquirer.try_acquire().unwrap();
152/// buf.extend_from_slice(b"hello");
153///
154/// // Can send buf to another thread
155/// std::thread::spawn(move || {
156/// println!("{:?}", &*buf);
157/// // buf returns to pool on drop
158/// }).join().unwrap();
159/// ```
160pub struct Pool<T> {
161 inner: Arc<Inner<T>>,
162}
163
164// Pool is Send (can be moved to another thread) but not Sync (not shared)
165// Not Clone - only one acquirer exists
166unsafe impl<T: Send> Send for Pool<T> {}
167
168impl<T> Pool<T> {
169 /// Creates a pool with `capacity` pre-initialized objects.
170 ///
171 /// # Arguments
172 ///
173 /// * `capacity` - Number of objects to pre-allocate
174 /// * `init` - Factory function to create each object
175 /// * `reset` - Called when object returns to pool (e.g., `Vec::clear`)
176 ///
177 /// # Panics
178 ///
179 /// Panics if capacity is zero or exceeds `u32::MAX - 1`.
180 pub fn new<I, R>(capacity: usize, mut init: I, reset: R) -> Self
181 where
182 I: FnMut() -> T,
183 R: Fn(&mut T) + Send + Sync + 'static,
184 {
185 assert!(capacity > 0, "capacity must be non-zero");
186 assert!(
187 capacity < NONE as usize,
188 "capacity must be less than {}",
189 NONE
190 );
191
192 // Build slots with linked free list: 0 -> 1 -> 2 -> ... -> NONE
193 let slots: Box<[Slot<T>]> = (0..capacity)
194 .map(|i| Slot {
195 value: UnsafeCell::new(MaybeUninit::new(init())),
196 next: AtomicU32::new(if i + 1 < capacity {
197 (i + 1) as u32
198 } else {
199 NONE
200 }),
201 })
202 .collect();
203
204 Self {
205 inner: Arc::new(Inner {
206 slots,
207 free_head: AtomicU32::new(0), // Head of free list
208 reset: Box::new(reset),
209 }),
210 }
211 }
212
213 /// Attempts to acquire an object from the pool.
214 ///
215 /// Returns `None` if all objects are currently in use.
216 #[inline]
217 pub fn try_acquire(&self) -> Option<Pooled<T>> {
218 self.inner.pop().map(|idx| {
219 // Take value from slot
220 // Safety: we just popped this slot, we own it, it contains valid value
221 let value = unsafe { self.inner.read_value(idx) };
222 Pooled {
223 value: ManuallyDrop::new(value),
224 idx,
225 inner: Arc::downgrade(&self.inner),
226 }
227 })
228 }
229
230 /// Returns the number of available objects.
231 ///
232 /// Note: This is a snapshot and may be immediately outdated if other
233 /// threads are returning objects concurrently.
234 pub fn available(&self) -> usize {
235 let mut count = 0;
236 let mut idx = self.inner.free_head.load(Ordering::Relaxed);
237 while idx != NONE {
238 count += 1;
239 idx = self.inner.slots[idx as usize].next.load(Ordering::Relaxed);
240 }
241 count
242 }
243}
244
245// =============================================================================
246// Pooled - RAII guard
247// =============================================================================
248
/// RAII guard that returns the object to the pool on drop.
///
/// This guard can be sent to other threads. When dropped, the object
/// is automatically returned to the pool (if the pool still exists).
pub struct Pooled<T> {
    /// The checked-out value. Wrapped in `ManuallyDrop` so that `Drop` can
    /// either move it back into the pool or drop it in place, exactly once.
    value: ManuallyDrop<T>,
    /// Index of the slot this value was taken from; it is pushed back to the
    /// same slot.
    idx: u32,
    /// Weak handle to the shared pool state. Upgrading fails once the `Pool`
    /// has been dropped, which is how `Drop` detects that the pool is gone.
    inner: Weak<Inner<T>>,
}

// Pooled is Send + Sync - can be sent anywhere, dropped from anywhere
//
// SAFETY: the guard exclusively owns `value` while it is alive. Dropping it on
// another thread moves or drops a `T` there (needs `T: Send`); sharing `&Pooled`
// only hands out `&T` (needs `T: Sync`).
unsafe impl<T: Send> Send for Pooled<T> {}
unsafe impl<T: Send + Sync> Sync for Pooled<T> {}
262
263impl<T> Deref for Pooled<T> {
264 type Target = T;
265
266 #[inline]
267 fn deref(&self) -> &T {
268 &self.value
269 }
270}
271
272impl<T> DerefMut for Pooled<T> {
273 #[inline]
274 fn deref_mut(&mut self) -> &mut T {
275 &mut self.value
276 }
277}
278
279impl<T> Drop for Pooled<T> {
280 fn drop(&mut self) {
281 if let Some(inner) = self.inner.upgrade() {
282 // Pool still alive - return value
283 let value = unsafe { ManuallyDrop::take(&mut self.value) };
284 inner.push(self.idx, value);
285 } else {
286 // Pool is gone - just drop the value
287 unsafe { ManuallyDrop::drop(&mut self.value) };
288 }
289 }
290}
291
292// =============================================================================
293// Tests
294// =============================================================================
295
/// Unit and stress tests for the single-acquirer pool.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicUsize;
    use std::thread;

    /// Acquire until exhausted, return one, and check it comes back reset.
    #[test]
    fn basic_acquire_release() {
        let acquirer = Pool::new(3, || Vec::<u8>::with_capacity(16), |v| v.clear());

        let mut a = acquirer.try_acquire().unwrap();
        a.extend_from_slice(b"hello");
        assert_eq!(&*a, b"hello");

        let _b = acquirer.try_acquire().unwrap();
        let _c = acquirer.try_acquire().unwrap();

        // Pool exhausted
        assert!(acquirer.try_acquire().is_none());

        // Return one
        drop(a);

        // Can acquire again - and it's been cleared
        let d = acquirer.try_acquire().unwrap();
        assert!(d.is_empty());
    }

    /// A guard dropped on another thread makes its slot available again.
    #[test]
    fn cross_thread_return() {
        let acquirer = Pool::new(2, || 42u32, |_| {});

        let item = acquirer.try_acquire().unwrap();
        assert_eq!(*item, 42);

        // Send to another thread to drop
        thread::spawn(move || {
            assert_eq!(*item, 42);
            drop(item);
        })
        .join()
        .unwrap();

        // Should be back in pool
        let item2 = acquirer.try_acquire().unwrap();
        assert_eq!(*item2, 42);
    }

    /// A guard that outlives its pool stays usable and drops cleanly.
    #[test]
    fn acquirer_dropped_first() {
        let item;
        {
            let acquirer = Pool::new(1, || String::from("test"), |s| s.clear());
            item = acquirer.try_acquire().unwrap();
            // acquirer drops here
        }
        // item still valid - we can access it
        assert_eq!(&*item, "test");
        // item drops here - should not panic
    }

    /// `reset` runs exactly once per returned guard, and not on acquire.
    #[test]
    fn reset_called_on_return() {
        let reset_count = Arc::new(AtomicUsize::new(0));
        let reset_count_clone = Arc::clone(&reset_count);

        let acquirer = Pool::new(
            2,
            || 0u32,
            move |_| {
                reset_count_clone.fetch_add(1, Ordering::Relaxed);
            },
        );

        let a = acquirer.try_acquire().unwrap();
        assert_eq!(reset_count.load(Ordering::Relaxed), 0);

        drop(a);
        assert_eq!(reset_count.load(Ordering::Relaxed), 1);

        let b = acquirer.try_acquire().unwrap();
        let c = acquirer.try_acquire().unwrap();
        drop(b);
        drop(c);
        assert_eq!(reset_count.load(Ordering::Relaxed), 3);
    }

    /// Items come back out in the reverse of the order they were returned.
    #[test]
    fn lifo_ordering() {
        // No-op reset so each value keeps the tag written below; with a
        // clearing reset every item would look the same and the order could
        // not be observed.
        let acquirer = Pool::new(3, || 0u32, |_| {});

        let mut a = acquirer.try_acquire().unwrap();
        let mut b = acquirer.try_acquire().unwrap();
        let mut c = acquirer.try_acquire().unwrap();

        *a = 1;
        *b = 2;
        *c = 3;

        // Return in order: a, b, c
        drop(a);
        drop(b);
        drop(c);

        // Should get back in LIFO order: c, b, a
        let x = acquirer.try_acquire().unwrap();
        assert_eq!(*x, 3); // this was 'c'

        let y = acquirer.try_acquire().unwrap();
        assert_eq!(*y, 2); // this was 'b'

        let z = acquirer.try_acquire().unwrap();
        assert_eq!(*z, 1); // this was 'a'

        assert!(acquirer.try_acquire().is_none());
    }

    #[test]
    #[should_panic(expected = "capacity must be non-zero")]
    fn zero_capacity_panics() {
        let _ = Pool::new(0, || (), |_| {});
    }

    // =========================================================================
    // Stress tests
    // =========================================================================

    /// Repeated acquire/return cycles on one thread never lose a slot.
    #[test]
    fn stress_single_thread() {
        let acquirer = Pool::new(100, || Vec::<u8>::with_capacity(64), |v| v.clear());

        for _ in 0..10_000 {
            let mut items: Vec<_> = (0..50).filter_map(|_| acquirer.try_acquire()).collect();

            for item in &mut items {
                item.extend_from_slice(b"data");
            }

            drop(items);
        }

        // All items should be back
        let count = acquirer.available();
        assert_eq!(count, 100);
    }

    /// One thread acquires while a second thread returns, through a channel.
    #[test]
    fn stress_multi_thread_return() {
        let acquirer = Pool::new(
            100,
            || AtomicUsize::new(0),
            |v| {
                v.store(0, Ordering::Relaxed);
            },
        );

        let returned = Arc::new(AtomicUsize::new(0));

        thread::scope(|s| {
            let (tx, rx) = std::sync::mpsc::channel();
            let returned_clone = Arc::clone(&returned);

            // Single worker thread receives and returns items
            s.spawn(move || {
                while let Ok(item) = rx.recv() {
                    let _item: Pooled<AtomicUsize> = item;
                    returned_clone.fetch_add(1, Ordering::Relaxed);
                    // item drops here, returns to pool
                }
            });

            // Main thread acquires and sends to worker
            let mut sent = 0;
            while sent < 1000 {
                if let Some(item) = acquirer.try_acquire() {
                    tx.send(item).unwrap();
                    sent += 1;
                } else {
                    // Pool exhausted, wait a bit for returns
                    thread::yield_now();
                }
            }
            // tx drops here, worker sees disconnect
        });

        assert_eq!(returned.load(Ordering::Relaxed), 1000);
        // The scope joined the worker, so every guard has been dropped and
        // every slot must be back on the free list.
        assert_eq!(acquirer.available(), 100);
    }

    /// Four threads return 250 items each at the same time; none are lost.
    #[test]
    fn stress_concurrent_return() {
        // Multiple threads returning simultaneously
        let acquirer = Pool::new(1000, || 0u64, |_| {});

        // Acquire all items
        let items: Vec<_> = (0..1000).filter_map(|_| acquirer.try_acquire()).collect();
        assert_eq!(items.len(), 1000);

        // Split items across threads and return concurrently
        let items_per_thread = 250;
        let mut item_chunks: Vec<Vec<_>> = Vec::new();
        let mut iter = items.into_iter();
        for _ in 0..4 {
            item_chunks.push(iter.by_ref().take(items_per_thread).collect());
        }

        thread::scope(|s| {
            for chunk in item_chunks {
                s.spawn(move || {
                    for item in chunk {
                        drop(item);
                    }
                });
            }
        });

        // All items should be back
        let count = acquirer.available();
        assert_eq!(count, 1000);
    }
}