Skip to main content

frozen_core/
bufpool.rs

1//! A low-latency memory-budgeted buffer pool to manage fixed-sized buffer allocations
2//!
3//! ## Memory Allocations
4//!
5//! All the allocations are allocated using the global memory allocator as requested (on-the-fly).
6//!
//! Each allocation, returned as a [`BufPoolAllocation`], is a single large contiguous slice of
//! memory of size `BufPoolCfg::buffer_size.bytes() * n_buffers`.
9//!
10//! Memory layout structure,
11//!
12//! ```text
13//! allocation = [[buf0][buf1][buf2]]
14//! where,
15//!   - every buffer is of size `buffer_size`
16//!   - each buffer is pointed using `*mut u8`
17//! ```
18//!
19//! ## Backpressure
20//!
21//! Every allocation reserves a memory budget and is only allowed to allocate memory if enough
22//! budget (i.e. memory space) is available. Otherwise, the caller is blocked/polled till enough
23//! space is available.
24//!
25//! When the [`BufPoolAllocation`] and all its references are dropped, the underlying memory is
26//! deallocated while relaxing the budget and dropping the backpressure (if any).
27//!
//! _NOTE:_ There is no fairness guarantee for callers that are parked due to backpressure, as
//! waiting callers are woken up opportunistically.
30//!
31//! ## Benchmarks
32//!
33//! Observed measurements of latency and throughput,
34//!
35//! ```md
36//! | Metric                       | Value              |
37//! |:-----------------------------|:-------------------|
38//! | Allocation Latency (avg)     | ~254 nanosecond    |
39//! | Allocation Throughput (avg)  | ~3.94 million/sec  |
40//! ```
41//!
//! _NOTE:_ All measurements include the complete RAII lifecycle (i.e. allocation + deallocation).
43//!
44//! Observed allocation latency for `N` buffers,
45//!
46//! ```md
47//! | Buffers  | Latency  |
48//! |:---------|:---------|
49//! | 0x01     | 246 ns   |
50//! | 0x10     | 251 ns   |
51//! | 0x400    | 300 ns   |
52//! ```
53//!
//! _INFO:_ As seen, the allocation latency stays nearly constant irrespective of the number of
//! buffers and the allocated bytes.
56//!
57//! Environment used for benching,
58//!
59//! * OS: NixOS (WSL2)
60//! * Architecture: x86_64
61//! * Memory: 8 GiB RAM (DDR4)
62//! * Rust: rustc 1.86.0 w/ cargo 1.86.0
63//! * Kernel: Linux 6.6.87.2-microsoft-standard-WSL2
64//! * CPU: Intel® Core™ i5-10300H @ 2.50GHz (4C / 8T)
65//!
66//! ## Example
67//!
68//! ```
69//! use frozen_core::{
70//!     bufpool::{BufPool, BufPoolCfg, BufferPointer},
71//!     utils::BufferSize,
72//! };
73//!
74//! const BUF_SIZE: BufferSize = BufferSize::S16;
75//!
76//! let pool = BufPool::new(BufPoolCfg {
77//!     buffer_size: BUF_SIZE,
78//!     max_memory: BUF_SIZE as usize * 0x40,
79//! });
80//!
81//! let alloc = pool.allocate(0x2A);
82//!
83//! assert_eq!(alloc.length(), 0x2A);
84//! assert!(!alloc.first().is_null());
85//! assert_eq!(alloc.allocated_bytes(), BUF_SIZE as usize * 0x2A);
86//!
87//! let ptrs: Vec<BufferPointer> = alloc.iter().collect();
88//! assert_eq!(ptrs.len(), 0x2A);
89//! ```
90
91use std::{alloc, ptr, sync, sync::atomic};
92
/// A raw, untyped pointer to one individual in-memory buffer of a [`BufPoolAllocation`]
///
/// ## Safety
///
/// The memory behind the pointer is untyped and uninitialized. The caller must make sure that:
///
/// * Writes stay within bounds, where the size of each buffer is [`BufPoolCfg::buffer_size`]
/// * Reads only happen after the buffer has been initialized (i.e. fully written)
/// * The pointer never outlives the [`BufPoolAllocation`] object it was obtained from
///
/// ## Example
///
/// ```
/// use frozen_core::{
///     bufpool::{BufPool, BufPoolCfg},
///     utils::BufferSize,
/// };
///
/// const BUF_SIZE: BufferSize = BufferSize::S16;
/// const BUFFER: [u8; BUF_SIZE as usize] = [1u8; BUF_SIZE as usize];
///
/// let pool = BufPool::new(BufPoolCfg {
///     buffer_size: BUF_SIZE,
///     max_memory: BUF_SIZE as usize * 0x0A,
/// });
///
/// let alloc = pool.allocate(1);
/// unsafe {
///     std::ptr::copy_nonoverlapping(BUFFER.as_ptr(), alloc.first(), BUF_SIZE as usize);
/// }
/// ```
pub type BufferPointer = *mut u8;
125
126/// All the available configrations for [`BufPool`]
127///
128/// ## Example
129///
130/// ```
131/// use frozen_core::{bufpool::BufPoolCfg, utils::BufferSize};
132///
133/// const BUF_SIZE: BufferSize = BufferSize::S64;
134///
135/// let cfg = BufPoolCfg {
136///     buffer_size: BUF_SIZE,
137///     max_memory: BUF_SIZE as usize * 0x1000,
138/// };
139///
140/// assert_ne!(cfg.max_memory, 0);
141/// assert!(cfg.max_memory > cfg.buffer_size.bytes());
142/// ```
143#[derive(Clone, Copy, Debug, Eq, PartialEq)]
144pub struct BufPoolCfg {
145    /// Size (in bytes) of an indivdual buffer unit allocated
146    pub buffer_size: crate::utils::BufferSize,
147
148    /// Maximum allowed memory (in bytes) to be simultaneosuly allocated by [`BufPool`]
149    ///
150    /// _IMPORTANT:_ When trying to allocate more memory then [`BufPoolCfg::max_memory`] via
151    /// [`BufPool::allocate`], a deadlock will happen due to memory budgeting in place. The caller
152    /// must make sure the `max_meory` is high enough to avoid this scenerio.
153    ///
154    /// _NOTE:_ To avoid backpressure, set the `max_memory` to an arbitrary large value. This
155    /// would not have any direct impact on performance or resource usage, and will avoid
156    /// backpressure under heavy workload.
157    pub max_memory: usize,
158}
159
160/// An implementation of a low-latency memory-budgeted buffer pool managing fixed-sized buffer
161/// allocations
162///
163/// ## Blocking `drop`
164///
165/// Dropping the [`BufPool`] instance from memory blocks unitl all the allocated instances of
166/// [`BufPoolAllocations`] and all there references are dropped from memory.
167///
168/// This is in place to avoid memory leaks, as well as to enable sending [`BufPoolAllocation`]
169/// objects across threads while being tied to the lifecyle of [`BufPool`].
170///
171/// ## Memory Allocations
172///
173/// All the allocations are allocated using the global memory allocator as requested (on-the-fly).
174///
175/// While a single allocation retunred as [`BufPoolAllocation`] is a large contineous slice of
176/// memory w/ size as `BufPoolCfg::buffer_size.bytes() * n_buffers`.
177///
178/// Memory layout structure,
179///
180/// ```text
181/// allocation = [[buf0][buf1][buf2]]
182/// where,
183///   - every buffer is of size `buffer_size`
184///   - each buffer is pointed using `*mut u8`
185/// ```
186///
187/// ## Backpressure
188///
189/// Every allocation reserves a memory budget and is only allowed to allocate memory if enough
190/// budget (i.e. memory space) is available. Otherwise, the caller is blocked/polled till enough
191/// space is available.
192///
193/// When the [`BufPoolAllocation`] and all its references are dropped, the underlying memory is
194/// deallocated while relaxing the budget and dropping the backpressure (if any).
195///
196/// _NOTE:_ There is no faireness guarantee for the caller's who are polled when faced with
197/// backpressure, as the waiting callers are awaken opportunistically.
198///
199/// ## Example
200///
201/// ```
202/// use frozen_core::{
203///     bufpool::{BufPool, BufPoolCfg, BufferPointer},
204///     utils::BufferSize,
205/// };
206///
207/// const BUF_SIZE: BufferSize = BufferSize::S16;
208///
209/// let pool = BufPool::new(BufPoolCfg {
210///     buffer_size: BUF_SIZE,
211///     max_memory: BUF_SIZE as usize * 0x40,
212/// });
213///
214/// let alloc = pool.allocate(0x30);
215///
216/// assert_eq!(alloc.length(), 0x30);
217/// assert!(!alloc.first().is_null());
218/// assert_eq!(alloc.allocated_bytes(), BUF_SIZE as usize * 0x30);
219///
220/// let ptrs: Vec<BufferPointer> = alloc.iter().collect();
221/// assert_eq!(ptrs.len(), 0x30);
222/// ```
223#[derive(Debug)]
224pub struct BufPool {
225    active_allocations: atomic::AtomicUsize,
226    allocation_cv: sync::Condvar,
227    allocation_lock: sync::Mutex<()>,
228    allocated_memory: atomic::AtomicUsize,
229    cfg: BufPoolCfg,
230    shutdown_cv: sync::Condvar,
231    shutdown_lock: sync::Mutex<()>,
232}
233
234unsafe impl Send for BufPool {}
235unsafe impl Sync for BufPool {}
236
237impl BufPool {
238    /// Create a new instance of [`BufPool`]
239    ///
240    /// ## Debug Assertions
241    ///
242    /// In debug builds, this function uses `debug_assertion` to prevent invalid configurations.
243    /// Caller must refer to [`BufPoolCfg`] for details about the config params.
244    ///
245    /// ## Example
246    ///
247    /// ```
248    /// use frozen_core::{
249    ///     bufpool::{BufPool, BufPoolCfg},
250    ///     utils::BufferSize,
251    /// };
252    ///
253    /// const BUF_SIZE: BufferSize = BufferSize::S16;
254    ///
255    /// let pool = BufPool::new(BufPoolCfg {
256    ///     buffer_size: BUF_SIZE,
257    ///     max_memory: BUF_SIZE as usize * 0x0A,
258    /// });
259    ///
260    /// let alloc = pool.allocate(1);
261    ///
262    /// assert_eq!(alloc.length(), 1);
263    /// assert_eq!(alloc.allocated_bytes(), BUF_SIZE as usize);
264    /// ```
265    #[inline]
266    pub fn new(cfg: BufPoolCfg) -> Self {
267        // sanity check
268        debug_assert!(
269            cfg.buffer_size.bytes() < cfg.max_memory,
270            "MAX_MEMORY must always be larger than the BUFFER_SIZE"
271        );
272
273        Self {
274            cfg,
275            active_allocations: atomic::AtomicUsize::new(0),
276            allocated_memory: atomic::AtomicUsize::new(0),
277            allocation_cv: sync::Condvar::new(),
278            allocation_lock: sync::Mutex::new(()),
279            shutdown_cv: sync::Condvar::new(),
280            shutdown_lock: sync::Mutex::new(()),
281        }
282    }
283
284    /// Allocate `required` number of buffers each of [`BufPoolCfg::buffer_size`] size
285    ///
286    /// ## Allocation Layout
287    ///
288    /// The allocation is a large contineous slice of memory w/ size as
289    ///
290    /// ```text
291    /// BufPoolCfg::buffer_size.bytes() * n_buffers
292    /// ```
293    ///
294    /// Memory layout structure,
295    ///
296    /// ```text
297    /// allocation = [[buf0][buf1][buf2]]
298    /// where,
299    ///   - every buffer is of size `buffer_size`
300    ///   - each buffer is pointed using `*mut u8`
301    /// ```
302    ///
303    /// ## RAII Safety
304    ///
305    /// The allocation object, i.e. [`BufPoolAllocation`], is RAII safe. The allocated memory is
306    /// automatically deallocated as soon as the last reference to the object is dropped, while also
307    /// relaxing the memory budget to eliminate backpressure (if any).
308    ///
309    /// ## Important Considerations
310    ///
311    /// The number of buffers required should never exceed `u16::MAX`. This is an abstract soft
312    /// limit and should be enforced by the public interface to avoid any weird exhaustion issues
313    /// or unknown bugs across the storage system.
314    ///
315    /// As `u16::MAX` is large enough value to almost never cause any problems for a single write
316    /// operation, this soft limit acts as a guidline to safely operate with arithmatic operations
317    /// across storage engine(s), including but not limited to [`frozen_core`].
318    ///
319    /// ## Example
320    ///
321    /// ```
322    /// use frozen_core::{
323    ///     bufpool::{BufPool, BufPoolCfg, BufferPointer},
324    ///     utils::BufferSize,
325    /// };
326    ///
327    /// const BUF_SIZE: BufferSize = BufferSize::S16;
328    ///
329    /// let pool = BufPool::new(BufPoolCfg {
330    ///     buffer_size: BUF_SIZE,
331    ///     max_memory: BUF_SIZE as usize * 0x14,
332    /// });
333    ///
334    /// let alloc = pool.allocate(0x0A);
335    ///
336    /// assert_eq!(alloc.length(), 0x0A);
337    /// assert!(!alloc.first().is_null());
338    /// assert_eq!(alloc.allocated_bytes(), BUF_SIZE as usize * 0x0A);
339    ///
340    /// let ptrs: Vec<BufferPointer> = alloc.iter().collect();
341    /// assert_eq!(ptrs.len(), 0x0A);
342    /// ```
343    #[inline(always)]
344    pub fn allocate(&self, required: usize) -> BufPoolAllocation {
345        // sanity checks
346        debug_assert!(required > 0, "required buffers must never be 0");
347        debug_assert!(
348            required * self.cfg.buffer_size.bytes() <= self.cfg.max_memory,
349            "Total required bytes must be smaller then the MAX_MEMORY allowed to avoid deadlock"
350        );
351        debug_assert!(
352            required * self.cfg.buffer_size.bytes() <= self.cfg.max_memory,
353            "Total required bytes must never exceed `u16::MAX` to avoid arithmatic overflows"
354        );
355
356        let required_bytes = self.cfg.buffer_size.bytes() * required;
357        loop {
358            let current_bytes = self.allocated_memory.load(atomic::Ordering::Acquire);
359            if current_bytes + required_bytes > self.cfg.max_memory {
360                self.backpressure(required_bytes);
361                continue;
362            }
363
364            match self.allocated_memory.compare_exchange(
365                current_bytes,
366                current_bytes + required_bytes,
367                atomic::Ordering::AcqRel,
368                atomic::Ordering::Acquire,
369            ) {
370                Ok(_) => break,
371                Err(_) => continue,
372            }
373        }
374
375        let layout = create_layout(required_bytes);
376        let pointer = allocate_layout(layout);
377        self.active_allocations.fetch_add(1, atomic::Ordering::Relaxed);
378
379        BufPoolAllocation { layout, pointer, required_bytes, buffers: required, pool: ptr::NonNull::from(self) }
380    }
381
382    /// Applies backpressure until enough memory budget is available for the allocation
383    ///
384    /// ## Why we ignore [`std::sync::PoisonError`]
385    ///
386    /// The mutex used for lock, is solely used as a parking primitive for [`Condvar`] and does not
387    /// protect any mutable state. All the pool invariants and accounting are maintained via
388    /// atomics and are completely seperated from the mutex.
389    ///
390    /// A poisoned mutex only indicates that another tx panicked while holding the lock, and
391    /// indicates an inconsistent state of the protected value. Since no state can be left
392    /// partially modified under this lock, there is no possible consistency risk to recover from
393    /// and propagating the poison error would only introduce unnecessary failures into the
394    /// allocation path.
395    ///
396    /// Therefore, as best effort, we consume the [`std::sync::PoisonError`] and continue operating
397    /// with the recovered guard.
398    #[inline]
399    fn backpressure(&self, required_bytes: usize) {
400        let mut guard = self.allocation_lock.lock().unwrap_or_else(|e| e.into_inner());
401        while self.allocated_memory.load(atomic::Ordering::Acquire) + required_bytes > self.cfg.max_memory {
402            guard = self.allocation_cv.wait(guard).unwrap_or_else(|e| e.into_inner());
403        }
404    }
405}
406
407impl Drop for BufPool {
408    fn drop(&mut self) {
409        // NOTE: See [`BufPool::backpressure`] implementation for rationale behind poison recovery
410
411        let mut guard = self.shutdown_lock.lock().unwrap_or_else(|e| e.into_inner());
412        while self.active_allocations.load(atomic::Ordering::Acquire) != 0 {
413            guard = self.shutdown_cv.wait(guard).unwrap_or_else(|e| e.into_inner());
414        }
415    }
416}
417
418/// A RAII safe allocation object containing allocated buffers
419///
420/// ## Lifetime
421///
422/// The object can/may outlive the scope that created it, while also being able to transfer across
423/// threads. As, internally the [`BufPool`] tracks all the active allocations and delays the drop
424/// until every allocation and all there references are dropped from memory.
425///
426/// ## Example
427///
428/// ```
429/// use frozen_core::{
430///     bufpool::{BufPool, BufPoolCfg, BufferPointer},
431///     utils::BufferSize,
432/// };
433///
434/// const BUF_SIZE: BufferSize = BufferSize::S16;
435///
436/// let pool = BufPool::new(BufPoolCfg {
437///     buffer_size: BUF_SIZE,
438///     max_memory: BUF_SIZE as usize * 0x10,
439/// });
440///
441/// let alloc = pool.allocate(0x0A);
442///
443/// assert_eq!(alloc.length(), 0x0A);
444/// assert!(!alloc.first().is_null());
445/// assert_eq!(alloc.allocated_bytes(), BUF_SIZE as usize * 0x0A);
446///
447/// let ptrs: Vec<BufferPointer> = alloc.iter().collect();
448/// assert_eq!(ptrs.len(), 0x0A);
449/// ```
450#[derive(Debug)]
451pub struct BufPoolAllocation {
452    buffers: usize,
453    layout: alloc::Layout,
454    pointer: ptr::NonNull<u8>,
455    pool: ptr::NonNull<BufPool>,
456    required_bytes: usize,
457}
458
459unsafe impl Send for BufPoolAllocation {}
460
461impl BufPoolAllocation {
462    /// Returns a [`BufferPointer`] to the first buffer from the allocated list of buffers
463    ///
464    /// _NOTE:_ The returned [`BufferPointer`] can also be used as a _base_pointer_ to operate on
465    /// the entire allocated memory slice.
466    ///
467    /// ## Example
468    ///
469    /// ```
470    /// use frozen_core::{
471    ///     bufpool::{BufPool, BufPoolCfg},
472    ///     utils::BufferSize,
473    /// };
474    ///
475    /// const BUF_SIZE: BufferSize = BufferSize::S32;
476    ///
477    /// let pool = BufPool::new(BufPoolCfg {
478    ///     buffer_size: BUF_SIZE,
479    ///     max_memory: BUF_SIZE as usize * 0x0A,
480    /// });
481    ///
482    /// let alloc = pool.allocate(0x0A);
483    /// assert!(!alloc.first().is_null());
484    /// ```
485    #[inline]
486    pub const fn first(&self) -> BufferPointer {
487        self.pointer.as_ptr()
488    }
489
490    /// Returns the total number of allocated buffers
491    ///
492    /// _IMPORTANT:_ The returned value is always equal to the `required` value using while
493    /// calling [`BufPool::allocate`].
494    ///
495    /// ## Example
496    ///
497    /// ```
498    /// use frozen_core::{
499    ///     bufpool::{BufPool, BufPoolCfg},
500    ///     utils::BufferSize,
501    /// };
502    ///
503    /// const BUF_SIZE: BufferSize = BufferSize::S64;
504    ///
505    /// let pool = BufPool::new(BufPoolCfg {
506    ///     buffer_size: BUF_SIZE,
507    ///     max_memory: BUF_SIZE as usize * 0x0A,
508    /// });
509    ///
510    /// let alloc = pool.allocate(0x0A);
511    /// assert_eq!(alloc.length(), 0x0A);
512    /// ```
513    #[inline]
514    pub const fn length(&self) -> usize {
515        self.buffers
516    }
517
518    /// Returns the total number of bytes of memory allocated
519    ///
520    /// ## Example
521    ///
522    /// ```
523    /// use frozen_core::{
524    ///     bufpool::{BufPool, BufPoolCfg},
525    ///     utils::BufferSize,
526    /// };
527    ///
528    /// const BUF_SIZE: BufferSize = BufferSize::S16;
529    ///
530    /// let pool = BufPool::new(BufPoolCfg {
531    ///     buffer_size: BUF_SIZE,
532    ///     max_memory: BUF_SIZE as usize * 0x0A,
533    /// });
534    ///
535    /// let alloc = pool.allocate(0x0A);
536    /// assert_eq!(alloc.allocated_bytes(), BUF_SIZE as usize * 0x0A);
537    /// ```
538    #[inline]
539    pub const fn allocated_bytes(&self) -> usize {
540        self.required_bytes
541    }
542
543    /// A custom [`Iterator`] implementation to enable iteration over the list of allocated buffers
544    /// from [`BufPoolAllocation`]
545    ///
546    /// _NOTE:_ Each yielded pointer refers to a unique individual buffer each of size
547    /// [`BufPoolCfg::buffer_size`].
548    ///
549    /// ## Example
550    ///
551    /// ```
552    /// use frozen_core::{
553    ///     bufpool::{BufPool, BufPoolCfg, BufferPointer},
554    ///     utils::BufferSize,
555    /// };
556    ///
557    /// const BUF_SIZE: BufferSize = BufferSize::S16;
558    ///
559    /// let pool = BufPool::new(BufPoolCfg {
560    ///     buffer_size: BUF_SIZE,
561    ///     max_memory: BUF_SIZE as usize * 0x14,
562    /// });
563    ///
564    /// let alloc = pool.allocate(0x0A);
565    /// let ptrs: Vec<BufferPointer> = alloc.iter().collect();
566    ///
567    /// assert_eq!(ptrs.len(), 0x0A);
568    /// ```
569    #[inline]
570    pub fn iter(&self) -> BufPoolAllocationIter {
571        let pool = unsafe { self.pool.as_ref() };
572        BufPoolAllocationIter {
573            pointer: self.pointer,
574            buffer_size: pool.cfg.buffer_size.bytes(),
575            remaining: self.buffers,
576        }
577    }
578}
579
580impl Drop for BufPoolAllocation {
581    fn drop(&mut self) {
582        let pool = unsafe { self.pool.as_ref() };
583        deallocate_memory(self.pointer, self.layout);
584
585        pool.allocated_memory.fetch_sub(self.required_bytes, atomic::Ordering::Release);
586        pool.allocation_cv.notify_one();
587
588        if pool.active_allocations.fetch_sub(1, atomic::Ordering::Release) == 1 {
589            pool.shutdown_cv.notify_one();
590        }
591    }
592}
593
594/// A custom [`Iterator`] object to iterate over the list of allocated buffers
595///
596/// _NOTE:_ Buffers are yielded in allocation order and are backed by a single contiguous memory
597/// region.
598///
599/// ## Example
600///
601/// ```rs
602/// use frozen_core::{
603///     bufpool::{BufPool, BufPoolCfg, BufferPointer},
604///     utils::BufferSize,
605/// };
606///
607/// const BUF_SIZE: BufferSize = BufferSize::S32;
608///
609/// let pool = BufPool::new(BufPoolCfg {
610///     buffer_size: BUF_SIZE,
611///     max_memory: BUF_SIZE as usize * 0x14,
612/// });
613///
614/// let alloc = pool.allocate(0x0A);
615/// let ptrs: Vec<BufferPointer> = alloc.iter().collect();
616///
617/// assert_eq!(ptrs.len(), 0x0A);
618/// ```
619#[derive(Debug)]
620pub struct BufPoolAllocationIter {
621    pointer: ptr::NonNull<u8>,
622    buffer_size: usize,
623    remaining: usize,
624}
625
626impl Iterator for BufPoolAllocationIter {
627    type Item = BufferPointer;
628
629    #[inline(always)]
630    fn next(&mut self) -> Option<Self::Item> {
631        if self.remaining == 0 {
632            return None;
633        }
634
635        let curr_ptr = self.pointer;
636
637        self.pointer = unsafe { self.pointer.add(self.buffer_size) };
638        self.remaining -= 1;
639
640        Some(curr_ptr.as_ptr())
641    }
642}
643
/// Builds the byte-array memory layout (alignment of `1`) for `required_bytes` bytes
///
/// ## Panics
///
/// Panics with `Invalid Layout: ..` when `required_bytes` can not be described as a valid layout.
///
/// _NOTE:_ Panicking here is treated as acceptable, as such a failure points at an incorrect
/// implementation which is expected to be caught by the unit tests, rather than at a runtime
/// condition.
#[inline]
fn create_layout(required_bytes: usize) -> alloc::Layout {
    alloc::Layout::array::<u8>(required_bytes).unwrap_or_else(|e| panic!("Invalid Layout: {e}"))
}
655
/// Allocates an uninitialized memory slice described by the given `layout`
///
/// ## Allocation Failure
///
/// If the allocator is unable to satisfy the request (typically due to an OOM condition),
/// [`alloc::alloc`] returns a null pointer.
///
/// In such cases we delegate to [`alloc::handle_alloc_error`], matching the behavior of std
/// library types such as [`Vec`], [`Box`] and [`String`].
///
/// That path never returns, so allocation failures are treated as fatal runtime conditions
/// rather than recoverable errors.
///
/// Under normal operation this path should never be reached, as memory usage is expected to be
/// bounded by the buffer pool's memory budget and backpressure mechanisms.
///
/// ## Why not return `FrozenErr`
///
/// A null return from [`alloc::alloc`] indicates that the global allocator itself was unable to
/// satisfy the request.
///
/// Delegating to [`alloc::handle_alloc_error`] matches the behavior of standard library
/// containers and avoids continuing execution after a catastrophic allocator failure, where
/// further allocations required for error handling, logging or recovery may also fail.
#[inline]
fn allocate_layout(layout: alloc::Layout) -> ptr::NonNull<u8> {
    // SAFETY: `alloc::alloc` requires a non-zero sized `layout`.
    // NOTE(review): nothing in this function enforces that - confirm every caller rejects
    // zero-byte requests
    let raw = unsafe { alloc::alloc(layout) };

    let Some(slice_start) = ptr::NonNull::new(raw) else {
        alloc::handle_alloc_error(layout)
    };

    slice_start
}
688
/// Gives the manually allocated memory slice at `pointer` back to the global allocator
///
/// _NOTE:_ `pointer` must have been allocated by the global allocator using the very same
/// `layout`, and must not be used in any way afterwards.
#[inline]
fn deallocate_memory(pointer: ptr::NonNull<u8>, layout: alloc::Layout) {
    let slice_start = pointer.as_ptr();

    // SAFETY: upheld by the caller, see the note in the function docs
    unsafe { alloc::dealloc(slice_start, layout) }
}
694
#[cfg(test)]
mod tests {
    use super::*;

    const BUF_SIZE: crate::utils::BufferSize = crate::utils::BufferSize::S32;

    /// Creates a pool of `BUF_SIZE` buffers with a memory budget of `max_mem` bytes
    #[inline]
    fn create_bufpool(max_mem: usize) -> BufPool {
        BufPool::new(BufPoolCfg { buffer_size: BUF_SIZE, max_memory: max_mem })
    }

    #[test]
    #[should_panic(expected = "MAX_MEMORY must always be larger than the BUFFER_SIZE")]
    #[cfg(debug_assertions)]
    fn err_new_with_invalid_cfg() {
        create_bufpool(BUF_SIZE.bytes() >> 1);
    }

    // NOTE: The pool config must be valid here, otherwise the panic would come from
    // `BufPool::new` and `allocate` would never be reached.
    #[test]
    #[should_panic(expected = "required buffers must never be 0")]
    #[cfg(debug_assertions)]
    fn err_alloc_zero() {
        let bpool = create_bufpool(BUF_SIZE.bytes() * 2);
        let _ = bpool.allocate(0);
    }

    // NOTE: Same as above, the budget is valid (two buffers), only the request is too large
    #[test]
    #[should_panic(expected = "avoid deadlock")]
    #[cfg(debug_assertions)]
    fn err_alloc_more_then_max_memory() {
        let bpool = create_bufpool(BUF_SIZE.bytes() * 2);
        let _ = bpool.allocate(3);
    }

    #[test]
    fn ok_alloc_single() {
        let bpool = create_bufpool(BUF_SIZE.bytes() * 2);
        let alloc = bpool.allocate(1);

        assert_eq!(alloc.buffers, 1);
        assert_eq!(alloc.required_bytes, BUF_SIZE.bytes());
    }

    #[test]
    fn ok_alloc_multiple() {
        let bpool = create_bufpool(BUF_SIZE.bytes() * 0x14);
        let alloc = bpool.allocate(0x10);

        assert_eq!(alloc.buffers, 0x10);
        assert_eq!(alloc.required_bytes, BUF_SIZE.bytes() * 0x10);
    }

    #[test]
    fn ok_alloc_max_memory() {
        let bpool = create_bufpool(BUF_SIZE.bytes() * 0x0A);
        let alloc = bpool.allocate(0x0A);

        assert_eq!(alloc.buffers, 0x0A);
        assert_eq!(alloc.required_bytes, BUF_SIZE.bytes() * 0x0A);
    }

    #[test]
    fn ok_alloc_updates_memory_accounting() {
        let bpool = create_bufpool(BUF_SIZE.bytes() * 0x14);
        let alloc = bpool.allocate(0x10);

        assert_eq!(bpool.allocated_memory.load(atomic::Ordering::Acquire), BUF_SIZE.bytes() * 0x10);
        drop(alloc);
        assert_eq!(bpool.allocated_memory.load(atomic::Ordering::Acquire), 0);
    }

    #[test]
    fn ok_alloc_updates_active_allocation_tracking() {
        let bpool = create_bufpool(BUF_SIZE.bytes() * 0x2A);

        let alloc1 = bpool.allocate(0x10);
        let alloc2 = bpool.allocate(0x10);

        assert_eq!(bpool.active_allocations.load(atomic::Ordering::Acquire), 2);
        drop(alloc1);
        drop(alloc2);
        assert_eq!(bpool.active_allocations.load(atomic::Ordering::Acquire), 0);
    }

    #[test]
    fn ok_alloc_decrments_allocated_memory_after_deallocations() {
        let bpool = create_bufpool(BUF_SIZE.bytes() * 0x80);
        let allocations: Vec<_> = (0..0x20).map(|_| bpool.allocate(2)).collect();

        // 0x20 allocations, each of 2 buffers
        assert_eq!(bpool.allocated_memory.load(atomic::Ordering::Acquire), 0x20 * 2 * BUF_SIZE.bytes());
        drop(allocations);
        assert_eq!(bpool.allocated_memory.load(atomic::Ordering::Acquire), 0);
    }

    #[test]
    fn ok_backpressure_blocks_till_memory_is_deallocated() {
        let bpool = sync::Arc::new(create_bufpool(BUF_SIZE.bytes() * 2));
        let alloc = bpool.allocate(1);

        let pool2 = bpool.clone();
        let barrier = sync::Arc::new(sync::Barrier::new(2));
        let barrier2 = barrier.clone();

        let handle = std::thread::spawn(move || {
            // The clock is started _before_ the barrier, i.e. before the main thread can start
            // sleeping, so the measured time always covers the entire sleep no matter how late
            // this thread gets scheduled after the barrier.
            let start = std::time::Instant::now();
            barrier2.wait();

            let _alloc = pool2.allocate(2);

            start.elapsed()
        });

        barrier.wait();

        std::thread::sleep(std::time::Duration::from_millis(100));
        drop(alloc);

        let elapsed = handle.join().expect("allocation thread should not panic");
        assert!(elapsed >= std::time::Duration::from_millis(100));
    }

    #[test]
    fn ok_concurrent_allocations() {
        let pool = sync::Arc::new(create_bufpool(BUF_SIZE.bytes() * 0x1000));

        let mut handles = Vec::new();
        for _ in 0..0x0A {
            let pool = pool.clone();

            handles.push(std::thread::spawn(move || {
                for _ in 0..0x64 {
                    drop(pool.allocate(1));
                }
            }));
        }

        for h in handles {
            h.join().unwrap();
        }

        assert_eq!(pool.allocated_memory.load(atomic::Ordering::Acquire), 0);
        assert_eq!(pool.active_allocations.load(atomic::Ordering::Acquire), 0);
    }

    mod drop {
        use super::*;

        #[test]
        fn ok_partial_drop_updates_accounting() {
            let bpool = create_bufpool(BUF_SIZE.bytes() * 0x0A);

            let alloc1 = bpool.allocate(2);
            let alloc2 = bpool.allocate(2);

            assert_eq!(bpool.allocated_memory.load(atomic::Ordering::Acquire), BUF_SIZE.bytes() * 4);
            drop(alloc1);

            assert_eq!(bpool.allocated_memory.load(atomic::Ordering::Acquire), BUF_SIZE.bytes() * 2);
            drop(alloc2);

            assert_eq!(bpool.allocated_memory.load(atomic::Ordering::Acquire), 0);
        }

        #[test]
        fn ok_drop_waits_for_active_allocations() {
            let bpool = sync::Arc::new(create_bufpool(BUF_SIZE.bytes() * 0x1A));
            let alloc = bpool.allocate(0x10);

            // the only `Arc` handle is moved into the thread, so the pool itself is dropped there
            let handle = std::thread::spawn(move || {
                drop(bpool);
            });

            std::thread::sleep(std::time::Duration::from_millis(0x64));
            assert!(!handle.is_finished());
            drop(alloc);

            handle.join().unwrap();
        }
    }

    mod memory_tests {
        use super::*;

        #[test]
        fn ok_create_layout() {
            let layout = create_layout(0x1000);

            assert_eq!(layout.align(), 1);
            assert_eq!(layout.size(), 0x1000);
        }

        #[test]
        #[should_panic(expected = "Invalid Layout")]
        fn err_create_layout() {
            create_layout(usize::MAX);
        }

        #[test]
        fn ok_allocate_layout() {
            let layout = create_layout(0x10);
            let pointer = allocate_layout(layout);
            let raw_ptr = pointer.as_ptr();

            assert!(!raw_ptr.is_null());
            deallocate_memory(pointer, layout);
        }

        #[test]
        fn ok_allocate_layout_allows_write() {
            let layout = create_layout(0x80);
            let pointer = allocate_layout(layout);

            unsafe {
                pointer.as_ptr().write(0x40);
                assert_eq!(pointer.as_ptr().read(), 0x40);
            }

            deallocate_memory(pointer, layout);
        }

        #[test]
        fn ok_allocate_layout_allows_write_to_entire_slice() {
            let layout = create_layout(0x200);
            let pointer = allocate_layout(layout);

            unsafe {
                for i in 0..0x200 {
                    pointer.as_ptr().add(i).write((i % 0xFF) as u8);
                }

                for i in 0..0x200 {
                    assert_eq!(pointer.as_ptr().add(i).read(), (i % 0xFF) as u8);
                }
            }

            deallocate_memory(pointer, layout);
        }
    }

    mod alloc_struct {
        use super::*;

        #[test]
        fn ok_first_returns_ptr_to_first_buf_from_alloc() {
            let bpool = create_bufpool(BUF_SIZE.bytes() * 0x20);
            let alloc = bpool.allocate(0x10);

            assert_eq!(alloc.first(), alloc.pointer.as_ptr());
        }

        #[test]
        fn ok_length_returns_length_of_alloc() {
            let bpool = create_bufpool(BUF_SIZE.bytes() * 0x20);
            let alloc = bpool.allocate(0x10);

            assert_eq!(alloc.length(), alloc.buffers);
        }

        #[test]
        fn ok_allocated_bytes_return_total_allocated_bytes() {
            let bpool = create_bufpool(BUF_SIZE.bytes() * 0x20);
            let alloc = bpool.allocate(0x10);

            assert_eq!(alloc.allocated_bytes(), alloc.buffers * BUF_SIZE.bytes());
        }

        #[test]
        fn ok_alloc_can_be_shared_across_threads() {
            let pool = sync::Arc::new(create_bufpool(BUF_SIZE.bytes() * 2));
            let alloc = pool.allocate(1);

            std::thread::spawn(move || {
                drop(alloc);
            })
            .join()
            .unwrap();
        }
    }

    mod iterator {
        use super::*;

        #[test]
        fn ok_iter_yeilds_all_buffers() {
            let bpool = create_bufpool(BUF_SIZE.bytes() * 0x0A);
            let alloc = bpool.allocate(4);

            let ptrs: Vec<_> = alloc.iter().collect();
            assert_eq!(ptrs.len(), 4);

            // consecutive buffers are exactly one buffer apart
            for pair in ptrs.windows(2) {
                assert_eq!(pair[1] as usize - pair[0] as usize, BUF_SIZE.bytes());
            }
        }

        #[test]
        fn ok_iter_yeilds_none_when_exhausted() {
            let bpool = create_bufpool(BUF_SIZE.bytes() * 0x0A);
            let alloc = bpool.allocate(2);
            let mut iter = alloc.iter();

            assert!(iter.next().is_some());
            assert!(iter.next().is_some());
            assert!(iter.next().is_none());
            assert!(iter.next().is_none());
        }
    }
}
1001}