Skip to main content

frozen_core/
bufpool.rs

1//! A low-latency memory-budgeted buffer pool to manage fixed-sized buffer allocations
2//!
3//! ## Memory Allocations
4//!
5//! All the allocations are allocated using the global memory allocator as requested (on-the-fly).
6//!
//! Each allocation, returned as a [`BufPoolAllocation`], is a single large contiguous slice of
//! memory of size `BufPoolCfg::buffer_size.bytes() * n_buffers`.
9//!
10//! Memory layout structure,
11//!
12//! ```text
13//! allocation = [[buf0][buf1][buf2]]
14//! where,
15//!   - every buffer is of size `buffer_size`
16//!   - each buffer is pointed using `*mut u8`
17//! ```
18//!
19//! ## Backpressure
20//!
21//! Every allocation reserves a memory budget and is only allowed to allocate memory if enough
22//! budget (i.e. memory space) is available. Otherwise, the caller is blocked/polled till enough
23//! space is available.
24//!
25//! When the [`BufPoolAllocation`] and all its references are dropped, the underlying memory is
26//! deallocated while relaxing the budget and dropping the backpressure (if any).
27//!
//! _NOTE:_ There is no fairness guarantee for callers that are parked under backpressure, as
//! waiting callers are woken opportunistically.
30//!
31//! ## Benchmarks
32//!
33//! Observed measurements of latency and throughput,
34//!
35//! | Metric                       | Value              |
36//! |:-----------------------------|:-------------------|
37//! | Allocation Latency (avg)     | ~254 nanosecond    |
38//! | Allocation Throughput (avg)  | ~3.94 million/sec  |
39//!
//! _NOTE:_ All measurements include the complete RAII lifecycle (i.e. allocation + deallocation).
41//!
42//! Observed allocation latency for `N` buffers,
43//!
44//! | Buffers  | Latency  |
45//! |:---------|:---------|
46//! | 0x01     | 246 ns   |
47//! | 0x10     | 251 ns   |
48//! | 0x400    | 300 ns   |
49//!
//! _INFO:_ As seen, the allocation latency stays nearly constant irrespective of the number of
//! buffers and the allocated bytes.
52//!
53//! Environment used for benching,
54//!
55//! * OS: NixOS (WSL2)
56//! * Architecture: x86_64
57//! * Memory: 8 GiB RAM (DDR4)
58//! * Rust: rustc 1.86.0 w/ cargo 1.86.0
59//! * Kernel: Linux 6.6.87.2-microsoft-standard-WSL2
60//! * CPU: Intel® Core™ i5-10300H @ 2.50GHz (4C / 8T)
61//!
62//! ## Example
63//!
64//! ```
65//! use frozen_core::{
66//!     bufpool::{BufPool, BufPoolCfg, BufferPointer},
67//!     utils::BufferSize,
68//! };
69//!
70//! const BUF_SIZE: BufferSize = BufferSize::S16;
71//!
72//! let pool = BufPool::new(BufPoolCfg {
73//!     buffer_size: BUF_SIZE,
74//!     max_memory: BUF_SIZE as usize * 0x40,
75//! });
76//!
77//! let alloc = pool.allocate(0x2A);
78//!
79//! assert_eq!(alloc.length(), 0x2A);
80//! assert!(!alloc.first().is_null());
81//! assert_eq!(alloc.allocated_bytes(), BUF_SIZE as usize * 0x2A);
82//!
83//! let ptrs: Vec<BufferPointer> = alloc.iter().collect();
84//! assert_eq!(ptrs.len(), 0x2A);
85//! ```
86
87use std::{alloc, ptr, sync, sync::atomic};
88
/// A raw pointer to an individual in-memory buffer
///
/// ## Safety
///
/// The pointer is untyped and the memory behind it is uninitialized. The caller is responsible
/// for ensuring that:
///
/// * Writes stay within bounds; the size of each buffer is [`BufPoolCfg::buffer_size`]
/// * Reads only occur after initialization (i.e. a write) of the buffer has completed
/// * The pointer does not outlive the [`BufPoolAllocation`] object it was obtained from
///
/// ## Example
///
/// ```
/// use frozen_core::{
///     bufpool::{BufPool, BufPoolCfg},
///     utils::BufferSize,
/// };
///
/// const BUF_SIZE: BufferSize = BufferSize::S16;
/// const BUFFER: [u8; BUF_SIZE as usize] = [1u8; BUF_SIZE as usize];
///
/// let pool = BufPool::new(BufPoolCfg {
///     buffer_size: BUF_SIZE,
///     max_memory: BUF_SIZE as usize * 0x0A,
/// });
///
/// let alloc = pool.allocate(1);
/// unsafe {
///     std::ptr::copy_nonoverlapping(BUFFER.as_ptr(), alloc.first(), BUF_SIZE as usize);
/// }
/// ```
pub type BufferPointer = *mut u8;
121
/// All the available configurations for [`BufPool`]
123///
124/// ## Example
125///
126/// ```
127/// use frozen_core::{bufpool::BufPoolCfg, utils::BufferSize};
128///
129/// const BUF_SIZE: BufferSize = BufferSize::S64;
130///
131/// let cfg = BufPoolCfg {
132///     buffer_size: BUF_SIZE,
133///     max_memory: BUF_SIZE as usize * 0x1000,
134/// };
135///
136/// assert_ne!(cfg.max_memory, 0);
137/// assert!(cfg.max_memory > cfg.buffer_size.bytes());
138/// ```
139#[derive(Clone, Copy, Debug, Eq, PartialEq)]
140pub struct BufPoolCfg {
141    /// Size (in bytes) of an indivdual buffer unit allocated
142    pub buffer_size: crate::utils::BufferSize,
143
144    /// Maximum allowed memory (in bytes) to be simultaneosuly allocated by [`BufPool`]
145    ///
146    /// _IMPORTANT:_ When trying to allocate more memory then [`BufPoolCfg::max_memory`] via
147    /// [`BufPool::allocate`], a deadlock will happen due to memory budgeting in place. The caller
148    /// must make sure the `max_meory` is high enough to avoid this scenerio.
149    ///
150    /// _NOTE:_ To avoid backpressure, set the `max_memory` to an arbitrary large value. This
151    /// would not have any direct impact on performance or resource usage, and will avoid
152    /// backpressure under heavy workload.
153    pub max_memory: usize,
154}
155
156/// An implementation of a low-latency memory-budgeted buffer pool managing fixed-sized buffer
157/// allocations
158///
159/// ## Blocking `drop`
160///
/// Dropping the [`BufPool`] instance blocks until all the allocated instances of
/// [`BufPoolAllocation`] and all their references are dropped from memory.
///
/// This is in place to avoid memory leaks, as well as to enable sending [`BufPoolAllocation`]
/// objects across threads while being tied to the lifecycle of [`BufPool`].
166///
167/// ## Memory Allocations
168///
169/// All the allocations are allocated using the global memory allocator as requested (on-the-fly).
170///
/// Each allocation, returned as a [`BufPoolAllocation`], is a single large contiguous slice of
/// memory of size `BufPoolCfg::buffer_size.bytes() * n_buffers`.
173///
174/// Memory layout structure,
175///
176/// ```text
177/// allocation = [[buf0][buf1][buf2]]
178/// where,
179///   - every buffer is of size `buffer_size`
180///   - each buffer is pointed using `*mut u8`
181/// ```
182///
183/// ## Backpressure
184///
185/// Every allocation reserves a memory budget and is only allowed to allocate memory if enough
186/// budget (i.e. memory space) is available. Otherwise, the caller is blocked/polled till enough
187/// space is available.
188///
189/// When the [`BufPoolAllocation`] and all its references are dropped, the underlying memory is
190/// deallocated while relaxing the budget and dropping the backpressure (if any).
191///
/// _NOTE:_ There is no fairness guarantee for callers that are parked under backpressure, as
/// waiting callers are woken opportunistically.
194///
195/// ## Example
196///
197/// ```
198/// use frozen_core::{
199///     bufpool::{BufPool, BufPoolCfg, BufferPointer},
200///     utils::BufferSize,
201/// };
202///
203/// const BUF_SIZE: BufferSize = BufferSize::S16;
204///
205/// let pool = BufPool::new(BufPoolCfg {
206///     buffer_size: BUF_SIZE,
207///     max_memory: BUF_SIZE as usize * 0x40,
208/// });
209///
210/// let alloc = pool.allocate(0x30);
211///
212/// assert_eq!(alloc.length(), 0x30);
213/// assert!(!alloc.first().is_null());
214/// assert_eq!(alloc.allocated_bytes(), BUF_SIZE as usize * 0x30);
215///
216/// let ptrs: Vec<BufferPointer> = alloc.iter().collect();
217/// assert_eq!(ptrs.len(), 0x30);
218/// ```
#[derive(Debug)]
pub struct BufPool {
    /// Number of live [`BufPoolAllocation`] objects; incremented in [`BufPool::allocate`] and
    /// decremented when an allocation is dropped
    active_allocations: atomic::AtomicUsize,
    /// Signalled whenever an allocation is dropped, to wake a caller parked under backpressure
    allocation_cv: sync::Condvar,
    /// Parking lock paired with `allocation_cv`; it guards no data
    allocation_lock: sync::Mutex<()>,
    /// Bytes currently reserved against [`BufPoolCfg::max_memory`]
    allocated_memory: atomic::AtomicUsize,
    /// Configuration the pool was created with
    cfg: BufPoolCfg,
    /// Signalled when the last live allocation is dropped, to wake a pending drop of the pool
    shutdown_cv: sync::Condvar,
    /// Parking lock paired with `shutdown_cv`; it guards no data
    shutdown_lock: sync::Mutex<()>,
}
229
// SAFETY: NOTE(review): every field of `BufPool` is an atomic, a `Mutex<()>`, a `Condvar` or the
// plain-data `BufPoolCfg`, so these manual impls are presumably redundant with the auto traits.
// Confirm that `BufferSize` is itself `Send + Sync` before relying on them.
unsafe impl Send for BufPool {}
unsafe impl Sync for BufPool {}
232
233impl BufPool {
234    /// Create a new instance of [`BufPool`]
235    ///
236    /// ## Debug Assertions
237    ///
    /// In debug builds, this function uses `debug_assert!` to reject invalid configurations.
    /// Caller must refer to [`BufPoolCfg`] for details about the config params.
240    ///
241    /// ## Example
242    ///
243    /// ```
244    /// use frozen_core::{
245    ///     bufpool::{BufPool, BufPoolCfg},
246    ///     utils::BufferSize,
247    /// };
248    ///
249    /// const BUF_SIZE: BufferSize = BufferSize::S16;
250    ///
251    /// let pool = BufPool::new(BufPoolCfg {
252    ///     buffer_size: BUF_SIZE,
253    ///     max_memory: BUF_SIZE as usize * 0x0A,
254    /// });
255    ///
256    /// let alloc = pool.allocate(1);
257    ///
258    /// assert_eq!(alloc.length(), 1);
259    /// assert_eq!(alloc.allocated_bytes(), BUF_SIZE as usize);
260    /// ```
261    #[inline]
262    pub fn new(cfg: BufPoolCfg) -> Self {
263        // sanity check
264        debug_assert!(
265            cfg.buffer_size.bytes() < cfg.max_memory,
266            "MAX_MEMORY must always be larger than the BUFFER_SIZE"
267        );
268
269        Self {
270            cfg,
271            active_allocations: atomic::AtomicUsize::new(0),
272            allocated_memory: atomic::AtomicUsize::new(0),
273            allocation_cv: sync::Condvar::new(),
274            allocation_lock: sync::Mutex::new(()),
275            shutdown_cv: sync::Condvar::new(),
276            shutdown_lock: sync::Mutex::new(()),
277        }
278    }
279
280    /// Allocate `required` number of buffers each of [`BufPoolCfg::buffer_size`] size
281    ///
282    /// ## Allocation Layout
283    ///
    /// The allocation is a large contiguous slice of memory w/ size as
285    ///
286    /// ```text
287    /// BufPoolCfg::buffer_size.bytes() * n_buffers
288    /// ```
289    ///
290    /// Memory layout structure,
291    ///
292    /// ```text
293    /// allocation = [[buf0][buf1][buf2]]
294    /// where,
295    ///   - every buffer is of size `buffer_size`
296    ///   - each buffer is pointed using `*mut u8`
297    /// ```
298    ///
299    /// ## RAII Safety
300    ///
301    /// The allocation object, i.e. [`BufPoolAllocation`], is RAII safe. The allocated memory is
302    /// automatically deallocated as soon as the last reference to the object is dropped, while also
303    /// relaxing the memory budget to eliminate backpressure (if any).
304    ///
305    /// ## Important Considerations
306    ///
    /// The number of buffers required should never exceed `u16::MAX`. This is an abstract soft
    /// limit and should be enforced by the public interface to avoid any weird exhaustion issues
    /// or unknown bugs across the storage system.
    ///
    /// As `u16::MAX` is a large enough value to almost never cause any problems for a single
    /// write operation, this soft limit acts as a guideline to safely perform arithmetic
    /// operations across storage engine(s), including but not limited to [`frozen_core`].
314    ///
315    /// ## Example
316    ///
317    /// ```
318    /// use frozen_core::{
319    ///     bufpool::{BufPool, BufPoolCfg, BufferPointer},
320    ///     utils::BufferSize,
321    /// };
322    ///
323    /// const BUF_SIZE: BufferSize = BufferSize::S16;
324    ///
325    /// let pool = BufPool::new(BufPoolCfg {
326    ///     buffer_size: BUF_SIZE,
327    ///     max_memory: BUF_SIZE as usize * 0x14,
328    /// });
329    ///
330    /// let alloc = pool.allocate(0x0A);
331    ///
332    /// assert_eq!(alloc.length(), 0x0A);
333    /// assert!(!alloc.first().is_null());
334    /// assert_eq!(alloc.allocated_bytes(), BUF_SIZE as usize * 0x0A);
335    ///
336    /// let ptrs: Vec<BufferPointer> = alloc.iter().collect();
337    /// assert_eq!(ptrs.len(), 0x0A);
338    /// ```
339    #[inline(always)]
340    pub fn allocate(&self, required: usize) -> BufPoolAllocation {
341        // sanity checks
342        debug_assert!(required > 0, "required buffers must never be 0");
343        debug_assert!(
344            required * self.cfg.buffer_size.bytes() <= self.cfg.max_memory,
345            "Total required bytes must be smaller then the MAX_MEMORY allowed to avoid deadlock"
346        );
347        debug_assert!(
348            required * self.cfg.buffer_size.bytes() <= self.cfg.max_memory,
349            "Total required bytes must never exceed `u16::MAX` to avoid arithmatic overflows"
350        );
351
352        let required_bytes = self.cfg.buffer_size.bytes() * required;
353        loop {
354            let current_bytes = self.allocated_memory.load(atomic::Ordering::Acquire);
355            if current_bytes + required_bytes > self.cfg.max_memory {
356                self.backpressure(required_bytes);
357                continue;
358            }
359
360            match self.allocated_memory.compare_exchange(
361                current_bytes,
362                current_bytes + required_bytes,
363                atomic::Ordering::AcqRel,
364                atomic::Ordering::Acquire,
365            ) {
366                Ok(_) => break,
367                Err(_) => continue,
368            }
369        }
370
371        let layout = create_layout(required_bytes);
372        let pointer = allocate_layout(layout);
373        self.active_allocations.fetch_add(1, atomic::Ordering::Relaxed);
374
375        BufPoolAllocation {
376            layout,
377            pointer,
378            required_bytes,
379            buffers: required,
380            pool: ptr::NonNull::from(self),
381        }
382    }
383
384    /// Applies backpressure until enough memory budget is available for the allocation
385    ///
386    /// ## Why we ignore [`std::sync::PoisonError`]
387    ///
    /// The mutex is used solely as a parking primitive for [`std::sync::Condvar`] and does not
    /// protect any mutable state. All the pool invariants and accounting are maintained via
    /// atomics and are completely separated from the mutex.
    ///
    /// A poisoned mutex only indicates that another thread panicked while holding the lock, which
    /// normally signals an inconsistent state of the protected value. Since no state can be left
    /// partially modified under this lock, there is no possible consistency risk to recover from
    /// and propagating the poison error would only introduce unnecessary failures into the
    /// allocation path.
397    ///
398    /// Therefore, as best effort, we consume the [`std::sync::PoisonError`] and continue operating
399    /// with the recovered guard.
400    #[inline]
401    fn backpressure(&self, required_bytes: usize) {
402        let mut guard = self.allocation_lock.lock().unwrap_or_else(|e| e.into_inner());
403        while self.allocated_memory.load(atomic::Ordering::Acquire) + required_bytes
404            > self.cfg.max_memory
405        {
406            guard = self.allocation_cv.wait(guard).unwrap_or_else(|e| e.into_inner());
407        }
408    }
409}
410
411impl Drop for BufPool {
412    fn drop(&mut self) {
413        // NOTE: See [`BufPool::backpressure`] implementation for rationale behind poison recovery
414
415        let mut guard = self.shutdown_lock.lock().unwrap_or_else(|e| e.into_inner());
416        while self.active_allocations.load(atomic::Ordering::Acquire) != 0 {
417            guard = self.shutdown_cv.wait(guard).unwrap_or_else(|e| e.into_inner());
418        }
419    }
420}
421
422/// A RAII safe allocation object containing allocated buffers
423///
424/// ## Lifetime
425///
/// The object may outlive the scope that created it, and may also be transferred across
/// threads, because internally the [`BufPool`] tracks all the active allocations and delays its
/// own drop until every allocation and all their references are dropped from memory.
429///
430/// ## Example
431///
432/// ```
433/// use frozen_core::{
434///     bufpool::{BufPool, BufPoolCfg, BufferPointer},
435///     utils::BufferSize,
436/// };
437///
438/// const BUF_SIZE: BufferSize = BufferSize::S16;
439///
440/// let pool = BufPool::new(BufPoolCfg {
441///     buffer_size: BUF_SIZE,
442///     max_memory: BUF_SIZE as usize * 0x10,
443/// });
444///
445/// let alloc = pool.allocate(0x0A);
446///
447/// assert_eq!(alloc.length(), 0x0A);
448/// assert!(!alloc.first().is_null());
449/// assert_eq!(alloc.allocated_bytes(), BUF_SIZE as usize * 0x0A);
450///
451/// let ptrs: Vec<BufferPointer> = alloc.iter().collect();
452/// assert_eq!(ptrs.len(), 0x0A);
453/// ```
#[derive(Debug)]
pub struct BufPoolAllocation {
    /// Number of buffers requested from [`BufPool::allocate`]
    buffers: usize,
    /// Layout the memory was allocated with; reused verbatim for deallocation
    layout: alloc::Layout,
    /// Base pointer of the allocated memory, i.e. the first buffer
    pointer: ptr::NonNull<u8>,
    /// Back-pointer to the owning pool, used for accounting on drop and by `iter`
    pool: ptr::NonNull<BufPool>,
    /// Total bytes reserved from the pool's budget for this allocation
    required_bytes: usize,
}
462
// SAFETY: NOTE(review): `pointer` is owned exclusively by this object, and `pool` is only used to
// reach the pool's atomics and condvars while `BufPool::drop` blocks until every allocation is
// gone. This presumably relies on the `BufPool` value not being moved or forgotten while
// allocations are alive; confirm with callers.
unsafe impl Send for BufPoolAllocation {}
464
465impl BufPoolAllocation {
466    /// Returns a [`BufferPointer`] to the first buffer from the allocated list of buffers
467    ///
468    /// _NOTE:_ The returned [`BufferPointer`] can also be used as a _base_pointer_ to operate on
469    /// the entire allocated memory slice.
470    ///
471    /// ## Example
472    ///
473    /// ```
474    /// use frozen_core::{
475    ///     bufpool::{BufPool, BufPoolCfg},
476    ///     utils::BufferSize,
477    /// };
478    ///
479    /// const BUF_SIZE: BufferSize = BufferSize::S32;
480    ///
481    /// let pool = BufPool::new(BufPoolCfg {
482    ///     buffer_size: BUF_SIZE,
483    ///     max_memory: BUF_SIZE as usize * 0x0A,
484    /// });
485    ///
486    /// let alloc = pool.allocate(0x0A);
487    /// assert!(!alloc.first().is_null());
488    /// ```
489    #[inline]
490    pub const fn first(&self) -> BufferPointer {
491        self.pointer.as_ptr()
492    }
493
494    /// Returns the total number of allocated buffers
495    ///
    /// _IMPORTANT:_ The returned value is always equal to the `required` value used while
    /// calling [`BufPool::allocate`].
498    ///
499    /// ## Example
500    ///
501    /// ```
502    /// use frozen_core::{
503    ///     bufpool::{BufPool, BufPoolCfg},
504    ///     utils::BufferSize,
505    /// };
506    ///
507    /// const BUF_SIZE: BufferSize = BufferSize::S64;
508    ///
509    /// let pool = BufPool::new(BufPoolCfg {
510    ///     buffer_size: BUF_SIZE,
511    ///     max_memory: BUF_SIZE as usize * 0x0A,
512    /// });
513    ///
514    /// let alloc = pool.allocate(0x0A);
515    /// assert_eq!(alloc.length(), 0x0A);
516    /// ```
517    #[inline]
518    pub const fn length(&self) -> usize {
519        self.buffers
520    }
521
522    /// Returns the total number of bytes of memory allocated
523    ///
524    /// ## Example
525    ///
526    /// ```
527    /// use frozen_core::{
528    ///     bufpool::{BufPool, BufPoolCfg},
529    ///     utils::BufferSize,
530    /// };
531    ///
532    /// const BUF_SIZE: BufferSize = BufferSize::S16;
533    ///
534    /// let pool = BufPool::new(BufPoolCfg {
535    ///     buffer_size: BUF_SIZE,
536    ///     max_memory: BUF_SIZE as usize * 0x0A,
537    /// });
538    ///
539    /// let alloc = pool.allocate(0x0A);
540    /// assert_eq!(alloc.allocated_bytes(), BUF_SIZE as usize * 0x0A);
541    /// ```
542    #[inline]
543    pub const fn allocated_bytes(&self) -> usize {
544        self.required_bytes
545    }
546
547    /// A custom [`Iterator`] implementation to enable iteration over the list of allocated buffers
548    /// from [`BufPoolAllocation`]
549    ///
550    /// _NOTE:_ Each yielded pointer refers to a unique individual buffer each of size
551    /// [`BufPoolCfg::buffer_size`].
552    ///
553    /// ## Example
554    ///
555    /// ```
556    /// use frozen_core::{
557    ///     bufpool::{BufPool, BufPoolCfg, BufferPointer},
558    ///     utils::BufferSize,
559    /// };
560    ///
561    /// const BUF_SIZE: BufferSize = BufferSize::S16;
562    ///
563    /// let pool = BufPool::new(BufPoolCfg {
564    ///     buffer_size: BUF_SIZE,
565    ///     max_memory: BUF_SIZE as usize * 0x14,
566    /// });
567    ///
568    /// let alloc = pool.allocate(0x0A);
569    /// let ptrs: Vec<BufferPointer> = alloc.iter().collect();
570    ///
571    /// assert_eq!(ptrs.len(), 0x0A);
572    /// ```
573    #[inline]
574    pub fn iter(&self) -> BufPoolAllocationIter {
575        let pool = unsafe { self.pool.as_ref() };
576        BufPoolAllocationIter {
577            pointer: self.pointer,
578            buffer_size: pool.cfg.buffer_size.bytes(),
579            remaining: self.buffers,
580        }
581    }
582}
583
584impl Drop for BufPoolAllocation {
585    fn drop(&mut self) {
586        let pool = unsafe { self.pool.as_ref() };
587        deallocate_memory(self.pointer, self.layout);
588
589        pool.allocated_memory.fetch_sub(self.required_bytes, atomic::Ordering::Release);
590        pool.allocation_cv.notify_one();
591
592        if pool.active_allocations.fetch_sub(1, atomic::Ordering::Release) == 1 {
593            pool.shutdown_cv.notify_one();
594        }
595    }
596}
597
598/// A custom [`Iterator`] object to iterate over the list of allocated buffers
599///
600/// _NOTE:_ Buffers are yielded in allocation order and are backed by a single contiguous memory
601/// region.
602///
603/// ## Example
604///
605/// ```rs
606/// use frozen_core::{
607///     bufpool::{BufPool, BufPoolCfg, BufferPointer},
608///     utils::BufferSize,
609/// };
610///
611/// const BUF_SIZE: BufferSize = BufferSize::S32;
612///
613/// let pool = BufPool::new(BufPoolCfg {
614///     buffer_size: BUF_SIZE,
615///     max_memory: BUF_SIZE as usize * 0x14,
616/// });
617///
618/// let alloc = pool.allocate(0x0A);
619/// let ptrs: Vec<BufferPointer> = alloc.iter().collect();
620///
621/// assert_eq!(ptrs.len(), 0x0A);
622/// ```
623#[derive(Debug)]
624pub struct BufPoolAllocationIter {
625    pointer: ptr::NonNull<u8>,
626    buffer_size: usize,
627    remaining: usize,
628}
629
630impl Iterator for BufPoolAllocationIter {
631    type Item = BufferPointer;
632
633    #[inline(always)]
634    fn next(&mut self) -> Option<Self::Item> {
635        if self.remaining == 0 {
636            return None;
637        }
638
639        let curr_ptr = self.pointer;
640
641        self.pointer = unsafe { self.pointer.add(self.buffer_size) };
642        self.remaining -= 1;
643
644        Some(curr_ptr.as_ptr())
645    }
646}
647
/// Creates a byte-array layout spanning `required_bytes` bytes
///
/// _NOTE:_ Panicking on an invalid layout is acceptable here, as the panic, if any, would be
/// caught by unit tests and would indicate an incorrect impl rather than a runtime failure.
#[inline]
fn create_layout(required_bytes: usize) -> alloc::Layout {
    alloc::Layout::array::<u8>(required_bytes).unwrap_or_else(|e| panic!("Invalid Layout: {e}"))
}
659
660/// Allocate a memory slice with given allocation `layout`
661///
662/// ## Allocation Failure
663///
664/// If the allocator is unable to satisfy the request (typically due to an OOM condition),
665/// [`alloc::alloc`] returns a null pointer.
666///
667/// In such cases we delegate to [`alloc::handle_alloc_error`], matching the behavior of std library
668/// types such as [`Vec`], [`Box`] and [`String`].
669///
670/// This path aborts the process and never returns. Allocation failures are therefore treated as
671/// fatal runtime conditions rather than recoverable errors.
672///
673/// Under normal operation this path should never be reached, as memory usage is expected to be
674/// bounded by the buffer pools memory budget and backpressure mechanisms.
675///
676/// ## Why not return `FrozenErr`
677///
678/// A null return from [`alloc::alloc`] indicates that the global allocator itself was unable to
679/// satisfy the request.
680///
681/// Delegating to [`alloc::handle_alloc_error`] matches the behavior of standard library containers
682/// and avoids continuing execution after a catastrophic allocator failure, where further
683/// allocations required for error handling, logging or recovery may also fail.
#[inline]
fn allocate_layout(layout: alloc::Layout) -> ptr::NonNull<u8> {
    // The global allocator's contract makes a zero-sized request undefined behaviour, so it is
    // rejected with a panic before the unsafe call is reached
    assert!(layout.size() != 0, "zero-sized layout must never be allocated");

    // SAFETY: `layout` has a non-zero size, as checked above
    let pointer = unsafe { alloc::alloc(layout) };
    match ptr::NonNull::new(pointer) {
        Some(p) => p,
        // a null return means the allocator failed; this call never returns
        None => alloc::handle_alloc_error(layout),
    }
}
692
/// Returns a manually allocated slice of memory to the global allocator
///
/// `pointer` and `layout` are passed to [`alloc::dealloc`] unchanged.
#[inline]
fn deallocate_memory(pointer: ptr::NonNull<u8>, layout: alloc::Layout) {
    let raw = pointer.as_ptr();

    // NOTE(review): soundness depends on callers passing the exact `layout` that `pointer` was
    // allocated with
    unsafe { alloc::dealloc(raw, layout) };
}
698
699#[cfg(test)]
700mod tests {
701    use super::*;
702
703    const BUF_SIZE: crate::utils::BufferSize = crate::utils::BufferSize::S32;
704
705    #[inline]
706    fn create_bufpool(max_mem: usize) -> BufPool {
707        BufPool::new(BufPoolCfg { buffer_size: BUF_SIZE, max_memory: max_mem })
708    }
709
710    #[test]
711    #[should_panic]
712    #[cfg(debug_assertions)]
713    fn err_new_with_invalid_cfg() {
714        create_bufpool(BUF_SIZE.bytes() >> 1);
715    }
716
717    #[test]
718    #[should_panic]
719    #[cfg(debug_assertions)]
720    fn err_alloc_zero() {
721        let bpool = create_bufpool(BUF_SIZE.bytes());
722        let _ = bpool.allocate(0);
723    }
724
725    #[test]
726    #[should_panic]
727    #[cfg(debug_assertions)]
728    fn err_alloc_more_then_max_memory() {
729        let bpool = create_bufpool(BUF_SIZE.bytes());
730        let _ = bpool.allocate(2);
731    }
732
733    #[test]
734    fn ok_alloc_single() {
735        let bpool = create_bufpool(BUF_SIZE.bytes() * 2);
736        let alloc = bpool.allocate(1);
737
738        assert_eq!(alloc.buffers, 1);
739        assert_eq!(alloc.required_bytes, BUF_SIZE.bytes());
740    }
741
742    #[test]
743    fn ok_alloc_multiple() {
744        let bpool = create_bufpool(BUF_SIZE.bytes() * 0x14);
745        let alloc = bpool.allocate(0x10);
746
747        assert_eq!(alloc.buffers, 0x10);
748        assert_eq!(alloc.required_bytes, BUF_SIZE.bytes() * 0x10);
749    }
750
751    #[test]
752    fn ok_alloc_max_memory() {
753        let bpool = create_bufpool(BUF_SIZE.bytes() * 0x0A);
754        let alloc = bpool.allocate(0x0A);
755
756        assert_eq!(alloc.buffers, 0x0A);
757        assert_eq!(alloc.required_bytes, BUF_SIZE.bytes() * 0x0A);
758    }
759
760    #[test]
761    fn ok_alloc_updates_memory_accounting() {
762        let bpool = create_bufpool(BUF_SIZE.bytes() * 0x14);
763        let alloc = bpool.allocate(0x10);
764
765        assert_eq!(bpool.allocated_memory.load(atomic::Ordering::Acquire), BUF_SIZE.bytes() * 0x10);
766        drop(alloc);
767        assert_eq!(bpool.allocated_memory.load(atomic::Ordering::Acquire), 0);
768    }
769
770    #[test]
771    fn ok_alloc_updates_active_allocation_tracking() {
772        let bpool = create_bufpool(BUF_SIZE.bytes() * 0x2A);
773
774        let alloc1 = bpool.allocate(0x10);
775        let alloc2 = bpool.allocate(0x10);
776
777        assert_eq!(bpool.active_allocations.load(atomic::Ordering::Acquire), 2);
778        let _ = (drop(alloc1), drop(alloc2));
779        assert_eq!(bpool.active_allocations.load(atomic::Ordering::Acquire), 0);
780    }
781
782    #[test]
783    fn ok_alloc_decrments_allocated_memory_after_deallocations() {
784        let bpool = create_bufpool(BUF_SIZE.bytes() * 0x80);
785        let allocations: Vec<_> = (0..0x20).map(|_| bpool.allocate(2)).collect();
786
787        assert_eq!(bpool.allocated_memory.load(atomic::Ordering::Acquire), 0x20 * 0x40);
788        drop(allocations);
789        assert_eq!(bpool.allocated_memory.load(atomic::Ordering::Acquire), 0);
790    }
791
792    #[test]
793    fn ok_backpressure_blocks_till_memory_is_deallocated() {
794        let bpool = sync::Arc::new(create_bufpool(BUF_SIZE.bytes() * 2));
795        let alloc = bpool.allocate(1);
796
797        let pool2 = bpool.clone();
798        let barrier = sync::Arc::new(sync::Barrier::new(2));
799        let barrier2 = barrier.clone();
800
801        let handle = std::thread::spawn(move || {
802            barrier2.wait();
803
804            let start = std::time::Instant::now();
805            let _alloc = pool2.allocate(2);
806
807            start.elapsed()
808        });
809
810        barrier.wait();
811
812        std::thread::sleep(std::time::Duration::from_millis(100));
813        drop(alloc);
814
815        let elapsed = handle.join().expect("allocation thread should not panic");
816        assert!(elapsed >= std::time::Duration::from_millis(100));
817    }
818
819    #[test]
820    fn ok_concurrent_allocations() {
821        let pool = sync::Arc::new(create_bufpool(BUF_SIZE.bytes() * 0x1000));
822
823        let mut handles = Vec::new();
824        for _ in 0..0x0A {
825            let pool = pool.clone();
826
827            handles.push(std::thread::spawn(move || {
828                for _ in 0..0x64 {
829                    drop(pool.allocate(1));
830                }
831            }));
832        }
833
834        for h in handles {
835            h.join().unwrap();
836        }
837
838        assert_eq!(pool.allocated_memory.load(atomic::Ordering::Acquire), 0);
839        assert_eq!(pool.active_allocations.load(atomic::Ordering::Acquire), 0);
840    }
841
842    mod drop {
843        use super::*;
844
845        #[test]
846        fn ok_partial_drop_updates_accounting() {
847            let bpool = create_bufpool(BUF_SIZE.bytes() * 0x0A);
848
849            let alloc1 = bpool.allocate(2);
850            let alloc2 = bpool.allocate(2);
851
852            assert_eq!(
853                bpool.allocated_memory.load(atomic::Ordering::Acquire),
854                BUF_SIZE.bytes() * 4
855            );
856            drop(alloc1);
857
858            assert_eq!(
859                bpool.allocated_memory.load(atomic::Ordering::Acquire),
860                BUF_SIZE.bytes() * 2
861            );
862            drop(alloc2);
863
864            assert_eq!(bpool.allocated_memory.load(atomic::Ordering::Acquire), 0);
865        }
866
867        #[test]
868        fn ok_drop_waits_for_active_allocations() {
869            let bpool = sync::Arc::new(create_bufpool(BUF_SIZE.bytes() * 0x1A));
870            let alloc = bpool.allocate(0x10);
871
872            let handle = std::thread::spawn(move || {
873                drop(bpool);
874            });
875
876            std::thread::sleep(std::time::Duration::from_millis(0x64));
877            assert!(!handle.is_finished());
878            drop(alloc);
879
880            handle.join().unwrap();
881        }
882    }
883
884    mod memory_tests {
885        use super::*;
886
887        #[test]
888        fn ok_create_layout() {
889            let layout = create_layout(0x1000);
890
891            assert_eq!(layout.align(), 1);
892            assert_eq!(layout.size(), 0x1000);
893        }
894
895        #[test]
896        #[should_panic(expected = "Invalid Layout")]
897        fn err_create_layout() {
898            create_layout(usize::MAX);
899        }
900
901        #[test]
902        fn ok_allocate_layout() {
903            let layout = create_layout(0x10);
904            let pointer = allocate_layout(layout);
905            let raw_ptr = pointer.as_ptr();
906
907            assert!(!raw_ptr.is_null());
908            deallocate_memory(pointer, layout);
909        }
910
911        #[test]
912        fn ok_allocate_layout_allows_write() {
913            let layout = create_layout(0x80);
914            let pointer = allocate_layout(layout);
915
916            unsafe {
917                pointer.as_ptr().write(0x40);
918                assert_eq!(pointer.as_ptr().read(), 0x40);
919            }
920
921            deallocate_memory(pointer, layout);
922        }
923
924        #[test]
925        fn ok_allocate_layout_allows_write_to_entire_slice() {
926            let layout = create_layout(0x200);
927            let pointer = allocate_layout(layout);
928
929            unsafe {
930                for i in 0..0x200 {
931                    pointer.as_ptr().add(i).write((i % 0xFF) as u8);
932                }
933
934                for i in 0..0x200 {
935                    assert_eq!(pointer.as_ptr().add(i).read(), (i % 0xFF) as u8);
936                }
937            }
938
939            deallocate_memory(pointer, layout);
940        }
941    }
942
943    mod alloc_struct {
944        use super::*;
945
946        #[test]
947        fn ok_first_returns_ptr_to_first_buf_from_alloc() {
948            let bpool = create_bufpool(BUF_SIZE.bytes() * 0x20);
949            let alloc = bpool.allocate(0x10);
950
951            assert_eq!(alloc.first(), alloc.pointer.as_ptr());
952        }
953
954        #[test]
955        fn ok_length_returns_length_of_alloc() {
956            let bpool = create_bufpool(BUF_SIZE.bytes() * 0x20);
957            let alloc = bpool.allocate(0x10);
958
959            assert_eq!(alloc.length(), alloc.buffers);
960        }
961
962        #[test]
963        fn ok_allocated_bytes_return_total_allocated_bytes() {
964            let bpool = create_bufpool(BUF_SIZE.bytes() * 0x20);
965            let alloc = bpool.allocate(0x10);
966
967            assert_eq!(alloc.allocated_bytes(), alloc.buffers * BUF_SIZE.bytes());
968        }
969
970        #[test]
971        fn ok_alloc_can_be_shared_across_threads() {
972            let pool = sync::Arc::new(create_bufpool(BUF_SIZE.bytes() * 2));
973            let alloc = pool.allocate(1);
974
975            std::thread::spawn(move || {
976                drop(alloc);
977            })
978            .join()
979            .unwrap();
980        }
981    }
982
983    mod iterator {
984        use super::*;
985
986        #[test]
987        fn ok_iter_yeilds_all_buffers() {
988            let bpool = create_bufpool(BUF_SIZE.bytes() * 0x0A);
989            let alloc = bpool.allocate(4);
990
991            let ptrs: Vec<_> = alloc.iter().collect();
992            assert_eq!(ptrs.len(), 4);
993
994            assert_eq!(ptrs[1] as usize - ptrs[0] as usize, 0x20);
995            assert_eq!(ptrs[2] as usize - ptrs[1] as usize, 0x20);
996            assert_eq!(ptrs[3] as usize - ptrs[2] as usize, 0x20);
997        }
998
999        #[test]
1000        fn ok_iter_yeilds_none_when_exhausted() {
1001            let bpool = create_bufpool(BUF_SIZE.bytes() * 0x0A);
1002            let alloc = bpool.allocate(2);
1003            let mut iter = alloc.iter();
1004
1005            assert!(iter.next().is_some());
1006            assert!(iter.next().is_some());
1007            assert!(iter.next().is_none());
1008            assert!(iter.next().is_none());
1009        }
1010    }
1011}