Skip to main content

frozen_core/
bufpool.rs

1//! A low-latency memory-budgeted buffer pool to manage fixed-sized buffer allocations
2//!
3//! ## Memory Allocations
4//!
5//! All the allocations are allocated using the global memory allocator as requested (on-the-fly).
6//!
7//! While a single allocation returned as [`BufPoolAllocation`] is a large contiguous slice of
8//! memory w/ size as `BufPoolCfg::buffer_size.bytes() * n_buffers`.
9//!
10//! Memory layout structure,
11//!
12//! ```text
13//! allocation = [[buf0][buf1][buf2]]
14//! where,
15//!   - every buffer is of size `buffer_size`
16//!   - each buffer is pointed using `*mut u8`
17//! ```
18//!
19//! ## Backpressure
20//!
21//! Every allocation reserves a memory budget and is only allowed to allocate memory if enough
22//! budget (i.e. memory space) is available. Otherwise, the caller is blocked/polled till enough
23//! space is available.
24//!
25//! When the [`BufPoolAllocation`] and all its references are dropped, the underlying memory is
26//! deallocated while relaxing the budget and dropping the backpressure (if any).
27//!
28//! _NOTE:_ There is no fairness guarantee for the callers who are polled when faced with
29//! backpressure, as the waiting callers are awakened opportunistically.
30//!
31//! ## Benchmarks
32//!
33//! Observed measurements of latency and throughput,
34//!
35//! | Metric                       | Value              |
36//! |:-----------------------------|:-------------------|
37//! | Allocation Latency (avg)     | ~254 nanosecond    |
38//! | Allocation Throughput (avg)  | ~3.94 million/sec  |
39//!
40//! _NOTE:_ All measurements include the complete RAII lifecycle (i.e. allocation + deallocation).
41//!
42//! Observed allocation latency for `N` buffers,
43//!
44//! | Buffers  | Latency  |
45//! |:---------|:---------|
46//! | 0x01     | 246 ns   |
47//! | 0x10     | 251 ns   |
48//! | 0x400    | 300 ns   |
49//!
50//! _INFO:_ As seen, the allocation latency stays nearly constant irrespective of the number of
51//! buffers and the allocated bytes.
52//!
53//! Environment used for benching,
54//!
55//! * OS: NixOS (WSL2)
56//! * Architecture: x86_64
57//! * Memory: 8 GiB RAM (DDR4)
58//! * Rust: rustc 1.86.0 w/ cargo 1.86.0
59//! * Kernel: Linux 6.6.87.2-microsoft-standard-WSL2
60//! * CPU: Intel® Core™ i5-10300H @ 2.50GHz (4C / 8T)
61//!
62//! ## Example
63//!
64//! ```
65//! use frozen_core::{
66//!     bufpool::{BufPool, BufPoolCfg, BufferPointer},
67//!     utils::BufferSize,
68//! };
69//!
70//! const BUF_SIZE: BufferSize = BufferSize::S16;
71//!
72//! let pool = BufPool::new(BufPoolCfg {
73//!     buffer_size: BUF_SIZE,
74//!     max_memory: BUF_SIZE as usize * 0x40,
75//! });
76//!
77//! let alloc = pool.allocate(0x2A);
78//!
79//! assert_eq!(alloc.length(), 0x2A);
80//! assert!(!alloc.first().is_null());
81//! assert_eq!(alloc.allocated_bytes(), BUF_SIZE as usize * 0x2A);
82//!
83//! let ptrs: Vec<BufferPointer> = alloc.iter().collect();
84//! assert_eq!(ptrs.len(), 0x2A);
85//! ```
86
87use std::{alloc, ptr, sync, sync::atomic};
88
89/// An unsafe pointer to an individual in memory buffer
90///
91/// ## Safety
92///
93/// The pointer is untyped and uninitialized. Caller is responsible for:
94///
95/// * Writes stay within the bounds, while the size of each buffer is [`BufPoolCfg::buffer_size`]
96/// * Reads should only occur after initialization/write is completed on the buffer
97/// * The pointer must not outlive the lifetime of [`BufPoolAllocation`] object
98///
99/// ## Example
100///
101/// ```
102/// use frozen_core::{
103///     bufpool::{BufPool, BufPoolCfg},
104///     utils::BufferSize,
105/// };
106///
107/// const BUF_SIZE: BufferSize = BufferSize::S16;
108/// const BUFFER: [u8; BUF_SIZE as usize] = [1u8; BUF_SIZE as usize];
109///
110/// let pool = BufPool::new(BufPoolCfg {
111///     buffer_size: BUF_SIZE,
112///     max_memory: BUF_SIZE as usize * 0x0A,
113/// });
114///
115/// let alloc = pool.allocate(1);
116/// unsafe {
117///     std::ptr::copy_nonoverlapping(BUFFER.as_ptr(), alloc.first(), BUF_SIZE as usize);
118/// }
119/// ```
120pub type BufferPointer = *mut u8;
121
122/// All the available configrations for [`BufPool`]
123///
124/// ## Example
125///
126/// ```
127/// use frozen_core::{bufpool::BufPoolCfg, utils::BufferSize};
128///
129/// const BUF_SIZE: BufferSize = BufferSize::S64;
130///
131/// let cfg = BufPoolCfg {
132///     buffer_size: BUF_SIZE,
133///     max_memory: BUF_SIZE as usize * 0x1000,
134/// };
135///
136/// assert_ne!(cfg.max_memory, 0);
137/// assert!(cfg.max_memory > cfg.buffer_size.bytes());
138/// ```
139#[derive(Clone, Copy, Debug, Eq, PartialEq)]
140pub struct BufPoolCfg {
141    /// Size (in bytes) of an indivdual buffer unit allocated
142    pub buffer_size: crate::utils::BufferSize,
143
144    /// Maximum allowed memory (in bytes) to be simultaneosuly allocated by [`BufPool`]
145    ///
146    /// _IMPORTANT:_ When trying to allocate more memory then [`BufPoolCfg::max_memory`] via
147    /// [`BufPool::allocate`], a deadlock will happen due to memory budgeting in place. The caller
148    /// must make sure the `max_meory` is high enough to avoid this scenerio.
149    ///
150    /// _NOTE:_ To avoid backpressure, set the `max_memory` to an arbitrary large value. This
151    /// would not have any direct impact on performance or resource usage, and will avoid
152    /// backpressure under heavy workload.
153    pub max_memory: usize,
154}
155
156/// An implementation of a low-latency memory-budgeted buffer pool managing fixed-sized buffer
157/// allocations
158///
159/// ## Blocking `drop`
160///
161/// Dropping the [`BufPool`] instance from memory blocks unitl all the allocated instances of
162/// [`BufPoolAllocations`] and all there references are dropped from memory.
163///
164/// This is in place to avoid memory leaks, as well as to enable sending [`BufPoolAllocation`]
165/// objects across threads while being tied to the lifecyle of [`BufPool`].
166///
167/// ## Memory Allocations
168///
169/// All the allocations are allocated using the global memory allocator as requested (on-the-fly).
170///
171/// While a single allocation retunred as [`BufPoolAllocation`] is a large contineous slice of
172/// memory w/ size as `BufPoolCfg::buffer_size.bytes() * n_buffers`.
173///
174/// Memory layout structure,
175///
176/// ```text
177/// allocation = [[buf0][buf1][buf2]]
178/// where,
179///   - every buffer is of size `buffer_size`
180///   - each buffer is pointed using `*mut u8`
181/// ```
182///
183/// ## Backpressure
184///
185/// Every allocation reserves a memory budget and is only allowed to allocate memory if enough
186/// budget (i.e. memory space) is available. Otherwise, the caller is blocked/polled till enough
187/// space is available.
188///
189/// When the [`BufPoolAllocation`] and all its references are dropped, the underlying memory is
190/// deallocated while relaxing the budget and dropping the backpressure (if any).
191///
192/// _NOTE:_ There is no faireness guarantee for the caller's who are polled when faced with
193/// backpressure, as the waiting callers are awaken opportunistically.
194///
195/// ## Example
196///
197/// ```
198/// use frozen_core::{
199///     bufpool::{BufPool, BufPoolCfg, BufferPointer},
200///     utils::BufferSize,
201/// };
202///
203/// const BUF_SIZE: BufferSize = BufferSize::S16;
204///
205/// let pool = BufPool::new(BufPoolCfg {
206///     buffer_size: BUF_SIZE,
207///     max_memory: BUF_SIZE as usize * 0x40,
208/// });
209///
210/// let alloc = pool.allocate(0x30);
211///
212/// assert_eq!(alloc.length(), 0x30);
213/// assert!(!alloc.first().is_null());
214/// assert_eq!(alloc.allocated_bytes(), BUF_SIZE as usize * 0x30);
215///
216/// let ptrs: Vec<BufferPointer> = alloc.iter().collect();
217/// assert_eq!(ptrs.len(), 0x30);
218/// ```
219#[derive(Debug)]
220pub struct BufPool {
221    active_allocations: atomic::AtomicUsize,
222    allocation_cv: sync::Condvar,
223    allocation_lock: sync::Mutex<()>,
224    allocated_memory: atomic::AtomicUsize,
225    cfg: BufPoolCfg,
226    shutdown_cv: sync::Condvar,
227    shutdown_lock: sync::Mutex<()>,
228}
229
230unsafe impl Send for BufPool {}
231unsafe impl Sync for BufPool {}
232
233impl BufPool {
234    /// Create a new instance of [`BufPool`]
235    ///
236    /// ## Debug Assertions
237    ///
238    /// In debug builds, this function uses `debug_assertion` to prevent invalid configurations.
239    /// Caller must refer to [`BufPoolCfg`] for details about the config params.
240    ///
241    /// ## Example
242    ///
243    /// ```
244    /// use frozen_core::{
245    ///     bufpool::{BufPool, BufPoolCfg},
246    ///     utils::BufferSize,
247    /// };
248    ///
249    /// const BUF_SIZE: BufferSize = BufferSize::S16;
250    ///
251    /// let pool = BufPool::new(BufPoolCfg {
252    ///     buffer_size: BUF_SIZE,
253    ///     max_memory: BUF_SIZE as usize * 0x0A,
254    /// });
255    ///
256    /// let alloc = pool.allocate(1);
257    ///
258    /// assert_eq!(alloc.length(), 1);
259    /// assert_eq!(alloc.allocated_bytes(), BUF_SIZE as usize);
260    /// ```
261    #[inline]
262    pub fn new(cfg: BufPoolCfg) -> Self {
263        // sanity check
264        debug_assert!(
265            cfg.buffer_size.bytes() < cfg.max_memory,
266            "MAX_MEMORY must always be larger than the BUFFER_SIZE"
267        );
268
269        Self {
270            cfg,
271            active_allocations: atomic::AtomicUsize::new(0),
272            allocated_memory: atomic::AtomicUsize::new(0),
273            allocation_cv: sync::Condvar::new(),
274            allocation_lock: sync::Mutex::new(()),
275            shutdown_cv: sync::Condvar::new(),
276            shutdown_lock: sync::Mutex::new(()),
277        }
278    }
279
280    /// Allocate `required` number of buffers each of [`BufPoolCfg::buffer_size`] size
281    ///
282    /// ## Allocation Layout
283    ///
284    /// The allocation is a large contineous slice of memory w/ size as
285    ///
286    /// ```text
287    /// BufPoolCfg::buffer_size.bytes() * n_buffers
288    /// ```
289    ///
290    /// Memory layout structure,
291    ///
292    /// ```text
293    /// allocation = [[buf0][buf1][buf2]]
294    /// where,
295    ///   - every buffer is of size `buffer_size`
296    ///   - each buffer is pointed using `*mut u8`
297    /// ```
298    ///
299    /// ## RAII Safety
300    ///
301    /// The allocation object, i.e. [`BufPoolAllocation`], is RAII safe. The allocated memory is
302    /// automatically deallocated as soon as the last reference to the object is dropped, while also
303    /// relaxing the memory budget to eliminate backpressure (if any).
304    ///
305    /// ## Important Considerations
306    ///
307    /// The number of buffers required should never exceed `u16::MAX`. This is an abstract soft
308    /// limit and should be enforced by the public interface to avoid any weird exhaustion issues
309    /// or unknown bugs across the storage system.
310    ///
311    /// As `u16::MAX` is large enough value to almost never cause any problems for a single write
312    /// operation, this soft limit acts as a guidline to safely operate with arithmatic operations
313    /// across storage engine(s), including but not limited to [`frozen_core`].
314    ///
315    /// ## Example
316    ///
317    /// ```
318    /// use frozen_core::{
319    ///     bufpool::{BufPool, BufPoolCfg, BufferPointer},
320    ///     utils::BufferSize,
321    /// };
322    ///
323    /// const BUF_SIZE: BufferSize = BufferSize::S16;
324    ///
325    /// let pool = BufPool::new(BufPoolCfg {
326    ///     buffer_size: BUF_SIZE,
327    ///     max_memory: BUF_SIZE as usize * 0x14,
328    /// });
329    ///
330    /// let alloc = pool.allocate(0x0A);
331    ///
332    /// assert_eq!(alloc.length(), 0x0A);
333    /// assert!(!alloc.first().is_null());
334    /// assert_eq!(alloc.allocated_bytes(), BUF_SIZE as usize * 0x0A);
335    ///
336    /// let ptrs: Vec<BufferPointer> = alloc.iter().collect();
337    /// assert_eq!(ptrs.len(), 0x0A);
338    /// ```
339    #[inline(always)]
340    pub fn allocate(&self, required: usize) -> BufPoolAllocation {
341        // sanity checks
342        debug_assert!(required > 0, "required buffers must never be 0");
343        debug_assert!(
344            required * self.cfg.buffer_size.bytes() <= self.cfg.max_memory,
345            "Total required bytes must be smaller then the MAX_MEMORY allowed to avoid deadlock"
346        );
347        debug_assert!(
348            required * self.cfg.buffer_size.bytes() <= self.cfg.max_memory,
349            "Total required bytes must never exceed `u16::MAX` to avoid arithmatic overflows"
350        );
351
352        let required_bytes = self.cfg.buffer_size.bytes() * required;
353        loop {
354            let current_bytes = self.allocated_memory.load(atomic::Ordering::Acquire);
355            if current_bytes + required_bytes > self.cfg.max_memory {
356                self.backpressure(required_bytes);
357                continue;
358            }
359
360            match self.allocated_memory.compare_exchange(
361                current_bytes,
362                current_bytes + required_bytes,
363                atomic::Ordering::AcqRel,
364                atomic::Ordering::Acquire,
365            ) {
366                Ok(_) => break,
367                Err(_) => continue,
368            }
369        }
370
371        let layout = create_layout(required_bytes);
372        let pointer = allocate_layout(layout);
373        self.active_allocations.fetch_add(1, atomic::Ordering::Relaxed);
374
375        BufPoolAllocation { layout, pointer, required_bytes, buffers: required, pool: ptr::NonNull::from(self) }
376    }
377
378    /// Applies backpressure until enough memory budget is available for the allocation
379    ///
380    /// ## Why we ignore [`std::sync::PoisonError`]
381    ///
382    /// The mutex used for lock, is solely used as a parking primitive for [`Condvar`] and does not
383    /// protect any mutable state. All the pool invariants and accounting are maintained via
384    /// atomics and are completely seperated from the mutex.
385    ///
386    /// A poisoned mutex only indicates that another tx panicked while holding the lock, and
387    /// indicates an inconsistent state of the protected value. Since no state can be left
388    /// partially modified under this lock, there is no possible consistency risk to recover from
389    /// and propagating the poison error would only introduce unnecessary failures into the
390    /// allocation path.
391    ///
392    /// Therefore, as best effort, we consume the [`std::sync::PoisonError`] and continue operating
393    /// with the recovered guard.
394    #[inline]
395    fn backpressure(&self, required_bytes: usize) {
396        let mut guard = self.allocation_lock.lock().unwrap_or_else(|e| e.into_inner());
397        while self.allocated_memory.load(atomic::Ordering::Acquire) + required_bytes > self.cfg.max_memory {
398            guard = self.allocation_cv.wait(guard).unwrap_or_else(|e| e.into_inner());
399        }
400    }
401}
402
403impl Drop for BufPool {
404    fn drop(&mut self) {
405        // NOTE: See [`BufPool::backpressure`] implementation for rationale behind poison recovery
406
407        let mut guard = self.shutdown_lock.lock().unwrap_or_else(|e| e.into_inner());
408        while self.active_allocations.load(atomic::Ordering::Acquire) != 0 {
409            guard = self.shutdown_cv.wait(guard).unwrap_or_else(|e| e.into_inner());
410        }
411    }
412}
413
414/// A RAII safe allocation object containing allocated buffers
415///
416/// ## Lifetime
417///
418/// The object can/may outlive the scope that created it, while also being able to transfer across
419/// threads. As, internally the [`BufPool`] tracks all the active allocations and delays the drop
420/// until every allocation and all there references are dropped from memory.
421///
422/// ## Example
423///
424/// ```
425/// use frozen_core::{
426///     bufpool::{BufPool, BufPoolCfg, BufferPointer},
427///     utils::BufferSize,
428/// };
429///
430/// const BUF_SIZE: BufferSize = BufferSize::S16;
431///
432/// let pool = BufPool::new(BufPoolCfg {
433///     buffer_size: BUF_SIZE,
434///     max_memory: BUF_SIZE as usize * 0x10,
435/// });
436///
437/// let alloc = pool.allocate(0x0A);
438///
439/// assert_eq!(alloc.length(), 0x0A);
440/// assert!(!alloc.first().is_null());
441/// assert_eq!(alloc.allocated_bytes(), BUF_SIZE as usize * 0x0A);
442///
443/// let ptrs: Vec<BufferPointer> = alloc.iter().collect();
444/// assert_eq!(ptrs.len(), 0x0A);
445/// ```
446#[derive(Debug)]
447pub struct BufPoolAllocation {
448    buffers: usize,
449    layout: alloc::Layout,
450    pointer: ptr::NonNull<u8>,
451    pool: ptr::NonNull<BufPool>,
452    required_bytes: usize,
453}
454
455unsafe impl Send for BufPoolAllocation {}
456
457impl BufPoolAllocation {
458    /// Returns a [`BufferPointer`] to the first buffer from the allocated list of buffers
459    ///
460    /// _NOTE:_ The returned [`BufferPointer`] can also be used as a _base_pointer_ to operate on
461    /// the entire allocated memory slice.
462    ///
463    /// ## Example
464    ///
465    /// ```
466    /// use frozen_core::{
467    ///     bufpool::{BufPool, BufPoolCfg},
468    ///     utils::BufferSize,
469    /// };
470    ///
471    /// const BUF_SIZE: BufferSize = BufferSize::S32;
472    ///
473    /// let pool = BufPool::new(BufPoolCfg {
474    ///     buffer_size: BUF_SIZE,
475    ///     max_memory: BUF_SIZE as usize * 0x0A,
476    /// });
477    ///
478    /// let alloc = pool.allocate(0x0A);
479    /// assert!(!alloc.first().is_null());
480    /// ```
481    #[inline]
482    pub const fn first(&self) -> BufferPointer {
483        self.pointer.as_ptr()
484    }
485
486    /// Returns the total number of allocated buffers
487    ///
488    /// _IMPORTANT:_ The returned value is always equal to the `required` value using while
489    /// calling [`BufPool::allocate`].
490    ///
491    /// ## Example
492    ///
493    /// ```
494    /// use frozen_core::{
495    ///     bufpool::{BufPool, BufPoolCfg},
496    ///     utils::BufferSize,
497    /// };
498    ///
499    /// const BUF_SIZE: BufferSize = BufferSize::S64;
500    ///
501    /// let pool = BufPool::new(BufPoolCfg {
502    ///     buffer_size: BUF_SIZE,
503    ///     max_memory: BUF_SIZE as usize * 0x0A,
504    /// });
505    ///
506    /// let alloc = pool.allocate(0x0A);
507    /// assert_eq!(alloc.length(), 0x0A);
508    /// ```
509    #[inline]
510    pub const fn length(&self) -> usize {
511        self.buffers
512    }
513
514    /// Returns the total number of bytes of memory allocated
515    ///
516    /// ## Example
517    ///
518    /// ```
519    /// use frozen_core::{
520    ///     bufpool::{BufPool, BufPoolCfg},
521    ///     utils::BufferSize,
522    /// };
523    ///
524    /// const BUF_SIZE: BufferSize = BufferSize::S16;
525    ///
526    /// let pool = BufPool::new(BufPoolCfg {
527    ///     buffer_size: BUF_SIZE,
528    ///     max_memory: BUF_SIZE as usize * 0x0A,
529    /// });
530    ///
531    /// let alloc = pool.allocate(0x0A);
532    /// assert_eq!(alloc.allocated_bytes(), BUF_SIZE as usize * 0x0A);
533    /// ```
534    #[inline]
535    pub const fn allocated_bytes(&self) -> usize {
536        self.required_bytes
537    }
538
539    /// A custom [`Iterator`] implementation to enable iteration over the list of allocated buffers
540    /// from [`BufPoolAllocation`]
541    ///
542    /// _NOTE:_ Each yielded pointer refers to a unique individual buffer each of size
543    /// [`BufPoolCfg::buffer_size`].
544    ///
545    /// ## Example
546    ///
547    /// ```
548    /// use frozen_core::{
549    ///     bufpool::{BufPool, BufPoolCfg, BufferPointer},
550    ///     utils::BufferSize,
551    /// };
552    ///
553    /// const BUF_SIZE: BufferSize = BufferSize::S16;
554    ///
555    /// let pool = BufPool::new(BufPoolCfg {
556    ///     buffer_size: BUF_SIZE,
557    ///     max_memory: BUF_SIZE as usize * 0x14,
558    /// });
559    ///
560    /// let alloc = pool.allocate(0x0A);
561    /// let ptrs: Vec<BufferPointer> = alloc.iter().collect();
562    ///
563    /// assert_eq!(ptrs.len(), 0x0A);
564    /// ```
565    #[inline]
566    pub fn iter(&self) -> BufPoolAllocationIter {
567        let pool = unsafe { self.pool.as_ref() };
568        BufPoolAllocationIter {
569            pointer: self.pointer,
570            buffer_size: pool.cfg.buffer_size.bytes(),
571            remaining: self.buffers,
572        }
573    }
574}
575
576impl Drop for BufPoolAllocation {
577    fn drop(&mut self) {
578        let pool = unsafe { self.pool.as_ref() };
579        deallocate_memory(self.pointer, self.layout);
580
581        pool.allocated_memory.fetch_sub(self.required_bytes, atomic::Ordering::Release);
582        pool.allocation_cv.notify_one();
583
584        if pool.active_allocations.fetch_sub(1, atomic::Ordering::Release) == 1 {
585            pool.shutdown_cv.notify_one();
586        }
587    }
588}
589
590/// A custom [`Iterator`] object to iterate over the list of allocated buffers
591///
592/// _NOTE:_ Buffers are yielded in allocation order and are backed by a single contiguous memory
593/// region.
594///
595/// ## Example
596///
597/// ```rs
598/// use frozen_core::{
599///     bufpool::{BufPool, BufPoolCfg, BufferPointer},
600///     utils::BufferSize,
601/// };
602///
603/// const BUF_SIZE: BufferSize = BufferSize::S32;
604///
605/// let pool = BufPool::new(BufPoolCfg {
606///     buffer_size: BUF_SIZE,
607///     max_memory: BUF_SIZE as usize * 0x14,
608/// });
609///
610/// let alloc = pool.allocate(0x0A);
611/// let ptrs: Vec<BufferPointer> = alloc.iter().collect();
612///
613/// assert_eq!(ptrs.len(), 0x0A);
614/// ```
615#[derive(Debug)]
616pub struct BufPoolAllocationIter {
617    pointer: ptr::NonNull<u8>,
618    buffer_size: usize,
619    remaining: usize,
620}
621
622impl Iterator for BufPoolAllocationIter {
623    type Item = BufferPointer;
624
625    #[inline(always)]
626    fn next(&mut self) -> Option<Self::Item> {
627        if self.remaining == 0 {
628            return None;
629        }
630
631        let curr_ptr = self.pointer;
632
633        self.pointer = unsafe { self.pointer.add(self.buffer_size) };
634        self.remaining -= 1;
635
636        Some(curr_ptr.as_ptr())
637    }
638}
639
/// Builds the layout of a byte array holding `required_bytes` bytes
///
/// The resulting layout has a size of `required_bytes` and the alignment of `u8`.
///
/// _NOTE:_ The panic on an invalid layout is deliberate. A request this large would be caught
/// by unit tests and indicates an incorrect impl, not a runtime failure to recover from.
#[inline]
fn create_layout(required_bytes: usize) -> alloc::Layout {
    alloc::Layout::array::<u8>(required_bytes).unwrap_or_else(|e| panic!("Invalid Layout: {e}"))
}
651
/// Allocate a memory slice with given allocation `layout`
///
/// ## Zero-sized layouts
///
/// Calling the global allocator with a zero-sized layout is undefined behavior. For such layouts
/// no allocation is made and a dangling (non-null, suitably aligned) pointer is returned instead,
/// which must neither be read from nor written to. [`deallocate_memory`] knows about this and
/// skips the deallocation for zero-sized layouts.
///
/// ## Allocation Failure
///
/// If the allocator is unable to satisfy the request (typically due to an OOM condition),
/// [`alloc::alloc`] returns a null pointer.
///
/// In such cases we delegate to [`alloc::handle_alloc_error`], matching the behavior of std library
/// types such as [`Vec`], [`Box`] and [`String`].
///
/// This path never returns. Allocation failures are therefore treated as fatal runtime
/// conditions rather than recoverable errors.
///
/// Under normal operation this path should never be reached, as memory usage is expected to be
/// bounded by the buffer pool's memory budget and backpressure mechanisms.
///
/// ## Why not return `FrozenErr`
///
/// A null return from [`alloc::alloc`] indicates that the global allocator itself was unable to
/// satisfy the request.
///
/// Delegating to [`alloc::handle_alloc_error`] matches the behavior of standard library containers
/// and avoids continuing execution after a catastrophic allocator failure, where further
/// allocations required for error handling, logging or recovery may also fail.
#[inline]
fn allocate_layout(layout: alloc::Layout) -> ptr::NonNull<u8> {
    if layout.size() == 0 {
        // the alignment is a non-zero power of two, which makes it a valid dangling address
        return ptr::NonNull::new(layout.align() as *mut u8).unwrap_or(ptr::NonNull::dangling());
    }

    // SAFETY: the layout has a non-zero size, as checked above
    let pointer = unsafe { alloc::alloc(layout) };
    match ptr::NonNull::new(pointer) {
        Some(p) => p,
        None => alloc::handle_alloc_error(layout),
    }
}

/// Deallocate the manually allocated slice of memory with help of given `pointer` and mem `layout`
///
/// The `pointer` must have been returned by [`allocate_layout`] for the very same `layout`, and
/// must not be used after this call.
///
/// For zero-sized layouts nothing was allocated in the first place, so this is a no-op.
#[inline]
fn deallocate_memory(pointer: ptr::NonNull<u8>, layout: alloc::Layout) {
    if layout.size() == 0 {
        return;
    }

    // SAFETY: per the contract above, `pointer` is a live allocation made with `layout`
    unsafe { alloc::dealloc(pointer.as_ptr(), layout) };
}
690
#[cfg(test)]
mod tests {
    use super::*;

    const BUF_SIZE: crate::utils::BufferSize = crate::utils::BufferSize::S32;

    /// Creates a pool of `BUF_SIZE` buffers w/ `max_mem` bytes of memory budget
    #[inline]
    fn create_bufpool(max_mem: usize) -> BufPool {
        BufPool::new(BufPoolCfg { buffer_size: BUF_SIZE, max_memory: max_mem })
    }

    #[test]
    #[should_panic(expected = "MAX_MEMORY must always be larger")]
    #[cfg(debug_assertions)]
    fn err_new_with_invalid_cfg() {
        create_bufpool(BUF_SIZE.bytes() >> 1);
    }

    // NOTE: The pools below must be valid (i.e. budget larger than one buffer), otherwise the
    // panic would come from `BufPool::new` and `allocate` would never be exercised.

    #[test]
    #[should_panic(expected = "required buffers must never be 0")]
    #[cfg(debug_assertions)]
    fn err_alloc_zero() {
        let bpool = create_bufpool(BUF_SIZE.bytes() * 2);
        let _ = bpool.allocate(0);
    }

    #[test]
    #[should_panic(expected = "MAX_MEMORY allowed")]
    #[cfg(debug_assertions)]
    fn err_alloc_more_then_max_memory() {
        let bpool = create_bufpool(BUF_SIZE.bytes() * 2);
        let _ = bpool.allocate(3);
    }

    #[test]
    fn ok_alloc_single() {
        let bpool = create_bufpool(BUF_SIZE.bytes() * 2);
        let alloc = bpool.allocate(1);

        assert_eq!(alloc.buffers, 1);
        assert_eq!(alloc.required_bytes, BUF_SIZE.bytes());
    }

    #[test]
    fn ok_alloc_multiple() {
        let bpool = create_bufpool(BUF_SIZE.bytes() * 0x14);
        let alloc = bpool.allocate(0x10);

        assert_eq!(alloc.buffers, 0x10);
        assert_eq!(alloc.required_bytes, BUF_SIZE.bytes() * 0x10);
    }

    #[test]
    fn ok_alloc_max_memory() {
        let bpool = create_bufpool(BUF_SIZE.bytes() * 0x0A);
        let alloc = bpool.allocate(0x0A);

        assert_eq!(alloc.buffers, 0x0A);
        assert_eq!(alloc.required_bytes, BUF_SIZE.bytes() * 0x0A);
    }

    #[test]
    fn ok_alloc_updates_memory_accounting() {
        let bpool = create_bufpool(BUF_SIZE.bytes() * 0x14);
        let alloc = bpool.allocate(0x10);

        assert_eq!(bpool.allocated_memory.load(atomic::Ordering::Acquire), BUF_SIZE.bytes() * 0x10);
        drop(alloc);
        assert_eq!(bpool.allocated_memory.load(atomic::Ordering::Acquire), 0);
    }

    #[test]
    fn ok_alloc_updates_active_allocation_tracking() {
        let bpool = create_bufpool(BUF_SIZE.bytes() * 0x2A);

        let alloc1 = bpool.allocate(0x10);
        let alloc2 = bpool.allocate(0x10);

        assert_eq!(bpool.active_allocations.load(atomic::Ordering::Acquire), 2);
        drop(alloc1);
        drop(alloc2);
        assert_eq!(bpool.active_allocations.load(atomic::Ordering::Acquire), 0);
    }

    #[test]
    fn ok_alloc_decrements_allocated_memory_after_deallocations() {
        let bpool = create_bufpool(BUF_SIZE.bytes() * 0x80);
        let allocations: Vec<_> = (0..0x20).map(|_| bpool.allocate(2)).collect();

        // derived from `BUF_SIZE`, so the test survives a change of the buffer size
        assert_eq!(bpool.allocated_memory.load(atomic::Ordering::Acquire), BUF_SIZE.bytes() * 2 * 0x20);
        drop(allocations);
        assert_eq!(bpool.allocated_memory.load(atomic::Ordering::Acquire), 0);
    }

    #[test]
    fn ok_backpressure_blocks_till_memory_is_deallocated() {
        let bpool = sync::Arc::new(create_bufpool(BUF_SIZE.bytes() * 2));
        let alloc = bpool.allocate(1);

        let pool2 = bpool.clone();
        let barrier = sync::Arc::new(sync::Barrier::new(2));
        let barrier2 = barrier.clone();

        let handle = std::thread::spawn(move || {
            barrier2.wait();

            let start = std::time::Instant::now();
            let _alloc = pool2.allocate(2);

            start.elapsed()
        });

        barrier.wait();

        std::thread::sleep(std::time::Duration::from_millis(100));
        drop(alloc);

        let elapsed = handle.join().expect("allocation thread should not panic");
        assert!(elapsed >= std::time::Duration::from_millis(100));
    }

    #[test]
    fn ok_concurrent_allocations() {
        let pool = sync::Arc::new(create_bufpool(BUF_SIZE.bytes() * 0x1000));

        let mut handles = Vec::new();
        for _ in 0..0x0A {
            let pool = pool.clone();

            handles.push(std::thread::spawn(move || {
                for _ in 0..0x64 {
                    drop(pool.allocate(1));
                }
            }));
        }

        for h in handles {
            h.join().unwrap();
        }

        assert_eq!(pool.allocated_memory.load(atomic::Ordering::Acquire), 0);
        assert_eq!(pool.active_allocations.load(atomic::Ordering::Acquire), 0);
    }

    mod drop {
        use super::*;

        #[test]
        fn ok_partial_drop_updates_accounting() {
            let bpool = create_bufpool(BUF_SIZE.bytes() * 0x0A);

            let alloc1 = bpool.allocate(2);
            let alloc2 = bpool.allocate(2);

            assert_eq!(bpool.allocated_memory.load(atomic::Ordering::Acquire), BUF_SIZE.bytes() * 4);
            drop(alloc1);

            assert_eq!(bpool.allocated_memory.load(atomic::Ordering::Acquire), BUF_SIZE.bytes() * 2);
            drop(alloc2);

            assert_eq!(bpool.allocated_memory.load(atomic::Ordering::Acquire), 0);
        }

        #[test]
        fn ok_drop_waits_for_active_allocations() {
            let bpool = sync::Arc::new(create_bufpool(BUF_SIZE.bytes() * 0x1A));
            let alloc = bpool.allocate(0x10);

            let handle = std::thread::spawn(move || {
                drop(bpool);
            });

            std::thread::sleep(std::time::Duration::from_millis(0x64));
            assert!(!handle.is_finished());
            drop(alloc);

            handle.join().unwrap();
        }
    }

    mod memory_tests {
        use super::*;

        #[test]
        fn ok_create_layout() {
            let layout = create_layout(0x1000);

            assert_eq!(layout.align(), 1);
            assert_eq!(layout.size(), 0x1000);
        }

        #[test]
        #[should_panic(expected = "Invalid Layout")]
        fn err_create_layout() {
            create_layout(usize::MAX);
        }

        #[test]
        fn ok_allocate_layout() {
            let layout = create_layout(0x10);
            let pointer = allocate_layout(layout);
            let raw_ptr = pointer.as_ptr();

            assert!(!raw_ptr.is_null());
            deallocate_memory(pointer, layout);
        }

        #[test]
        fn ok_allocate_layout_allows_write() {
            let layout = create_layout(0x80);
            let pointer = allocate_layout(layout);

            unsafe {
                pointer.as_ptr().write(0x40);
                assert_eq!(pointer.as_ptr().read(), 0x40);
            }

            deallocate_memory(pointer, layout);
        }

        #[test]
        fn ok_allocate_layout_allows_write_to_entire_slice() {
            let layout = create_layout(0x200);
            let pointer = allocate_layout(layout);

            unsafe {
                for i in 0..0x200 {
                    pointer.as_ptr().add(i).write((i % 0xFF) as u8);
                }

                for i in 0..0x200 {
                    assert_eq!(pointer.as_ptr().add(i).read(), (i % 0xFF) as u8);
                }
            }

            deallocate_memory(pointer, layout);
        }
    }

    mod alloc_struct {
        use super::*;

        #[test]
        fn ok_first_returns_ptr_to_first_buf_from_alloc() {
            let bpool = create_bufpool(BUF_SIZE.bytes() * 0x20);
            let alloc = bpool.allocate(0x10);

            assert_eq!(alloc.first(), alloc.pointer.as_ptr());
        }

        #[test]
        fn ok_length_returns_length_of_alloc() {
            let bpool = create_bufpool(BUF_SIZE.bytes() * 0x20);
            let alloc = bpool.allocate(0x10);

            assert_eq!(alloc.length(), alloc.buffers);
        }

        #[test]
        fn ok_allocated_bytes_return_total_allocated_bytes() {
            let bpool = create_bufpool(BUF_SIZE.bytes() * 0x20);
            let alloc = bpool.allocate(0x10);

            assert_eq!(alloc.allocated_bytes(), alloc.buffers * BUF_SIZE.bytes());
        }

        #[test]
        fn ok_alloc_can_be_shared_across_threads() {
            let pool = sync::Arc::new(create_bufpool(BUF_SIZE.bytes() * 2));
            let alloc = pool.allocate(1);

            std::thread::spawn(move || {
                drop(alloc);
            })
            .join()
            .unwrap();
        }
    }

    mod iterator {
        use super::*;

        #[test]
        fn ok_iter_yields_all_buffers() {
            let bpool = create_bufpool(BUF_SIZE.bytes() * 0x0A);
            let alloc = bpool.allocate(4);

            let ptrs: Vec<_> = alloc.iter().collect();
            assert_eq!(ptrs.len(), 4);

            // consecutive buffers must be exactly one buffer size apart
            for pair in ptrs.windows(2) {
                assert_eq!(pair[1] as usize - pair[0] as usize, BUF_SIZE.bytes());
            }
        }

        #[test]
        fn ok_iter_yields_none_when_exhausted() {
            let bpool = create_bufpool(BUF_SIZE.bytes() * 0x0A);
            let alloc = bpool.allocate(2);
            let mut iter = alloc.iter();

            assert!(iter.next().is_some());
            assert!(iter.next().is_some());
            assert!(iter.next().is_none());
            assert!(iter.next().is_none());
        }
    }
}