Skip to main content

frozen_core/
bpool.rs

1//! Lock-free buffer pool used for staging IO buffers
2//!
3//! ## Features
4//!
5//! - *RAII Safe*
6//! - *Graceful Shutdown*
7//! - *Lock-free fast path*
8//!
9//! ## Polling
10//!
//! [`BufPool::allocate`] guarantees that the requested number of chunks is allocated and stored
//! in the returned [`Allocation`]. If the chosen `backend` is [`BPBackend::Prealloc`], the call blocks
//! internally while not enough chunks are available, generally until previous [`Allocation`]s are dropped
14//!
15//! ## Example
16//!
17//! ```
18//! use frozen_core::bpool::{BufPool, BPCfg, BPBackend};
19//!
20//! let pool = BufPool::new(BPCfg {
21//!     mid: 0,
22//!     chunk_size: 0x20,
23//!     backend: BPBackend::Prealloc { capacity: 4 },
24//! });
25//!
26//! {
27//!     let alloc = pool.allocate(4).unwrap();
28//!     assert_eq!(alloc.count, 4);
29//!     // `alloc` is dropped here
30//! }
31//!
32//! let alloc2 = pool.allocate(4).unwrap();
33//! assert_eq!(alloc2.count, 4);
34//! ```
35
36use crate::error::{FrozenErr, FrozenRes};
37use std::{
38    ptr,
39    sync::{self, atomic},
40};
41
/// Sentinel slot index marking the end of the freelist, i.e. "no further free slot"
///
/// Stored both in the `next` links of the last free slot and in the index half of the packed `head`
const INVALID_POOL_SLOT: usize = u32::MAX as usize;
43
/// Config for [`BufPool`]
///
/// Passed by value to [`BufPool::new`] and kept by the pool for its whole lifetime
#[derive(Debug, Clone)]
pub struct BPCfg {
    /// Module id attached to every error constructed by the pool
    pub mid: u8,

    /// Size (in bytes) of a single unit (chunk of memory) returned via [`Allocation`]
    pub chunk_size: usize,

    /// Backend used for allocations, see [`BPBackend`]
    pub backend: BPBackend,
}
56
/// Available allocation backends for [`BufPool`]
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum BPBackend {
    /// All slots are dynamically constructed at runtime, which avoids waiting for slot availability under
    /// high contention at the cost of runtime allocations
    ///
    /// ## When to use
    ///
    /// - burst workloads
    /// - low-contention code paths
    /// - requests that may exceed the configured pool capacity
    Dynamic,

    /// Uses a pre-allocated freelist w/ the given `capacity`
    ///
    /// All the chunks are allocated upfront, avoiding runtime allocations, while providing lower and more
    /// predictable latency compared to [`BPBackend::Dynamic`]
    ///
    /// ## When to use
    ///
    /// - hot IO paths
    /// - write pipelines
    /// - storage engines
    /// - workloads where allocation latency must remain stable
    ///
    /// If all buffers are currently in use, [`BufPool::allocate`] blocks until another [`Allocation`]
    /// is dropped and buffers return to the pool
    ///
    /// ## Fallback
    ///
    /// For the [`BPBackend::Prealloc`] backend, if `n` exceeds the pool capacity, the allocation is performed
    /// using [`BPBackend::Dynamic`]
    Prealloc {
        /// Number of chunks to pre-allocate in memory
        capacity: usize,
    },
}
94
/// Lock-free buffer pool used for staging IO buffers
///
/// ## Features
///
/// - *RAII Safe*
/// - *Graceful Shutdown*
/// - *Lock-free fast path*
///
/// ## Blocking allocations
///
/// Bufs are handed out in batches by [`BufPool::allocate`], which returns only once every requested chunk
/// has been obtained. With [`BPBackend::Prealloc`] the call blocks internally while the freelist is empty
/// and resumes when chunks are returned by dropped [`Allocation`]s
#[derive(Debug)]
pub struct BufPool {
    /// Configuration the pool was created with
    cfg: BPCfg,
    /// Backend specific state (freelist for `Prealloc`, nothing for `Dynamic`)
    state: BackendState,
    /// Mutex paired with both condvars, it protects no data of its own
    lock: sync::Mutex<()>,
    /// Notified when a chunk is pushed back onto the freelist
    wait_cv: sync::Condvar,
    /// Notified when the number of live allocations drops to zero
    close_cv: sync::Condvar,
    /// Number of live [`Allocation`]s referencing this pool
    active: atomic::AtomicUsize,
}
116
// SAFETY(review): `BufPool` is not auto `Send`/`Sync` because the prealloc backend stores a raw `*mut u8`
// base pointer. The pool state visible in this file is only mutated through atomics (`head`, `next`,
// `active`) and coordinated through `lock` and the condvars. TODO confirm callers never alias a chunk
// across threads without their own synchronisation
unsafe impl Send for BufPool {}
unsafe impl Sync for BufPool {}
119
120impl BufPool {
121    /// Create a new [`BufPool`]
122    ///
123    /// ## Example
124    ///
125    /// ```
126    /// use frozen_core::bpool::{BufPool, BPCfg, BPBackend};
127    ///
128    /// let pool = BufPool::new(BPCfg {
129    ///     mid: 0,
130    ///     chunk_size: 0x20,
131    ///     backend: BPBackend::Prealloc { capacity: 0x10 },
132    /// });
133    ///
134    /// let alloc = pool.allocate(4).unwrap();
135    /// assert_eq!(alloc.count, 4);
136    /// ```
137    pub fn new(cfg: BPCfg) -> Self {
138        let state = BackendState::new(&cfg);
139        Self {
140            cfg,
141            state,
142            lock: sync::Mutex::new(()),
143            wait_cv: sync::Condvar::new(),
144            close_cv: sync::Condvar::new(),
145            active: atomic::AtomicUsize::new(0),
146        }
147    }
148
149    /// Allocate `n` buffers
150    ///
151    /// ## Blocking
152    ///
153    /// For [`BPBackend::Prealloc`] backend, if the pool does not currently contain enough chunks, call is blocked
154    /// until all required chunks are allocated
155    ///
156    /// ## Fallback to [`BPBackend::Dynamic`]
157    ///
158    /// For [`BPBackend::Prealloc`] backend, f `n` exceeds the pool capacity, the allocation is performed using the
159    /// [`BPBackend::Dynamic`]
160    ///
161    /// ## RAII
162    ///
163    /// The [`Allocation`] is RAII safe, as the allocated buffers are automatically reused (or free'ed from memory w/
164    /// respect to the backend used) as the caller drops reference to the [`Allocation`]
165    ///
166    /// ## Example
167    ///
168    /// ```
169    /// use frozen_core::bpool::{BufPool, BPCfg, BPBackend};
170    ///
171    /// let pool = BufPool::new(BPCfg {
172    ///     mid: 0,
173    ///     chunk_size: 0x20,
174    ///     backend: BPBackend::Prealloc { capacity: 0x10 },
175    /// });
176    ///
177    /// let alloc = pool.allocate(2).unwrap();
178    /// assert_eq!(alloc.count, 2);
179    /// assert_eq!(alloc.slots().len(), 2);
180    /// ```
181    #[inline(always)]
182    pub fn allocate(&self, n: usize) -> FrozenRes<Allocation> {
183        match &self.state {
184            BackendState::Dynamic => Ok(Allocation::new_dynamic(self, n)),
185            BackendState::Prealloc(state) => {
186                if n > state.capacity {
187                    return Ok(Allocation::new_dynamic(self, n));
188                }
189
190                state.allocate(n, self)
191            }
192        }
193    }
194}
195
196impl Drop for BufPool {
197    fn drop(&mut self) {
198        let mut guard = match self.lock.lock() {
199            Ok(g) => g,
200            Err(_) => return,
201        };
202
203        while self.active.load(atomic::Ordering::Acquire) != 0 {
204            guard = self.close_cv.wait(guard).expect("shutdown cv poisoned");
205        }
206
207        if let BackendState::Prealloc(state) = &self.state {
208            let pool_size = state.capacity * self.cfg.chunk_size;
209
210            // NOTE: We re-construct original allocation from the stored pointer! This builds up the vecotor
211            // as it was created, which than is dropped by Rust destructor's automatically!
212            let _ = unsafe { Vec::from_raw_parts(state.base_ptr, pool_size, pool_size) };
213        }
214    }
215}
216
/// Runtime state matching the configured [`BPBackend`]
#[derive(Debug)]
enum BackendState {
    /// No pooled state, every allocation is created on demand
    Dynamic,
    /// Preallocated region and its freelist
    Prealloc(PreallocState),
}
222
223impl BackendState {
224    fn new(cfg: &BPCfg) -> Self {
225        match cfg.backend {
226            BPBackend::Dynamic => BackendState::Dynamic,
227            BPBackend::Prealloc { capacity } => BackendState::Prealloc(PreallocState::new(capacity, cfg)),
228        }
229    }
230}
231
/// Freelist over a single preallocated region of `capacity` chunks
#[derive(Debug)]
struct PreallocState {
    /// Number of chunks in the region
    capacity: usize,
    /// Start of the region, chunk `i` lives at `base_ptr + i * chunk_size`
    base_ptr: TBPPtr,
    /// Packed `(slot index, tag)` of the first free slot, see `pack_pool_idx`
    head: atomic::AtomicUsize,
    /// For every slot, the index of the next free slot or `INVALID_POOL_SLOT`
    next: Box<[atomic::AtomicUsize]>,
}
239
240impl PreallocState {
241    fn new(capacity: usize, cfg: &BPCfg) -> Self {
242        let pool_size = capacity * cfg.chunk_size;
243
244        let mut pool = Vec::<u8>::with_capacity(pool_size);
245        let base_ptr = pool.as_mut_ptr();
246
247        // NOTE: `Vec::with_capacity(N)` allocates memory but keeps the len at 0, we use raw pointers to access
248        // the slots, if the len stays at 0, it'd be UB while the reconstruction of the slice from the pointer
249        // would be invalid as well
250        unsafe { pool.set_len(pool_size) };
251
252        // NOTE: Here, when `pool` goes out of scope, the `drop(pool)` will be called by the compiler, which
253        // would drop the allocated memory, which for us must be pinned, to avoid this, we tell the compiler
254        // to not call the drop, i.e. simply forget that the `pool` exists, while we also make sure to drop
255        // this allocated memory pool manually when the [`BufPool`] itself is dropped
256        std::mem::forget(pool);
257
258        let mut next = Vec::with_capacity(capacity);
259        for i in 0..capacity {
260            let _i = 1 + i;
261            let n = if _i < capacity { _i } else { INVALID_POOL_SLOT };
262            next.push(atomic::AtomicUsize::new(n));
263        }
264
265        Self {
266            capacity,
267            base_ptr,
268            next: next.into_boxed_slice(),
269            head: atomic::AtomicUsize::new(pack_pool_idx(0, 0)),
270        }
271    }
272
273    #[inline(always)]
274    fn allocate(&self, n: usize, pool: &BufPool) -> FrozenRes<Allocation> {
275        let mut remaining = n;
276        let mut alloc = Allocation::new(pool, n);
277
278        while remaining != 0 {
279            let taken = self.alloc_batch(remaining, pool, &mut alloc);
280
281            if taken == 0 {
282                self.wait(pool)?;
283                continue;
284            }
285
286            remaining -= taken;
287        }
288
289        Ok(alloc)
290    }
291
292    #[inline(always)]
293    fn alloc_batch(&self, cap: usize, pool: &BufPool, out: &mut Allocation) -> usize {
294        let mut head = self.head.load(atomic::Ordering::Relaxed);
295        loop {
296            let (idx, tag) = unpack_pool_idx(head);
297
298            // NOTE:
299            //
300            // - If we reach the last entry (i.e. invalid ptr), we return early, despite not
301            // allocating all the required buffers
302            // - This allows caller to process allocated buffers, and avoid busy waiting for
303            // more buffers
304            // - The caller should poll to allocate, till all required bufs are allocated
305
306            if idx == INVALID_POOL_SLOT {
307                return 0;
308            }
309
310            //
311            // local walk
312            //
313
314            let mut count = 1;
315            let mut last = idx;
316
317            while count < cap {
318                let next = self.next[last].load(atomic::Ordering::Relaxed);
319                if next == INVALID_POOL_SLOT {
320                    break;
321                }
322
323                last = next;
324                count += 1;
325            }
326
327            let new_head_idx = self.next[last].load(atomic::Ordering::Relaxed);
328            let new_head = pack_pool_idx(new_head_idx, tag + 1);
329
330            match self
331                .head
332                .compare_exchange(head, new_head, atomic::Ordering::AcqRel, atomic::Ordering::Acquire)
333            {
334                Err(h) => head = h,
335                Ok(_) => {
336                    let base = self.base_ptr;
337                    let chunk = pool.cfg.chunk_size;
338
339                    let slots = out.slots.slots();
340                    slots.reserve(count);
341
342                    let mut cur = idx;
343                    for _ in 0..count {
344                        slots.push(unsafe { base.add(cur * chunk) });
345                        cur = self.next[cur].load(atomic::Ordering::Relaxed);
346                    }
347
348                    out.count += count;
349                    return count;
350                }
351            }
352        }
353    }
354
355    /// Block until at least one buffer becomes available
356    ///
357    /// This function is intended to be used when [`BufPool::allocate`] returns less than requested bufs,
358    /// i.e. when pool is exhausted
359    #[inline]
360    fn wait(&self, pool: &BufPool) -> FrozenRes<()> {
361        if self.has_free() {
362            return Ok(());
363        }
364
365        let mut guard = pool
366            .lock
367            .lock()
368            .map_err(|e| new_err_raw(BufPoolErrRes::Lpn, e, pool.cfg.mid))?;
369
370        if self.has_free() {
371            return Ok(());
372        }
373
374        while !self.has_free() {
375            guard = pool
376                .wait_cv
377                .wait(guard)
378                .map_err(|e| new_err_raw(BufPoolErrRes::Lpn, e, pool.cfg.mid))?;
379        }
380
381        Ok(())
382    }
383
384    #[inline]
385    fn has_free(&self) -> bool {
386        let (idx, _) = unpack_pool_idx(self.head.load(atomic::Ordering::Acquire));
387        idx != INVALID_POOL_SLOT
388    }
389
390    #[inline(always)]
391    fn free(&self, ptr: TBPPtr, pool: &BufPool) {
392        let offset = unsafe { ptr.offset_from(self.base_ptr) } as usize;
393        let idx = offset / pool.cfg.chunk_size;
394
395        let mut head = self.head.load(atomic::Ordering::Acquire);
396        loop {
397            let (head_idx, head_tag) = unpack_pool_idx(head);
398            self.next[idx].store(head_idx, atomic::Ordering::Relaxed);
399            let new = pack_pool_idx(idx, 1 + head_tag);
400
401            match self
402                .head
403                .compare_exchange(head, new, atomic::Ordering::AcqRel, atomic::Ordering::Acquire)
404            {
405                Ok(_) => {
406                    pool.wait_cv.notify_one();
407                    return;
408                }
409                Err(h) => head = h,
410            }
411        }
412    }
413}
414
/// Domain Id for [`BufPool`] is **19** (`0x13`), attached to every error built by `new_err_raw`
const ERRDOMAIN: u8 = 0x13;
417
/// Error codes for [`BufPool`]
///
/// The discriminant of each variant is the numeric code reported in the error
#[repr(u16)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BufPoolErrRes {
    /// (768) lock error (failed or poisoned)
    Lpn = 0x300,
}
424
425impl BufPoolErrRes {
426    #[inline]
427    fn default_message(&self) -> &'static [u8] {
428        match self {
429            Self::Lpn => b"lock poisoned while waiting for BufPool",
430        }
431    }
432}
433
434#[inline]
435fn new_err_raw<E: std::fmt::Display>(res: BufPoolErrRes, error: E, mid: u8) -> FrozenErr {
436    let detail = res.default_message();
437    FrozenErr::new(
438        mid,
439        ERRDOMAIN,
440        res as u16,
441        detail,
442        error.to_string().as_bytes().to_vec(),
443    )
444}
445
/// Number of low bits of the packed freelist head which hold the slot index
///
/// NOTE(review): the shift by 32 in the mask presumably requires a 64-bit `usize`, confirm supported targets
const POOL_IDX_BITS: usize = 0x20;
/// Mask selecting the slot index bits of the packed freelist head
const POOL_IDX_MASK: usize = (1 << POOL_IDX_BITS) - 1;
448
449#[inline]
450fn pack_pool_idx(idx: usize, tag: usize) -> usize {
451    (tag << POOL_IDX_BITS) | (idx & POOL_IDX_MASK)
452}
453
454#[inline]
455fn unpack_pool_idx(id: usize) -> (usize, usize) {
456    (id & POOL_IDX_MASK, id >> POOL_IDX_BITS)
457}
458
/// Pointer to a single buffer (chunk) allocated by [`BufPool::allocate`]
///
/// Points at the first byte of a region of `chunk_size` bytes
pub type TBPPtr = *mut u8;
461
/// Buffer allocations allocated by [`BufPool::allocate`]
///
/// Dropping the value returns the buffers to the pool (or frees them when they were created dynamically)
#[derive(Debug)]
pub struct Allocation {
    /// Number of buffers held by this allocation, it equals the requested amount once
    /// [`BufPool::allocate`] has returned
    pub count: usize,

    /// Vector containing raw buffer pointers, i.e. [`TBPPtr`]
    slots: AllocSlotType,

    /// Guard to enforce RAII safety, un-counts this allocation from the pool when dropped
    guard: AllocationGuard,
}
474
// SAFETY(review): `Allocation` is not auto `Send` because it stores raw chunk pointers and a
// `NonNull<BufPool>`. Dropping it on another thread only reaches the pool through a shared reference,
// and `BufPool` is declared `Sync`. TODO confirm the pool always outlives its allocations
unsafe impl Send for Allocation {}
476
477impl Allocation {
478    /// Returns the raw buffer pointers belonging to this allocation
479    ///
480    /// Each pointer references a contiguous memory region of size `BufPool::chunk_size`
481    ///
482    /// ## Example
483    ///
484    /// ```
485    /// use frozen_core::bpool::{BufPool, BPCfg, BPBackend};
486    ///
487    /// let pool = BufPool::new(BPCfg {
488    ///     mid: 0,
489    ///     chunk_size: 0x20,
490    ///     backend: BPBackend::Prealloc { capacity: 0x10 },
491    /// });
492    ///
493    /// let alloc = pool.allocate(2).unwrap();
494    ///
495    /// for ptr in alloc.slots() {
496    ///     assert!(!ptr.is_null());
497    /// }
498    /// ```
499    #[inline]
500    pub fn slots(&self) -> &Vec<TBPPtr> {
501        match &self.slots {
502            AllocSlotType::Pool(slots) => slots,
503            AllocSlotType::Dynamic(slots) => slots,
504        }
505    }
506
507    #[inline]
508    fn new(pool: &BufPool, cap: usize) -> Self {
509        pool.active.fetch_add(1, atomic::Ordering::Relaxed);
510
511        Self {
512            count: 0,
513            guard: AllocationGuard(ptr::NonNull::from(pool)),
514            slots: AllocSlotType::Pool(Vec::<TBPPtr>::with_capacity(cap)),
515        }
516    }
517
518    #[inline]
519    fn new_dynamic(pool: &BufPool, count: usize) -> Self {
520        let total = pool.cfg.chunk_size * count;
521        pool.active.fetch_add(1, atomic::Ordering::Relaxed);
522
523        let mut slice = Vec::<u8>::with_capacity(total);
524        let base_ptr = slice.as_mut_ptr();
525
526        // NOTE: `Vec::with_capacity(N)` allocates memory but keeps the len at 0, we use raw pointers to access
527        // the slots, if the len stays at 0, it'd be UB while the reconstruction of the slice from the pointer
528        // would be invalid as well
529        unsafe { slice.set_len(total) };
530
531        // NOTE: Here, when `slice` goes out of scope, the `drop(allocation)` will be called by the compiler,
532        // which would drop the allocated memory, which for us must be pinned, to avoid this, we tell the compiler
533        // to not call the drop, i.e. simply forget that the `pool` exists, while we also make sure to drop
534        // this allocated memory pool manually when the [`BufPool`] itself is dropped
535        std::mem::forget(slice);
536
537        let mut ptrs = Vec::with_capacity(count);
538        for i in 0..count {
539            let p = unsafe { base_ptr.add(i * pool.cfg.chunk_size) };
540            ptrs.push(p);
541        }
542
543        Self {
544            count,
545            slots: AllocSlotType::Dynamic(ptrs),
546            guard: AllocationGuard(ptr::NonNull::from(pool)),
547        }
548    }
549}
550
551impl Drop for Allocation {
552    fn drop(&mut self) {
553        let pool = unsafe { self.guard.0.as_ref() };
554
555        match &self.slots {
556            AllocSlotType::Pool(slots) => match &pool.state {
557                BackendState::Prealloc(state) => {
558                    for ptr in slots {
559                        state.free(*ptr, pool);
560                    }
561                }
562
563                // NOTE: The `AllocSlotType::Pool` can only originate from the `Prealloc` backend where chunks are
564                // taken from the freelist, and when the backend is `Dynamic`, all allocations are created through
565                // `Allocation::new_dynamic`, meaning no pool backed slots can ever exist
566                _ => unreachable!(),
567            },
568
569            AllocSlotType::Dynamic(slots) => {
570                // avoid panic in drop, i.e. UB risk (@_@;)
571                if slots.is_empty() {
572                    return;
573                }
574
575                let buf_size = pool.cfg.chunk_size;
576                let total = buf_size * slots.len();
577                let base = slots[0];
578
579                let _ = unsafe { Vec::from_raw_parts(base, total, total) };
580            }
581        }
582    }
583}
584
/// Chunk pointers of an [`Allocation`], tagged with where they came from
#[derive(Debug)]
enum AllocSlotType {
    /// Chunks taken from the prealloc freelist, pushed back onto it when the allocation is dropped
    Pool(Vec<TBPPtr>),
    /// Chunks carved out of one dynamically allocated region, freed when the allocation is dropped
    Dynamic(Vec<TBPPtr>),
}
590
591impl AllocSlotType {
592    fn slots(&mut self) -> &mut Vec<TBPPtr> {
593        match self {
594            Self::Pool(slots) => slots,
595            Self::Dynamic(slots) => slots,
596        }
597    }
598}
599
/// Back-reference from an [`Allocation`] to its pool, decrements the pool's `active` counter when dropped
#[derive(Debug)]
struct AllocationGuard(ptr::NonNull<BufPool>);
602
603impl Drop for AllocationGuard {
604    fn drop(&mut self) {
605        let pool = unsafe { self.0.as_ref() };
606
607        if pool.active.fetch_sub(1, atomic::Ordering::Release) == 1 {
608            // last user
609            if let Ok(_g) = pool.lock.lock() {
610                pool.close_cv.notify_one();
611            }
612        }
613    }
614}
615
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::TEST_MID;

    const CAP: usize = 0x20;
    const SIZE: usize = 0x0A;

    /// Pool using the `Prealloc` backend with the given `capacity`
    fn new_pool_prealloc(capacity: usize) -> BufPool {
        BufPool::new(BPCfg {
            mid: TEST_MID,
            chunk_size: SIZE,
            backend: BPBackend::Prealloc { capacity },
        })
    }

    /// Pool using the `Dynamic` backend
    fn new_pool_dynamic() -> BufPool {
        BufPool::new(BPCfg {
            mid: TEST_MID,
            chunk_size: SIZE,
            backend: BPBackend::Dynamic,
        })
    }

    mod utils {
        use super::*;

        #[test]
        fn pack_unpack_cycle() {
            let pack_id = pack_pool_idx(0x20, 0x0A);
            let (idx, tag) = unpack_pool_idx(pack_id);

            assert_eq!(idx, 0x20);
            assert_eq!(tag, 0x0A);
        }
    }

    mod pre_allocs {
        use super::*;

        #[test]
        fn ok_alloc_works() {
            let pool = new_pool_prealloc(CAP);
            let alloc = pool.allocate(1).unwrap();

            assert_eq!(alloc.count, 1);
            assert_eq!(alloc.slots().len(), 1);
        }

        #[test]
        fn ok_alloc_exact_cap_as_requested() {
            let pool = new_pool_prealloc(CAP);
            let alloc = pool.allocate(CAP).unwrap();

            assert_eq!(alloc.count, CAP);
            assert_eq!(alloc.slots().len(), CAP);
        }

        #[test]
        fn ok_alloc_all_even_when_exhausted() {
            let pool = new_pool_prealloc(CAP);

            let a1 = pool.allocate(CAP - 1).unwrap();
            assert_eq!(a1.count, CAP - 1);
            drop(a1);

            let a2 = pool.allocate(CAP).unwrap();
            assert_eq!(a2.count, CAP);
            drop(a2);

            let a3 = pool.allocate(1).unwrap();
            assert_eq!(a3.count, 1);
        }

        #[test]
        fn ok_alloc_all_when_requested_larger_then_cap() {
            let pool = new_pool_prealloc(CAP);

            // requests above the capacity are served dynamically instead of blocking forever
            let a1 = pool.allocate(CAP * 2).unwrap();
            assert_eq!(a1.count, CAP * 2);
        }

        #[test]
        fn ok_no_duplicate_slots_in_single_alloc() {
            let pool = new_pool_prealloc(CAP);

            let alloc = pool.allocate(CAP).unwrap();
            let mut ptrs: Vec<TBPPtr> = alloc.slots().to_vec();

            // remove duplicates if any
            ptrs.sort();
            ptrs.dedup();

            assert_eq!(ptrs.len(), CAP);
        }

        #[test]
        fn ok_large_allocation_with_pre_alloc() {
            let pool = new_pool_prealloc(0x100);

            for i in 0..0x100 {
                let a = pool.allocate(i).unwrap();
                assert_eq!(a.slots().len(), i);
            }

            let final_alloc = pool.allocate(0x10).unwrap();
            assert_eq!(final_alloc.count, 0x10);
        }
    }

    mod dynamic_allocs {
        use super::*;

        #[test]
        fn ok_dynamic_alloc() {
            let pool = new_pool_dynamic();
            let alloc = pool.allocate(CAP).unwrap();

            assert_eq!(alloc.count, CAP);
            assert_eq!(alloc.slots().len(), CAP);
        }

        #[test]
        fn ok_no_duplicate_slots_in_single_dynamic_alloc() {
            let pool = new_pool_dynamic();

            let alloc = pool.allocate(CAP).unwrap();
            let mut ptrs: Vec<TBPPtr> = alloc.slots().to_vec();

            // remove duplicates if any
            ptrs.sort();
            ptrs.dedup();

            assert_eq!(ptrs.len(), CAP);
        }

        #[test]
        fn ok_large_allocation_with_dynamic_alloc() {
            let pool = new_pool_dynamic();
            let alloc = pool.allocate(0x400).unwrap();

            assert_eq!(alloc.count, 0x400);
            assert_eq!(alloc.slots().len(), 0x400);
        }
    }

    mod raii_safety {
        use super::*;

        #[test]
        fn ok_pre_alloc_auto_free_on_drop() {
            let pool = new_pool_prealloc(CAP);

            {
                let alloc = pool.allocate(CAP).unwrap();
                assert_eq!(alloc.count, CAP);
            }

            // validate free on drop
            assert_eq!(pool.active.load(atomic::Ordering::Acquire), 0);

            let alloc2 = pool.allocate(CAP).unwrap();
            assert_eq!(alloc2.count, CAP);
        }

        #[test]
        fn ok_dynamic_alloc_auto_free_on_drop() {
            let pool = new_pool_dynamic();

            {
                let alloc = pool.allocate(CAP).unwrap();
                assert_eq!(alloc.count, CAP);
            }

            // validate free on drop
            assert_eq!(pool.active.load(atomic::Ordering::Acquire), 0);
        }
    }

    mod concurrency {
        use super::*;
        use std::sync::{Arc, Barrier};
        use std::thread;

        #[test]
        fn ok_concurrent_alloc() {
            const THREADS: usize = 8;
            const ITERS: usize = 0x1000;

            let barrier = Arc::new(Barrier::new(THREADS));
            let pool = Arc::new(new_pool_prealloc(CAP * 0x0A));

            let mut handles = Vec::new();
            for _ in 0..THREADS {
                let pool = pool.clone();
                let barrier = barrier.clone();

                handles.push(thread::spawn(move || {
                    barrier.wait();

                    for _ in 0..ITERS {
                        let mut n = CAP / 2;
                        while n != 0 {
                            let alloc = pool.allocate(n).unwrap();
                            n -= alloc.count;
                        }
                    }
                }));
            }

            for h in handles {
                assert!(h.join().is_ok());
            }

            let final_alloc = pool.allocate(CAP).unwrap();
            assert_eq!(final_alloc.count, CAP);
        }

        #[test]
        fn ok_concurrent_dynamic_alloc() {
            const THREADS: usize = 8;
            const ITERS: usize = 0x200;

            let pool = Arc::new(new_pool_dynamic());

            let mut handles = Vec::new();
            for _ in 0..THREADS {
                let pool = pool.clone();

                handles.push(thread::spawn(move || {
                    for _ in 0..ITERS {
                        let alloc = pool.allocate(0x10).unwrap();
                        assert_eq!(alloc.count, 0x10);
                    }
                }));
            }

            for h in handles {
                assert!(h.join().is_ok());
            }

            assert_eq!(pool.active.load(atomic::Ordering::Acquire), 0);

            // pool should still function correctly
            let alloc = pool.allocate(CAP).unwrap();
            assert_eq!(alloc.count, CAP);
        }
    }

    mod polling {
        use super::*;
        use std::sync::Arc;
        use std::thread;
        use std::time::{Duration, Instant};

        #[test]
        fn ok_pre_alloc_blocks_until_buffers_freed() {
            let pool = Arc::new(new_pool_prealloc(1));
            let a = pool.allocate(1).unwrap();

            let pool2 = pool.clone();
            let h1 = thread::spawn(move || {
                let start = Instant::now();
                let alloc = pool2.allocate(1).expect("alloc failed");
                let elapsed = start.elapsed();

                assert!(elapsed >= Duration::from_millis(20));
                assert_eq!(alloc.count, 1);
            });

            let h2 = thread::spawn(move || {
                thread::sleep(Duration::from_millis(30));
                drop(a);
            });

            assert!(h1.join().is_ok());
            assert!(h2.join().is_ok());
        }
    }

    mod shutdown_safety {
        use super::*;
        use std::sync::Arc;

        #[test]
        fn drop_waits_for_active_pre_allocations() {
            let pool = Arc::new(new_pool_prealloc(CAP * 0x0A));
            let pool2 = pool.clone();

            let handle = std::thread::spawn(move || {
                let alloc = pool2.allocate(4).unwrap();
                std::thread::sleep(std::time::Duration::from_millis(0x32));
                drop(alloc);
            });

            std::thread::sleep(std::time::Duration::from_millis(0x0A)); // give the other thread time to allocate
            drop(pool); // this must block until alloc is dropped

            assert!(handle.join().is_ok());
        }

        #[test]
        fn drop_waits_for_active_dynamic_allocations() {
            // the dynamic backend is what this test is about, so the pool must not be a prealloc one
            let pool = Arc::new(new_pool_dynamic());
            let pool2 = pool.clone();

            let handle = std::thread::spawn(move || {
                let alloc = pool2.allocate(4).unwrap();
                std::thread::sleep(std::time::Duration::from_millis(0x32));
                drop(alloc);
            });

            std::thread::sleep(std::time::Duration::from_millis(0x0A)); // give the other thread time to allocate
            drop(pool); // this must block until alloc is dropped

            assert!(handle.join().is_ok());
        }

        #[test]
        fn ok_cross_thread_drop() {
            let pool = Arc::new(new_pool_prealloc(0x0C));
            let alloc = pool.allocate(0x0C).unwrap();

            let h1 = {
                std::thread::spawn(move || {
                    drop(alloc);
                })
            };

            let h2 = {
                let pool = pool.clone();

                std::thread::spawn(move || {
                    let a = pool.allocate(8).unwrap();
                    assert_eq!(a.count, 8);
                })
            };

            assert!(h1.join().is_ok());
            assert!(h2.join().is_ok());
        }
    }
}