// frozen_core/bpool.rs
//! Lock-free buffer pool used for staging IO buffers
//!
//! ## Features
//!
//! - *RAII Safe*
//! - *Graceful Shutdown*
//! - *Lock-free fast path*
//!
//! ## Polling for allocations
//!
//! Bufs are allocated in batches using [`BPool::allocate`]; it may allocate fewer than requested, in such cases
//! the caller should wait using [`BPool::wait`], which blocks until any bufs are available to use again
//!
//! ## Example
//!
//! ```
//! use frozen_core::bpool::BPool;
//! use std::sync::Arc;
//! use std::thread;
//!
//! const MODULE_ID: u8 = 0;
//! const CAPACITY: usize = 8;
//! const BUF_SIZE: usize = 0x20;
//!
//! let pool = Arc::new(BPool::new(BUF_SIZE, CAPACITY, MODULE_ID));
//! let mut handles = Vec::new();
//!
//! for _ in 0..4 {
//!     let pool = pool.clone();
//!     handles.push(thread::spawn(move || {
//!         for _ in 0..0x80 {
//!             let mut n = 2;
//!             while n != 0 {
//!                 let alloc = pool.allocate(n);
//!
//!                 // poll when not all bufs are allocated
//!                 if alloc.count == 0 {
//!                     pool.wait().expect("wait failed");
//!                     continue;
//!                 }
//!
//!                 n -= alloc.count;
//!             }
//!
//!             // NOTE: allocated bufs are freed automatically when `alloc` drops
//!         }
//!     }));
//! }
//!
//! for h in handles {
//!     h.join().unwrap();
//! }
//! ```
54
55use crate::error::{FrozenErr, FrozenRes};
56use std::sync::{self, atomic};
57
/// Sentinel slot index terminating the free list; fits the 32-bit index field of the packed head word
const INVALID_POOL_SLOT: usize = u32::MAX as usize;
59
/// Lock-free buffer pool used for staging IO buffers
///
/// ## Features
///
/// - *RAII Safe*
/// - *Graceful Shutdown*
/// - *Lock-free fast path*
///
/// ## Polling for allocations
///
/// Bufs are allocated in batches using [`BPool::allocate`]; it may allocate fewer than requested, in such cases
/// the caller should wait using [`BPool::wait`], which blocks until any bufs are available to use again
///
/// ## Example
///
/// ```
/// const MODULE_ID: u8 = 0;
/// const CAPACITY: usize = 4;
/// const BUF_SIZE: usize = 0x20;
///
/// let pool = frozen_core::bpool::BPool::new(BUF_SIZE, CAPACITY, MODULE_ID);
///
/// {
///     // NOTE: allocations are RAII safe, hence the underlying resource is reused when `alloc` is dropped
///     let alloc = pool.allocate(4);
///     assert_eq!(alloc.count, 4);
/// }
///
/// let alloc2 = pool.allocate(4);
/// assert_eq!(alloc2.count, 4);
/// ```
#[derive(Debug)]
pub struct BPool {
    /// Base pointer of the backing arena (`capacity * buf_size` bytes)
    ptr: PoolPtr,
    /// Module id forwarded into [`FrozenErr`] values produced by this pool
    module_id: u8,
    /// Number of buffers owned by the pool
    capacity: usize,
    /// Size in bytes of each buffer
    buf_size: usize,
    /// Protects the two condition variables below; the allocate fast path never takes it
    lock: sync::Mutex<()>,
    /// Signalled when a buffer is returned to the pool (see [`BPool::wait`])
    wait_cv: sync::Condvar,
    /// Signalled when the last outstanding allocation is dropped (consumed by `Drop`)
    close_cv: sync::Condvar,
    /// Packed `(tag, index)` head of the lock-free free list (see `pack_pool_idx`)
    head: atomic::AtomicUsize,
    /// Number of live [`Allocation`] handles; `Drop` waits for this to reach zero
    active: atomic::AtomicUsize,
    /// Per-slot successor index forming the free list; `INVALID_POOL_SLOT` terminates it
    next: Box<[atomic::AtomicUsize]>,
}

// SAFETY: all shared mutable state is atomics or behind the mutex; the raw base
// pointer itself is never dereferenced by the pool. NOTE(review): soundness also
// assumes callers do not alias the same buffer across threads — confirm at call sites.
unsafe impl Send for BPool {}
unsafe impl Sync for BPool {}
107
108impl BPool {
109 /// Create a new instance of [`BPool`]
110 ///
111 /// ## Params
112 ///
113 /// - `buf_size`: size of each buf in [`BPool`]
114 /// - `capacity`: capacity of [`BPool`] i.e. number of buffers in memory
115 /// - `module_id`: id used by [`FrozenErr`] for error propagation
116 ///
117 /// ## Example
118 ///
119 /// ```
120 /// const MODULE_ID: u8 = 0;
121 /// const CAPACITY: usize = 4;
122 /// const BUF_SIZE: usize = 0x20;
123 ///
124 /// let pool = frozen_core::bpool::BPool::new(BUF_SIZE, CAPACITY, MODULE_ID);
125 ///
126 /// {
127 /// // NOTE: allocations are RAII safe, hence the underlying resource is reused when `alloc` is dropped
128 /// let alloc = pool.allocate(4);
129 /// assert_eq!(alloc.count, 4);
130 /// }
131 ///
132 /// let alloc2 = pool.allocate(4);
133 /// assert_eq!(alloc2.count, 4);
134 /// ```
135 pub fn new(buf_size: usize, capacity: usize, module_id: u8) -> Self {
136 let pool_size = capacity * buf_size;
137 let mut pool = Vec::<u8>::with_capacity(pool_size);
138 let ptr = PoolPtr { ptr: pool.as_mut_ptr() };
139
140 // NOTE: `Vec::with_capacity(N)` allocates memory but keeps the len at 0. We use raw pointers
141 // to access different slots, if the len stays at 0, it'd create undefined behavior. Also, the
142 // reconstruct of vector from the pointer would become invalid. To avoid memory leaks, we
143 // reconstruct the vec from the pointer in the drop.
144 unsafe { pool.set_len(pool_size) };
145
146 // NOTE: When the `pool` is dropped, it'll free up the entire memory. This should not happen,
147 // as we own the underlying memory via mutable pointer, which is an implicit owenership, so we
148 // avoid destruction of `pool` when it goes out of scope.
149 std::mem::forget(pool);
150
151 let mut next = Vec::with_capacity(capacity);
152 for i in 0..capacity {
153 let _i = 1 + i;
154 let n = if _i < capacity { _i } else { INVALID_POOL_SLOT };
155 next.push(atomic::AtomicUsize::new(n));
156 }
157
158 Self {
159 ptr,
160 capacity,
161 buf_size,
162 module_id,
163 wait_cv: sync::Condvar::new(),
164 close_cv: sync::Condvar::new(),
165 lock: sync::Mutex::new(()),
166 next: next.into_boxed_slice(),
167 active: atomic::AtomicUsize::new(0),
168 head: atomic::AtomicUsize::new(pack_pool_idx(0, 0)),
169 }
170 }
171
172 /// Allocates `N` buffers for staging IO ops
173 ///
174 /// ## Calling Pattern
175 ///
176 /// ```txt
177 /// remaining = n
178 /// loop
179 /// alloc = allocate(remaining)
180 /// if alloc.count == 0
181 /// wait()
182 /// continue
183 ///
184 /// remaining -= alloc.count
185 /// if remaining == 0
186 /// break
187 /// ```
188 ///
189 /// ## Polling
190 ///
191 /// This function may not allocate all the `N` required buffers in one call, so the caller must
192 /// poll (wait and retry) for remaining `N` buffers
193 ///
194 /// ## RAII Safety
195 ///
196 /// All [`BPool`] aloocations are RAII safe by default, hence when the variable which stores the result
197 /// of `allocate`, is dropped, the buffer's it holds are also automatically freed, the burden of _freeing after use_
198 /// does not fall on the caller
199 ///
200 /// ## Example
201 ///
202 /// ```
203 /// const MODULE_ID: u8 = 0;
204 /// const CAPACITY: usize = 4;
205 /// const BUF_SIZE: usize = 0x20;
206 ///
207 /// let pool = frozen_core::bpool::BPool::new(BUF_SIZE, CAPACITY, MODULE_ID);
208 ///
209 /// let alloc1 = pool.allocate(4);
210 /// assert!(alloc1.count == 4);
211 ///
212 /// let alloc2 = pool.allocate(1);
213 /// assert!(alloc2.count == 0);
214 ///
215 /// drop(alloc1);
216 ///
217 /// let alloc3 = pool.allocate(1);
218 /// assert!(alloc3.count == 1);
219 /// ```
220 #[inline(always)]
221 pub fn allocate<'a>(&'a self, n: usize) -> Allocation<'a> {
222 let mut head = self.head.load(atomic::Ordering::Acquire);
223 let mut batch = Allocation::new(self, n);
224
225 loop {
226 let (idx, tag) = unpack_pool_idx(head);
227
228 // NOTE: If we reach the last entry (i.e. invalid ptr), we return early, despite not
229 // allocating all the required buffers
230 //
231 // This allows caller to process allocated buffers, and avoid busy waiting for
232 // more buffers
233 //
234 // The caller should pool to allocate, till all the required buffers are allocated
235 if idx == INVALID_POOL_SLOT {
236 return batch;
237 }
238
239 //
240 // local walk
241 //
242
243 let mut count = 1;
244 let mut last = idx;
245
246 while count < n {
247 // This is valid as `next` is already the index (unpacked version) of the slot
248 let next = self.next[last as usize].load(atomic::Ordering::Relaxed);
249 if next == INVALID_POOL_SLOT {
250 break;
251 }
252
253 last = next;
254 count += 1;
255 }
256
257 let new_head_idx = self.next[last as usize].load(atomic::Ordering::Relaxed);
258 let new_head = pack_pool_idx(new_head_idx, 1 + tag);
259
260 match self
261 .head
262 .compare_exchange(head, new_head, atomic::Ordering::AcqRel, atomic::Ordering::Acquire)
263 {
264 Err(h) => head = h,
265 Ok(_) => {
266 let mut cur = idx;
267 for _ in 0..count {
268 batch.slots.push(self.ptr.add((cur * self.buf_size) as u64));
269 cur = self.next[cur as usize].load(atomic::Ordering::Relaxed);
270 }
271
272 batch.count = count;
273 return batch;
274 }
275 }
276 }
277 }
278
279 #[inline(always)]
280 fn free(&self, ptr: &PoolPtr) {
281 let offset = self.ptr.offset_from(&ptr);
282 let idx = offset / self.buf_size;
283
284 let mut head = self.head.load(atomic::Ordering::Acquire);
285 loop {
286 let (head_idx, head_tag) = unpack_pool_idx(head);
287 self.next[idx as usize].store(head_idx, atomic::Ordering::Relaxed);
288 let new = pack_pool_idx(idx, 1 + head_tag);
289
290 match self
291 .head
292 .compare_exchange(head, new, atomic::Ordering::AcqRel, atomic::Ordering::Acquire)
293 {
294 Ok(_) => {
295 self.wait_cv.notify_one();
296 return;
297 }
298 Err(h) => head = h,
299 }
300 }
301 }
302
303 /// Block until at least one buffer becomes available
304 ///
305 /// This function is intended to be used when [`BPool::allocate`] returns less than requested bufs,
306 /// i.e. when pool is exhausted
307 ///
308 /// ## Polling
309 ///
310 /// The typical allocation pattern is,
311 ///
312 /// - Attempt allocation w/ [`BPool::allocate`]
313 /// - If less than requested bufs are allocated, call [`BPool::wait`]
314 /// - Retry the allocation; all within a loop
315 ///
316 /// ## Notes
317 ///
318 /// - The caller must retry [`BPool::allocate`] after calling [`BPool::wait`]
319 /// - Only threads waiting for buffers are blocked, the allocation fast path remains lock-free
320 ///
321 /// ## Example
322 ///
323 /// ```
324 /// const MODULE_ID: u8 = 0;
325 /// const CAPACITY: usize = 2;
326 /// const BUF_SIZE: usize = 0x20;
327 ///
328 /// let pool = frozen_core::bpool::BPool::new(BUF_SIZE, CAPACITY, MODULE_ID);
329 ///
330 /// let alloc = pool.allocate(2);
331 /// assert_eq!(alloc.count, 2);
332 ///
333 /// let empty = pool.allocate(1);
334 /// assert_eq!(empty.count, 0);
335 ///
336 /// drop(alloc);
337 /// pool.wait().expect("wait failed");
338 ///
339 /// let alloc2 = pool.allocate(1);
340 /// assert_eq!(alloc2.count, 1);
341 /// ```
342 #[inline]
343 pub fn wait(&self) -> FrozenRes<()> {
344 if self.has_free() {
345 return Ok(());
346 }
347
348 let mut guard = self
349 .lock
350 .lock()
351 .map_err(|e| new_err_raw(BPoolErrRes::Lpn, e, self.module_id))?;
352
353 if self.has_free() {
354 return Ok(());
355 }
356
357 while !self.has_free() {
358 guard = self
359 .wait_cv
360 .wait(guard)
361 .map_err(|e| new_err_raw(BPoolErrRes::Lpn, e, self.module_id))?;
362 }
363
364 Ok(())
365 }
366
367 #[inline]
368 fn has_free(&self) -> bool {
369 let (idx, _) = unpack_pool_idx(self.head.load(atomic::Ordering::Acquire));
370 idx != INVALID_POOL_SLOT
371 }
372}
373
374impl Drop for BPool {
375 fn drop(&mut self) {
376 let mut guard = match self.lock.lock() {
377 Ok(g) => g,
378 Err(_) => return,
379 };
380
381 while self.active.load(atomic::Ordering::Acquire) != 0 {
382 guard = self.close_cv.wait(guard).expect("shutdown cv poisoned");
383 }
384
385 let pool_size = self.capacity * self.buf_size;
386
387 // NOTE: We re-construct original allocation from the stored pointer! This builds up the vecotor
388 // as it was created, which than is dropped by Rust destructor's automatically!
389 let _ = unsafe { Vec::from_raw_parts(self.ptr.ptr, pool_size, pool_size) };
390 }
391}
392
/// Domain Id for [`BPool`] is **19** (`0x13`)
const ERRDOMAIN: u8 = 0x13;
395
/// Error codes for [`BPool`]
#[repr(u16)]
pub enum BPoolErrRes {
    /// (0x301 = 769) lock error (failed or poisoned)
    // NOTE(review): the comment previously said (518), which does not match 0x301 — the
    // discriminant below is authoritative.
    Lpn = 0x301,
}
402
403impl BPoolErrRes {
404 #[inline]
405 fn default_message(&self) -> &'static [u8] {
406 match self {
407 Self::Lpn => b"lock poisoned while waiting for BPool",
408 }
409 }
410}
411
412#[inline]
413fn new_err_raw<E: std::fmt::Display>(res: BPoolErrRes, error: E, mid: u8) -> FrozenErr {
414 let detail = res.default_message();
415 FrozenErr::new(
416 mid,
417 ERRDOMAIN,
418 res as u16,
419 detail,
420 error.to_string().as_bytes().to_vec(),
421 )
422}
423
/// Number of low bits of the packed head word holding the slot index
const POOL_IDX_BITS: usize = 0x20;
/// Mask selecting the index bits of a packed head word
const POOL_IDX_MASK: usize = (1 << POOL_IDX_BITS) - 1;

/// Pack a slot index and an ABA tag into one word: tag in the high bits,
/// index in the low [`POOL_IDX_BITS`] bits.
#[inline]
const fn pack_pool_idx(idx: usize, tag: usize) -> usize {
    (idx & POOL_IDX_MASK) | (tag << POOL_IDX_BITS)
}

/// Split a packed head word back into its `(index, tag)` pair.
#[inline]
const fn unpack_pool_idx(id: usize) -> (usize, usize) {
    let idx = id & POOL_IDX_MASK;
    let tag = id >> POOL_IDX_BITS;
    (idx, tag)
}
436
/// Raw pointer type used for addressing buffers inside the pool arena
type TPoolPtr = *mut u8;

/// Pointer to buffer allocated by [`BPool::allocate`]
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct PoolPtr {
    /// Raw pointer to the start of a buffer owned by the pool, where the valid memory range is `[ptr, ptr + buf_size)`
    pub ptr: TPoolPtr,
}

// SAFETY: NOTE(review) — sound only while every `PoolPtr` is outlived by its owning
// `BPool` (the arena is freed after all allocations drop); confirm no `PoolPtr` is
// stored beyond the lifetime of the `Allocation` that produced it.
unsafe impl Send for PoolPtr {}
unsafe impl Sync for PoolPtr {}
448
impl PoolPtr {
    /// Returns a pointer advanced `count` bytes past this one.
    #[inline]
    fn add(&self, count: u64) -> Self {
        Self {
            // SAFETY: caller must keep `self.ptr + count` inside the pool arena;
            // `BPool::allocate` only passes offsets below `capacity * buf_size`.
            ptr: unsafe { self.ptr.add(count as usize) },
        }
    }

    /// Byte offset of the *argument* measured from `self` — i.e. `ptr - self`,
    /// not `self - ptr` (note the reversed argument order vs the name).
    #[inline]
    fn offset_from(&self, ptr: &Self) -> usize {
        // SAFETY: both pointers must belong to the same pool arena, and `ptr`
        // must not precede `self` (the cast to usize assumes a non-negative
        // offset); `BPool::free` calls this with base-pointer `self`.
        unsafe { ptr.ptr.offset_from(self.ptr) as usize }
    }
}
462
/// Buffer allocations allocated by [`BPool::allocate`]
///
/// Dropping this handle returns every held buffer to the pool automatically.
#[derive(Debug)]
pub struct Allocation<'a> {
    /// Number of buffers allocated, can be lower than the requested amount
    pub count: usize,

    /// Vector of [`PoolPtr`] objects, i.e. raw buffer pointers (one per allocated buffer)
    pub slots: Vec<PoolPtr>,

    /// Guard to enforce RAII safety (decrements the pool's active count on drop)
    guard: AllocationGuard<'a>,
}
475
impl<'a> Allocation<'a> {
    /// Create an empty allocation handle for `pool`, pre-sized for `cap` slots.
    #[inline]
    fn new(pool: &'a BPool, cap: usize) -> Self {
        // Register as an active user so `BPool`'s `Drop` blocks until this handle's
        // guard is dropped. NOTE(review): the increment is Relaxed; the matching
        // Release decrement plus the mutex in `AllocationGuard::drop` appear to
        // order shutdown — confirm against the Drop impl.
        pool.active.fetch_add(1, atomic::Ordering::Relaxed);

        Self {
            count: 0,
            slots: Vec::<PoolPtr>::with_capacity(cap),
            guard: AllocationGuard(pool),
        }
    }
}
488
impl<'a> Drop for Allocation<'a> {
    /// Returns every held buffer to the pool; the embedded `guard` field then
    /// decrements the pool's active count when it drops after this body runs.
    fn drop(&mut self) {
        for ptr in &self.slots {
            self.guard.0.free(ptr);
        }
    }
}
496
/// Decrements `BPool::active` on drop and wakes a `BPool` destructor waiting for shutdown
#[derive(Debug)]
struct AllocationGuard<'a>(&'a BPool);

impl Drop for AllocationGuard<'_> {
    fn drop(&mut self) {
        // `fetch_sub` returning 1 means this guard belonged to the last live allocation.
        if self.0.active.fetch_sub(1, atomic::Ordering::Release) == 1 {
            // last user: take the lock so the notify cannot fire between a closing
            // `BPool::drop` checking `active` and it blocking on `close_cv`.
            if let Ok(_g) = self.0.lock.lock() {
                self.0.close_cv.notify_one();
            }
        }
    }
}
510
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::TEST_MID;

    /// Pool capacity used by most tests
    const CAP: usize = 0x20;
    /// Buffer size used by most tests
    const SIZE: usize = 0x0A;

    /// Build the default pool under test.
    fn new_pool() -> BPool {
        BPool::new(SIZE, CAP, TEST_MID)
    }

    mod utils {
        use super::*;

        /// Packing an (index, tag) pair and unpacking it must round-trip losslessly.
        #[test]
        fn pack_unpack_cycle() {
            let pack_id = pack_pool_idx(0x20, 0x0A);
            let (idx, tag) = unpack_pool_idx(pack_id);

            assert_eq!(idx, 0x20);
            assert_eq!(tag, 0x0A);
        }
    }

    mod allocations {
        use super::*;

        /// A single-buffer request on a fresh pool is fully satisfied.
        #[test]
        fn ok_alloc_works() {
            let pool = new_pool();
            let alloc = pool.allocate(1);

            assert_eq!(alloc.count, 1);
            assert_eq!(alloc.slots.len(), 1);
        }

        /// Requesting exactly the pool capacity drains the whole free list.
        #[test]
        fn ok_alloc_exact_cap_as_requested() {
            let pool = new_pool();
            let alloc = pool.allocate(CAP);

            assert_eq!(alloc.count, CAP);
            assert_eq!(alloc.slots.len(), CAP);
        }

        /// A request larger than what remains yields a partial batch, and an
        /// exhausted pool yields an empty one.
        #[test]
        fn ok_alloc_partial_when_exhausted() {
            let pool = new_pool();

            let a1 = pool.allocate(CAP - 1);
            assert_eq!(a1.count, CAP - 1);

            let a2 = pool.allocate(CAP);
            assert_eq!(a2.count, 1);

            let a3 = pool.allocate(1);
            assert_eq!(a3.count, 0);
        }

        /// Allocating from a fully drained pool returns count == 0.
        #[test]
        fn ok_allocs_none_when_exhausted() {
            let pool = new_pool();

            let _a1 = pool.allocate(CAP);

            let a2 = pool.allocate(1);
            assert_eq!(a2.count, 0);
        }

        /// Every slot handed out in a single batch must be a distinct buffer.
        #[test]
        fn ok_no_duplicate_slots_in_single_alloc() {
            let pool = new_pool();

            let alloc = pool.allocate(CAP);
            let mut ptrs: Vec<_> = alloc.slots.iter().map(|s| s.ptr).collect();

            // remove duplicates if any; length shrinks iff there was a duplicate
            ptrs.sort();
            ptrs.dedup();

            assert_eq!(ptrs.len(), CAP);
        }
    }

    mod raii_safety {
        use super::*;

        /// Dropping an Allocation returns its buffers, so the full capacity can
        /// be re-allocated afterwards.
        #[test]
        fn ok_auto_free_on_drop() {
            let pool = new_pool();

            {
                let alloc = pool.allocate(CAP);
                assert_eq!(alloc.count, CAP);
            }

            let alloc2 = pool.allocate(CAP);
            assert_eq!(alloc2.count, CAP);
        }
    }

    mod concurrency {
        use super::*;
        use std::sync::{Arc, Barrier};
        use std::thread;

        /// Hammers allocate/wait/free from several threads, then verifies no
        /// buffer was lost or duplicated: the full capacity must still be
        /// allocatable at the end.
        #[test]
        fn ok_concurrent_alloc() {
            const THREADS: usize = 8;
            const ITERS: usize = 0x1000;

            let barrier = Arc::new(Barrier::new(THREADS));
            let pool = Arc::new(BPool::new(SIZE, CAP * 0x0A, TEST_MID));

            let mut handles = Vec::new();
            for _ in 0..THREADS {
                let pool = pool.clone();
                let barrier = barrier.clone();

                handles.push(thread::spawn(move || {
                    // start all threads at once to maximise contention
                    barrier.wait();

                    for _ in 0..ITERS {
                        let mut n = CAP / 2;
                        while n != 0 {
                            let alloc = pool.allocate(n);
                            if alloc.count == 0 {
                                pool.wait().expect("wait for free");
                                continue;
                            }

                            n -= alloc.count;
                        }
                    }
                }));
            }

            for h in handles {
                assert!(h.join().is_ok());
            }

            let final_alloc = pool.allocate(CAP);
            assert_eq!(final_alloc.count, CAP);
        }
    }

    mod shutdown_safety {
        use super::*;
        use std::sync::Arc;

        /// `BPool::drop` must not free the arena while an allocation is live.
        #[test]
        fn drop_waits_for_active_allocations() {
            let pool = Arc::new(BPool::new(SIZE, CAP * 0x0A, TEST_MID));
            let pool2 = pool.clone();

            let handle = std::thread::spawn(move || {
                let alloc = pool2.allocate(4);
                std::thread::sleep(std::time::Duration::from_millis(50));
                drop(alloc);
            });

            // give the other thread time to allocate
            std::thread::sleep(std::time::Duration::from_millis(10));

            // NOTE(review): this drops only one Arc handle, so it does not block here;
            // `BPool::drop` actually runs on whichever thread releases the *last*
            // handle (normally the worker, after `alloc` is dropped). The shutdown
            // wait is therefore exercised on the worker side — confirm this is the
            // intended coverage.
            drop(pool);

            assert!(handle.join().is_ok());
        }
    }
}