// frozen_core/bpool.rs
//! Lock-free buffer pool used for staging IO buffers
//!
//! ## Features
//!
//! - *RAII Safe*
//! - *Graceful Shutdown*
//! - *Lock-free fast path*
//!
//! ## Polling for allocations
//!
//! Bufs are allocated in batches using [`BPool::allocate`]; it may allocate fewer than requested, in which case
//! the caller should wait using [`BPool::wait`], which blocks until any bufs are available to use again
//!
//! ## Example
//!
//! ```
//! use frozen_core::bpool::BPool;
//! use std::sync::Arc;
//! use std::thread;
//!
//! const MODULE_ID: u8 = 0;
//! const CAPACITY: usize = 8;
//! const BUF_SIZE: usize = 0x20;
//!
//! let pool = Arc::new(BPool::new(BUF_SIZE, CAPACITY, MODULE_ID));
//! let mut handles = Vec::new();
//!
//! for _ in 0..4 {
//!     let pool = pool.clone();
//!     handles.push(thread::spawn(move || {
//!         for _ in 0..0x80 {
//!             let mut n = 2;
//!             while n != 0 {
//!                 let alloc = pool.allocate(n);
//!
//!                 // poll when not all bufs are allocated
//!                 if alloc.count == 0 {
//!                     pool.wait().expect("wait failed");
//!                     continue;
//!                 }
//!
//!                 n -= alloc.count;
//!             }
//!
//!             // NOTE: allocated bufs are freed automatically when `alloc` drops
//!         }
//!     }));
//! }
//!
//! for h in handles {
//!     h.join().unwrap();
//! }
//! ```
54
55use crate::error::{FrozenErr, FrozenRes};
56use std::sync::{self, atomic};
57
/// Sentinel slot index marking the end of the free list (`u32::MAX`, since slot
/// indices are packed into the low 32 bits of the head word)
const INVALID_POOL_SLOT: usize = u32::MAX as usize;
59
/// Lock-free buffer pool used for staging IO buffers
///
/// ## Features
///
/// - *RAII Safe*
/// - *Graceful Shutdown*
/// - *Lock-free fast path*
///
/// ## Polling for allocations
///
/// Bufs are allocated in batches using [`BPool::allocate`]; it may allocate fewer than requested, in which case
/// the caller should wait using [`BPool::wait`], which blocks until any bufs are available to use again
///
/// ## Example
///
/// ```
/// const MODULE_ID: u8 = 0;
/// const CAPACITY: usize = 4;
/// const BUF_SIZE: usize = 0x20;
///
/// let pool = frozen_core::bpool::BPool::new(BUF_SIZE, CAPACITY, MODULE_ID);
///
/// {
///     // NOTE: allocations are RAII safe, hence the underlying resource is reused when `alloc` is dropped
///     let alloc = pool.allocate(4);
///     assert_eq!(alloc.count, 4);
/// }
///
/// let alloc2 = pool.allocate(4);
/// assert_eq!(alloc2.count, 4);
/// ```
#[derive(Debug)]
pub struct BPool {
    // Base pointer of the backing allocation; slot `i` starts at `ptr + i * buf_size`
    ptr: PoolPtr,
    // Module id forwarded into [`FrozenErr`] values produced by this pool
    module_id: u8,
    // Number of buffers owned by the pool
    capacity: usize,
    // Size in bytes of each buffer
    buf_size: usize,
    // Mutex paired with the condvars below; the allocate/free CAS path does not use it
    lock: sync::Mutex<()>,
    // Signalled when a buffer is returned to the pool (see `free` / `wait`)
    wait_cv: sync::Condvar,
    // Signalled during shutdown when the last active allocation goes away
    close_cv: sync::Condvar,
    // Packed `(slot index, ABA tag)` head of the lock-free free list
    head: atomic::AtomicUsize,
    // Count of live [`Allocation`] handles; `Drop` waits for this to reach zero
    active: atomic::AtomicUsize,
    // Intrusive free-list links: `next[i]` is the slot after `i`, or `INVALID_POOL_SLOT`
    next: Box<[atomic::AtomicUsize]>,
}
104
// SAFETY: the raw base pointer is only used to derive per-slot addresses; all
// cross-thread coordination visible in this file goes through atomics and the
// internal mutex/condvars. NOTE(review): soundness also relies on callers not
// aliasing a single handed-out buffer across threads — confirm at call sites.
unsafe impl Send for BPool {}
unsafe impl Sync for BPool {}
107
108impl BPool {
109    /// Create a new instance of [`BPool`]
110    ///
111    /// ## Params
112    ///
113    /// - `buf_size`: size of each buf in [`BPool`]
114    /// - `capacity`: capacity of [`BPool`] i.e. number of buffers in memory
115    /// - `module_id`: id used by [`FrozenErr`] for error propagation
116    ///
117    /// ## Example
118    ///
119    /// ```
120    /// const MODULE_ID: u8 = 0;
121    /// const CAPACITY: usize = 4;
122    /// const BUF_SIZE: usize = 0x20;
123    ///
124    /// let pool = frozen_core::bpool::BPool::new(BUF_SIZE, CAPACITY, MODULE_ID);
125    ///
126    /// {
127    ///     // NOTE: allocations are RAII safe, hence the underlying resource is reused when `alloc` is dropped
128    ///     let alloc = pool.allocate(4);
129    ///     assert_eq!(alloc.count, 4);
130    /// }
131    ///
132    /// let alloc2 = pool.allocate(4);
133    /// assert_eq!(alloc2.count, 4);
134    /// ```
135    pub fn new(buf_size: usize, capacity: usize, module_id: u8) -> Self {
136        let pool_size = capacity * buf_size;
137        let mut pool = Vec::<u8>::with_capacity(pool_size);
138        let ptr = PoolPtr { ptr: pool.as_mut_ptr() };
139
140        // NOTE: `Vec::with_capacity(N)` allocates memory but keeps the len at 0. We use raw pointers
141        // to access different slots, if the len stays at 0, it'd create undefined behavior. Also, the
142        // reconstruct of vector from the pointer would become invalid. To avoid memory leaks, we
143        // reconstruct the vec from the pointer in the drop.
144        unsafe { pool.set_len(pool_size) };
145
146        // NOTE: When the `pool` is dropped, it'll free up the entire memory. This should not happen,
147        // as we own the underlying memory via mutable pointer, which is an implicit owenership, so we
148        // avoid destruction of `pool` when it goes out of scope.
149        std::mem::forget(pool);
150
151        let mut next = Vec::with_capacity(capacity);
152        for i in 0..capacity {
153            let _i = 1 + i;
154            let n = if _i < capacity { _i } else { INVALID_POOL_SLOT };
155            next.push(atomic::AtomicUsize::new(n));
156        }
157
158        Self {
159            ptr,
160            capacity,
161            buf_size,
162            module_id,
163            wait_cv: sync::Condvar::new(),
164            close_cv: sync::Condvar::new(),
165            lock: sync::Mutex::new(()),
166            next: next.into_boxed_slice(),
167            active: atomic::AtomicUsize::new(0),
168            head: atomic::AtomicUsize::new(pack_pool_idx(0, 0)),
169        }
170    }
171
172    /// Allocates `N` buffers for staging IO ops
173    ///
174    /// ## Calling Pattern
175    ///
176    /// ```txt
177    /// remaining = n
178    /// loop
179    ///     alloc = allocate(remaining)
180    ///     if alloc.count == 0
181    ///         wait()
182    ///         continue
183    ///
184    ///     remaining -= alloc.count
185    ///     if remaining == 0
186    ///         break
187    /// ```
188    ///
189    /// ## Polling
190    ///
191    /// This function may not allocate all the `N` required buffers in one call, so the caller must
192    /// poll (wait and retry) for remaining `N` buffers
193    ///
194    /// ## RAII Safety
195    ///
196    /// All [`BPool`] aloocations are RAII safe by default, hence when the variable which stores the result
197    /// of `allocate`, is dropped, the buffer's it holds are also automatically freed, the burden of _freeing after use_
198    /// does not fall on the caller
199    ///
200    /// ## Example
201    ///
202    /// ```
203    /// const MODULE_ID: u8 = 0;
204    /// const CAPACITY: usize = 4;
205    /// const BUF_SIZE: usize = 0x20;
206    ///
207    /// let pool = frozen_core::bpool::BPool::new(BUF_SIZE, CAPACITY, MODULE_ID);
208    ///
209    /// let alloc1 = pool.allocate(4);
210    /// assert!(alloc1.count == 4);
211    ///
212    /// let alloc2 = pool.allocate(1);
213    /// assert!(alloc2.count == 0);
214    ///
215    /// drop(alloc1);
216    ///
217    /// let alloc3 = pool.allocate(1);
218    /// assert!(alloc3.count == 1);
219    /// ```
220    #[inline(always)]
221    pub fn allocate<'a>(&'a self, n: usize) -> Allocation<'a> {
222        let mut head = self.head.load(atomic::Ordering::Acquire);
223        let mut batch = Allocation::new(self, n);
224
225        loop {
226            let (idx, tag) = unpack_pool_idx(head);
227
228            // NOTE: If we reach the last entry (i.e. invalid ptr), we return early, despite not
229            // allocating all the required buffers
230            //
231            // This allows caller to process allocated buffers, and avoid busy waiting for
232            // more buffers
233            //
234            // The caller should pool to allocate, till all the required buffers are allocated
235            if idx == INVALID_POOL_SLOT {
236                return batch;
237            }
238
239            //
240            // local walk
241            //
242
243            let mut count = 1;
244            let mut last = idx;
245
246            while count < n {
247                // This is valid as `next` is already the index (unpacked version) of the slot
248                let next = self.next[last as usize].load(atomic::Ordering::Relaxed);
249                if next == INVALID_POOL_SLOT {
250                    break;
251                }
252
253                last = next;
254                count += 1;
255            }
256
257            let new_head_idx = self.next[last as usize].load(atomic::Ordering::Relaxed);
258            let new_head = pack_pool_idx(new_head_idx, 1 + tag);
259
260            match self
261                .head
262                .compare_exchange(head, new_head, atomic::Ordering::AcqRel, atomic::Ordering::Acquire)
263            {
264                Err(h) => head = h,
265                Ok(_) => {
266                    let mut cur = idx;
267                    for _ in 0..count {
268                        batch.slots.push(self.ptr.add((cur * self.buf_size) as u64));
269                        cur = self.next[cur as usize].load(atomic::Ordering::Relaxed);
270                    }
271
272                    batch.count = count;
273                    return batch;
274                }
275            }
276        }
277    }
278
279    #[inline(always)]
280    fn free(&self, ptr: &PoolPtr) {
281        let offset = self.ptr.offset_from(&ptr);
282        let idx = offset / self.buf_size;
283
284        let mut head = self.head.load(atomic::Ordering::Acquire);
285        loop {
286            let (head_idx, head_tag) = unpack_pool_idx(head);
287            self.next[idx as usize].store(head_idx, atomic::Ordering::Relaxed);
288            let new = pack_pool_idx(idx, 1 + head_tag);
289
290            match self
291                .head
292                .compare_exchange(head, new, atomic::Ordering::AcqRel, atomic::Ordering::Acquire)
293            {
294                Ok(_) => {
295                    self.wait_cv.notify_one();
296                    return;
297                }
298                Err(h) => head = h,
299            }
300        }
301    }
302
303    /// Block until at least one buffer becomes available
304    ///
305    /// This function is intended to be used when [`BPool::allocate`] returns less than requested bufs,
306    /// i.e. when pool is exhausted
307    ///
308    /// ## Polling
309    ///
310    /// The typical allocation pattern is,
311    ///
312    /// - Attempt allocation w/ [`BPool::allocate`]
313    /// - If less than requested bufs are allocated, call [`BPool::wait`]
314    /// - Retry the allocation; all within a loop
315    ///
316    /// ## Notes
317    ///
318    /// - The caller must retry [`BPool::allocate`] after calling [`BPool::wait`]
319    /// - Only threads waiting for buffers are blocked, the allocation fast path remains lock-free
320    ///
321    /// ## Example
322    ///
323    /// ```
324    /// const MODULE_ID: u8 = 0;
325    /// const CAPACITY: usize = 2;
326    /// const BUF_SIZE: usize = 0x20;
327    ///
328    /// let pool = frozen_core::bpool::BPool::new(BUF_SIZE, CAPACITY, MODULE_ID);
329    ///
330    /// let alloc = pool.allocate(2);
331    /// assert_eq!(alloc.count, 2);
332    ///
333    /// let empty = pool.allocate(1);
334    /// assert_eq!(empty.count, 0);
335    ///
336    /// drop(alloc);
337    /// pool.wait().expect("wait failed");
338    ///
339    /// let alloc2 = pool.allocate(1);
340    /// assert_eq!(alloc2.count, 1);
341    /// ```
342    #[inline]
343    pub fn wait(&self) -> FrozenRes<()> {
344        if self.has_free() {
345            return Ok(());
346        }
347
348        let mut guard = self
349            .lock
350            .lock()
351            .map_err(|e| new_err_raw(BPoolErrRes::Lpn, e, self.module_id))?;
352
353        if self.has_free() {
354            return Ok(());
355        }
356
357        while !self.has_free() {
358            guard = self
359                .wait_cv
360                .wait(guard)
361                .map_err(|e| new_err_raw(BPoolErrRes::Lpn, e, self.module_id))?;
362        }
363
364        Ok(())
365    }
366
367    #[inline]
368    fn has_free(&self) -> bool {
369        let (idx, _) = unpack_pool_idx(self.head.load(atomic::Ordering::Acquire));
370        idx != INVALID_POOL_SLOT
371    }
372}
373
impl Drop for BPool {
    fn drop(&mut self) {
        // NOTE(review): if the lock is poisoned we return without freeing the
        // backing memory — this leaks the pool rather than risk touching state
        // a panicked thread left behind; confirm this trade-off is intended.
        let mut guard = match self.lock.lock() {
            Ok(g) => g,
            Err(_) => return,
        };

        // Graceful shutdown: block until every outstanding `Allocation` is
        // gone (each holds an `AllocationGuard` that decrements `active` and
        // signals `close_cv` when it drops).
        while self.active.load(atomic::Ordering::Acquire) != 0 {
            guard = self.close_cv.wait(guard).expect("shutdown cv poisoned");
        }

        let pool_size = self.capacity * self.buf_size;

        // NOTE: We re-construct the original allocation from the stored pointer! This rebuilds the vector
        // as it was created, which is then dropped by Rust's destructor automatically!
        let _ = unsafe { Vec::from_raw_parts(self.ptr.ptr, pool_size, pool_size) };
    }
}
392
/// Domain Id for [`BPool`] is **19**
const ERRDOMAIN: u8 = 0x13;

/// Error codes for [`BPool`]
#[repr(u16)]
pub enum BPoolErrRes {
    /// (0x301) lock error (failed or poisoned)
    Lpn = 0x301,
}
402
403impl BPoolErrRes {
404    #[inline]
405    fn default_message(&self) -> &'static [u8] {
406        match self {
407            Self::Lpn => b"lock poisoned while waiting for BPool",
408        }
409    }
410}
411
412#[inline]
413fn new_err_raw<E: std::fmt::Display>(res: BPoolErrRes, error: E, mid: u8) -> FrozenErr {
414    let detail = res.default_message();
415    FrozenErr::new(
416        mid,
417        ERRDOMAIN,
418        res as u16,
419        detail,
420        error.to_string().as_bytes().to_vec(),
421    )
422}
423
/// Number of low bits of the packed head word holding the slot index; the
/// remaining high bits hold an ABA-prevention tag.
const POOL_IDX_BITS: usize = 0x20;
/// Mask selecting the index bits. Written as `u32::MAX` rather than
/// `(1 << POOL_IDX_BITS) - 1`, which overflows at const-eval on targets where
/// `usize` is 32 bits wide.
const POOL_IDX_MASK: usize = u32::MAX as usize;

/// Pack a slot index and an ABA tag into a single head word.
#[inline]
const fn pack_pool_idx(idx: usize, tag: usize) -> usize {
    (tag << POOL_IDX_BITS) | (idx & POOL_IDX_MASK)
}

/// Split a packed head word back into `(index, tag)`.
#[inline]
const fn unpack_pool_idx(id: usize) -> (usize, usize) {
    (id & POOL_IDX_MASK, id >> POOL_IDX_BITS)
}
435}
436
// Raw byte pointer type used for pool buffers
type TPoolPtr = *mut u8;

/// Pointer to buffer allocated by [`BPool::allocate`]
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct PoolPtr {
    /// Raw pointer to the start of a buffer owned by the pool, where the valid memory range is `[ptr, ptr + buf_size)`
    pub ptr: TPoolPtr,
}

// SAFETY: `PoolPtr` is a plain address into the pool's allocation.
// NOTE(review): soundness relies on callers never accessing one buffer from
// two threads at once — the type itself does not enforce exclusive access.
unsafe impl Send for PoolPtr {}
unsafe impl Sync for PoolPtr {}
448
449impl PoolPtr {
450    #[inline]
451    fn add(&self, count: u64) -> Self {
452        Self {
453            ptr: unsafe { self.ptr.add(count as usize) },
454        }
455    }
456
457    #[inline]
458    fn offset_from(&self, ptr: &Self) -> usize {
459        unsafe { ptr.ptr.offset_from(self.ptr) as usize }
460    }
461}
462
/// Buffer allocations allocated by [`BPool::allocate`]
#[derive(Debug)]
pub struct Allocation<'a> {
    /// Number of buffers allocated, can be lower than the requested amount
    pub count: usize,

    /// Vector of [`PoolPtr`] objects, i.e. Raw buffer pointers
    pub slots: Vec<PoolPtr>,

    /// Guard to enforce RAII safety; its drop decrements the pool's `active`
    /// counter after `Allocation::drop` has returned the slots to the pool
    guard: AllocationGuard<'a>,
}
475
476impl<'a> Allocation<'a> {
477    #[inline]
478    fn new(pool: &'a BPool, cap: usize) -> Self {
479        pool.active.fetch_add(1, atomic::Ordering::Relaxed);
480
481        Self {
482            count: 0,
483            slots: Vec::<PoolPtr>::with_capacity(cap),
484            guard: AllocationGuard(pool),
485        }
486    }
487}
488
489impl<'a> Drop for Allocation<'a> {
490    fn drop(&mut self) {
491        for ptr in &self.slots {
492            self.guard.0.free(ptr);
493        }
494    }
495}
496
// RAII guard that keeps the pool's `active` count non-zero for the lifetime of
// its owning `Allocation`
#[derive(Debug)]
struct AllocationGuard<'a>(&'a BPool);
499
impl Drop for AllocationGuard<'_> {
    fn drop(&mut self) {
        // Decrement the pool's active-user count; `Release` publishes this
        // allocation's frees before the count change becomes visible.
        if self.0.active.fetch_sub(1, atomic::Ordering::Release) == 1 {
            // last user
            // Notify under the lock: this pairs with the locked `close_cv`
            // wait loop in `BPool::drop`, so the shutdown thread is either
            // before its `active` check (and will see 0) or parked on the
            // condvar (and will receive this signal).
            if let Ok(_g) = self.0.lock.lock() {
                self.0.close_cv.notify_one();
            }
        }
    }
}
510
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::TEST_MID;

    // Shared pool dimensions used by most tests below
    const CAP: usize = 0x20;
    const SIZE: usize = 0x0A;

    // Helper: a fresh pool with the default test dimensions
    fn new_pool() -> BPool {
        BPool::new(SIZE, CAP, TEST_MID)
    }

    mod utils {
        use super::*;

        // Packing an (index, tag) pair and unpacking it must round-trip
        #[test]
        fn pack_unpack_cycle() {
            let pack_id = pack_pool_idx(0x20, 0x0A);
            let (idx, tag) = unpack_pool_idx(pack_id);

            assert_eq!(idx, 0x20);
            assert_eq!(tag, 0x0A);
        }
    }

    mod allocations {
        use super::*;

        #[test]
        fn ok_alloc_works() {
            let pool = new_pool();
            let alloc = pool.allocate(1);

            assert_eq!(alloc.count, 1);
            assert_eq!(alloc.slots.len(), 1);
        }

        #[test]
        fn ok_alloc_exact_cap_as_requested() {
            let pool = new_pool();
            let alloc = pool.allocate(CAP);

            assert_eq!(alloc.count, CAP);
            assert_eq!(alloc.slots.len(), CAP);
        }

        // Requests beyond the remaining capacity are satisfied partially
        #[test]
        fn ok_alloc_partial_when_exhausted() {
            let pool = new_pool();

            let a1 = pool.allocate(CAP - 1);
            assert_eq!(a1.count, CAP - 1);

            let a2 = pool.allocate(CAP);
            assert_eq!(a2.count, 1);

            let a3 = pool.allocate(1);
            assert_eq!(a3.count, 0);
        }

        #[test]
        fn ok_allocs_none_when_exhausted() {
            let pool = new_pool();

            let _a1 = pool.allocate(CAP);

            let a2 = pool.allocate(1);
            assert_eq!(a2.count, 0);
        }

        // Draining the whole pool in one call must yield CAP distinct buffers
        #[test]
        fn ok_no_duplicate_slots_in_single_alloc() {
            let pool = new_pool();

            let alloc = pool.allocate(CAP);
            let mut ptrs: Vec<_> = alloc.slots.iter().map(|s| s.ptr).collect();

            // remove duplicates if any
            ptrs.sort();
            ptrs.dedup();

            assert_eq!(ptrs.len(), CAP);
        }
    }

    mod raii_safety {
        use super::*;

        // Dropping an allocation returns its buffers, so a second full-size
        // allocation must succeed
        #[test]
        fn ok_auto_free_on_drop() {
            let pool = new_pool();

            {
                let alloc = pool.allocate(CAP);
                assert_eq!(alloc.count, CAP);
            }

            let alloc2 = pool.allocate(CAP);
            assert_eq!(alloc2.count, CAP);
        }
    }

    mod concurrency {
        use super::*;
        use std::sync::{Arc, Barrier};
        use std::thread;

        // Hammer the pool from several threads; after all threads join, every
        // buffer must have been returned (a final full allocation succeeds)
        #[test]
        fn ok_concurrent_alloc() {
            const THREADS: usize = 8;
            const ITERS: usize = 0x1000;

            let barrier = Arc::new(Barrier::new(THREADS));
            let pool = Arc::new(BPool::new(SIZE, CAP * 0x0A, TEST_MID));

            let mut handles = Vec::new();
            for _ in 0..THREADS {
                let pool = pool.clone();
                let barrier = barrier.clone();

                handles.push(thread::spawn(move || {
                    barrier.wait();

                    for _ in 0..ITERS {
                        let mut n = CAP / 2;
                        while n != 0 {
                            let alloc = pool.allocate(n);
                            if alloc.count == 0 {
                                pool.wait().expect("wait for free");
                                continue;
                            }

                            n -= alloc.count;
                        }
                    }
                }));
            }

            for h in handles {
                assert!(h.join().is_ok());
            }

            let final_alloc = pool.allocate(CAP);
            assert_eq!(final_alloc.count, CAP);
        }
    }

    mod shutdown_safety {
        use super::*;
        use std::sync::Arc;

        #[test]
        fn drop_waits_for_active_allocations() {
            let pool = Arc::new(BPool::new(SIZE, CAP * 0x0A, TEST_MID));
            let pool2 = pool.clone();

            let handle = std::thread::spawn(move || {
                let alloc = pool2.allocate(4);
                std::thread::sleep(std::time::Duration::from_millis(50));
                drop(alloc);
            });

            // give the other thread time to allocate
            std::thread::sleep(std::time::Duration::from_millis(10));

            // NOTE(review): this drops only one Arc handle; `BPool::drop` (which
            // waits for `active == 0`) runs on whichever thread releases the
            // LAST Arc — here the spawned thread, after `alloc` is already gone,
            // so this line does not actually block. Confirm the intent.
            drop(pool);

            assert!(handle.join().is_ok());
        }
    }
}