frozen_core/bufpool.rs
1//! A low-latency memory-budgeted buffer pool to manage fixed-sized buffer allocations
2//!
3//! ## Memory Allocations
4//!
5//! All the allocations are allocated using the global memory allocator as requested (on-the-fly).
6//!
//! A single allocation, returned as [`BufPoolAllocation`], is one large contiguous slice of
//! memory of size `BufPoolCfg::buffer_size.bytes() * n_buffers`.
9//!
10//! Memory layout structure,
11//!
12//! ```text
13//! allocation = [[buf0][buf1][buf2]]
14//! where,
15//! - every buffer is of size `buffer_size`
16//! - each buffer is pointed using `*mut u8`
17//! ```
18//!
19//! ## Backpressure
20//!
21//! Every allocation reserves a memory budget and is only allowed to allocate memory if enough
22//! budget (i.e. memory space) is available. Otherwise, the caller is blocked/polled till enough
23//! space is available.
24//!
25//! When the [`BufPoolAllocation`] and all its references are dropped, the underlying memory is
26//! deallocated while relaxing the budget and dropping the backpressure (if any).
27//!
//! _NOTE:_ There is no fairness guarantee for callers that are parked under backpressure, as
//! the waiting callers are woken up opportunistically.
30//!
31//! ## Benchmarks
32//!
33//! Observed measurements of latency and throughput,
34//!
35//! | Metric | Value |
36//! |:-----------------------------|:-------------------|
37//! | Allocation Latency (avg) | ~254 nanosecond |
38//! | Allocation Throughput (avg) | ~3.94 million/sec |
39//!
//! _NOTE:_ All measurements include the complete RAII lifecycle (i.e. allocation + deallocation).
41//!
42//! Observed allocation latency for `N` buffers,
43//!
44//! | Buffers | Latency |
45//! |:---------|:---------|
46//! | 0x01 | 246 ns |
47//! | 0x10 | 251 ns |
48//! | 0x400 | 300 ns |
49//!
//! _INFO:_ As seen, the allocation latency stays nearly constant irrespective of the number of
//! buffers and the allocated bytes.
52//!
53//! Environment used for benching,
54//!
55//! * OS: NixOS (WSL2)
56//! * Architecture: x86_64
57//! * Memory: 8 GiB RAM (DDR4)
58//! * Rust: rustc 1.86.0 w/ cargo 1.86.0
59//! * Kernel: Linux 6.6.87.2-microsoft-standard-WSL2
60//! * CPU: Intel® Core™ i5-10300H @ 2.50GHz (4C / 8T)
61//!
62//! ## Example
63//!
64//! ```
65//! use frozen_core::{
66//! bufpool::{BufPool, BufPoolCfg, BufferPointer},
67//! utils::BufferSize,
68//! };
69//!
70//! const BUF_SIZE: BufferSize = BufferSize::S16;
71//!
72//! let pool = BufPool::new(BufPoolCfg {
73//! buffer_size: BUF_SIZE,
74//! max_memory: BUF_SIZE as usize * 0x40,
75//! });
76//!
77//! let alloc = pool.allocate(0x2A);
78//!
79//! assert_eq!(alloc.length(), 0x2A);
80//! assert!(!alloc.first().is_null());
81//! assert_eq!(alloc.allocated_bytes(), BUF_SIZE as usize * 0x2A);
82//!
83//! let ptrs: Vec<BufferPointer> = alloc.iter().collect();
84//! assert_eq!(ptrs.len(), 0x2A);
85//! ```
86
87use std::{alloc, ptr, sync, sync::atomic};
88
/// A raw, untyped pointer to one individual in-memory buffer
///
/// ## Safety
///
/// The memory behind the pointer is untyped and uninitialized. The caller is responsible for
/// making sure that:
///
/// * Writes stay within bounds, where the size of each buffer is [`BufPoolCfg::buffer_size`]
/// * Reads only occur after the buffer has been initialized (i.e. written to)
/// * The pointer does not outlive the [`BufPoolAllocation`] object it was obtained from
///
/// ## Example
///
/// ```
/// use frozen_core::{
///     bufpool::{BufPool, BufPoolCfg},
///     utils::BufferSize,
/// };
///
/// const BUF_SIZE: BufferSize = BufferSize::S16;
/// const BUFFER: [u8; BUF_SIZE as usize] = [1u8; BUF_SIZE as usize];
///
/// let pool = BufPool::new(BufPoolCfg {
///     buffer_size: BUF_SIZE,
///     max_memory: BUF_SIZE as usize * 0x0A,
/// });
///
/// let alloc = pool.allocate(1);
/// unsafe {
///     std::ptr::copy_nonoverlapping(BUFFER.as_ptr(), alloc.first(), BUF_SIZE as usize);
/// }
/// ```
pub type BufferPointer = *mut u8;
121
122/// All the available configrations for [`BufPool`]
123///
124/// ## Example
125///
126/// ```
127/// use frozen_core::{bufpool::BufPoolCfg, utils::BufferSize};
128///
129/// const BUF_SIZE: BufferSize = BufferSize::S64;
130///
131/// let cfg = BufPoolCfg {
132/// buffer_size: BUF_SIZE,
133/// max_memory: BUF_SIZE as usize * 0x1000,
134/// };
135///
136/// assert_ne!(cfg.max_memory, 0);
137/// assert!(cfg.max_memory > cfg.buffer_size.bytes());
138/// ```
139#[derive(Clone, Copy, Debug, Eq, PartialEq)]
140pub struct BufPoolCfg {
141 /// Size (in bytes) of an indivdual buffer unit allocated
142 pub buffer_size: crate::utils::BufferSize,
143
144 /// Maximum allowed memory (in bytes) to be simultaneosuly allocated by [`BufPool`]
145 ///
146 /// _IMPORTANT:_ When trying to allocate more memory then [`BufPoolCfg::max_memory`] via
147 /// [`BufPool::allocate`], a deadlock will happen due to memory budgeting in place. The caller
148 /// must make sure the `max_meory` is high enough to avoid this scenerio.
149 ///
150 /// _NOTE:_ To avoid backpressure, set the `max_memory` to an arbitrary large value. This
151 /// would not have any direct impact on performance or resource usage, and will avoid
152 /// backpressure under heavy workload.
153 pub max_memory: usize,
154}
155
156/// An implementation of a low-latency memory-budgeted buffer pool managing fixed-sized buffer
157/// allocations
158///
159/// ## Blocking `drop`
160///
161/// Dropping the [`BufPool`] instance from memory blocks unitl all the allocated instances of
162/// [`BufPoolAllocations`] and all there references are dropped from memory.
163///
164/// This is in place to avoid memory leaks, as well as to enable sending [`BufPoolAllocation`]
165/// objects across threads while being tied to the lifecyle of [`BufPool`].
166///
167/// ## Memory Allocations
168///
169/// All the allocations are allocated using the global memory allocator as requested (on-the-fly).
170///
171/// While a single allocation retunred as [`BufPoolAllocation`] is a large contineous slice of
172/// memory w/ size as `BufPoolCfg::buffer_size.bytes() * n_buffers`.
173///
174/// Memory layout structure,
175///
176/// ```text
177/// allocation = [[buf0][buf1][buf2]]
178/// where,
179/// - every buffer is of size `buffer_size`
180/// - each buffer is pointed using `*mut u8`
181/// ```
182///
183/// ## Backpressure
184///
185/// Every allocation reserves a memory budget and is only allowed to allocate memory if enough
186/// budget (i.e. memory space) is available. Otherwise, the caller is blocked/polled till enough
187/// space is available.
188///
189/// When the [`BufPoolAllocation`] and all its references are dropped, the underlying memory is
190/// deallocated while relaxing the budget and dropping the backpressure (if any).
191///
192/// _NOTE:_ There is no faireness guarantee for the caller's who are polled when faced with
193/// backpressure, as the waiting callers are awaken opportunistically.
194///
195/// ## Example
196///
197/// ```
198/// use frozen_core::{
199/// bufpool::{BufPool, BufPoolCfg, BufferPointer},
200/// utils::BufferSize,
201/// };
202///
203/// const BUF_SIZE: BufferSize = BufferSize::S16;
204///
205/// let pool = BufPool::new(BufPoolCfg {
206/// buffer_size: BUF_SIZE,
207/// max_memory: BUF_SIZE as usize * 0x40,
208/// });
209///
210/// let alloc = pool.allocate(0x30);
211///
212/// assert_eq!(alloc.length(), 0x30);
213/// assert!(!alloc.first().is_null());
214/// assert_eq!(alloc.allocated_bytes(), BUF_SIZE as usize * 0x30);
215///
216/// let ptrs: Vec<BufferPointer> = alloc.iter().collect();
217/// assert_eq!(ptrs.len(), 0x30);
218/// ```
219#[derive(Debug)]
220pub struct BufPool {
221 active_allocations: atomic::AtomicUsize,
222 allocation_cv: sync::Condvar,
223 allocation_lock: sync::Mutex<()>,
224 allocated_memory: atomic::AtomicUsize,
225 cfg: BufPoolCfg,
226 shutdown_cv: sync::Condvar,
227 shutdown_lock: sync::Mutex<()>,
228}
229
230unsafe impl Send for BufPool {}
231unsafe impl Sync for BufPool {}
232
233impl BufPool {
234 /// Create a new instance of [`BufPool`]
235 ///
236 /// ## Debug Assertions
237 ///
238 /// In debug builds, this function uses `debug_assertion` to prevent invalid configurations.
239 /// Caller must refer to [`BufPoolCfg`] for details about the config params.
240 ///
241 /// ## Example
242 ///
243 /// ```
244 /// use frozen_core::{
245 /// bufpool::{BufPool, BufPoolCfg},
246 /// utils::BufferSize,
247 /// };
248 ///
249 /// const BUF_SIZE: BufferSize = BufferSize::S16;
250 ///
251 /// let pool = BufPool::new(BufPoolCfg {
252 /// buffer_size: BUF_SIZE,
253 /// max_memory: BUF_SIZE as usize * 0x0A,
254 /// });
255 ///
256 /// let alloc = pool.allocate(1);
257 ///
258 /// assert_eq!(alloc.length(), 1);
259 /// assert_eq!(alloc.allocated_bytes(), BUF_SIZE as usize);
260 /// ```
261 #[inline]
262 pub fn new(cfg: BufPoolCfg) -> Self {
263 // sanity check
264 debug_assert!(
265 cfg.buffer_size.bytes() < cfg.max_memory,
266 "MAX_MEMORY must always be larger than the BUFFER_SIZE"
267 );
268
269 Self {
270 cfg,
271 active_allocations: atomic::AtomicUsize::new(0),
272 allocated_memory: atomic::AtomicUsize::new(0),
273 allocation_cv: sync::Condvar::new(),
274 allocation_lock: sync::Mutex::new(()),
275 shutdown_cv: sync::Condvar::new(),
276 shutdown_lock: sync::Mutex::new(()),
277 }
278 }
279
280 /// Allocate `required` number of buffers each of [`BufPoolCfg::buffer_size`] size
281 ///
282 /// ## Allocation Layout
283 ///
284 /// The allocation is a large contineous slice of memory w/ size as
285 ///
286 /// ```text
287 /// BufPoolCfg::buffer_size.bytes() * n_buffers
288 /// ```
289 ///
290 /// Memory layout structure,
291 ///
292 /// ```text
293 /// allocation = [[buf0][buf1][buf2]]
294 /// where,
295 /// - every buffer is of size `buffer_size`
296 /// - each buffer is pointed using `*mut u8`
297 /// ```
298 ///
299 /// ## RAII Safety
300 ///
301 /// The allocation object, i.e. [`BufPoolAllocation`], is RAII safe. The allocated memory is
302 /// automatically deallocated as soon as the last reference to the object is dropped, while also
303 /// relaxing the memory budget to eliminate backpressure (if any).
304 ///
305 /// ## Important Considerations
306 ///
307 /// The number of buffers required should never exceed `u16::MAX`. This is an abstract soft
308 /// limit and should be enforced by the public interface to avoid any weird exhaustion issues
309 /// or unknown bugs across the storage system.
310 ///
311 /// As `u16::MAX` is large enough value to almost never cause any problems for a single write
312 /// operation, this soft limit acts as a guidline to safely operate with arithmatic operations
313 /// across storage engine(s), including but not limited to [`frozen_core`].
314 ///
315 /// ## Example
316 ///
317 /// ```
318 /// use frozen_core::{
319 /// bufpool::{BufPool, BufPoolCfg, BufferPointer},
320 /// utils::BufferSize,
321 /// };
322 ///
323 /// const BUF_SIZE: BufferSize = BufferSize::S16;
324 ///
325 /// let pool = BufPool::new(BufPoolCfg {
326 /// buffer_size: BUF_SIZE,
327 /// max_memory: BUF_SIZE as usize * 0x14,
328 /// });
329 ///
330 /// let alloc = pool.allocate(0x0A);
331 ///
332 /// assert_eq!(alloc.length(), 0x0A);
333 /// assert!(!alloc.first().is_null());
334 /// assert_eq!(alloc.allocated_bytes(), BUF_SIZE as usize * 0x0A);
335 ///
336 /// let ptrs: Vec<BufferPointer> = alloc.iter().collect();
337 /// assert_eq!(ptrs.len(), 0x0A);
338 /// ```
339 #[inline(always)]
340 pub fn allocate(&self, required: usize) -> BufPoolAllocation {
341 // sanity checks
342 debug_assert!(required > 0, "required buffers must never be 0");
343 debug_assert!(
344 required * self.cfg.buffer_size.bytes() <= self.cfg.max_memory,
345 "Total required bytes must be smaller then the MAX_MEMORY allowed to avoid deadlock"
346 );
347 debug_assert!(
348 required * self.cfg.buffer_size.bytes() <= self.cfg.max_memory,
349 "Total required bytes must never exceed `u16::MAX` to avoid arithmatic overflows"
350 );
351
352 let required_bytes = self.cfg.buffer_size.bytes() * required;
353 loop {
354 let current_bytes = self.allocated_memory.load(atomic::Ordering::Acquire);
355 if current_bytes + required_bytes > self.cfg.max_memory {
356 self.backpressure(required_bytes);
357 continue;
358 }
359
360 match self.allocated_memory.compare_exchange(
361 current_bytes,
362 current_bytes + required_bytes,
363 atomic::Ordering::AcqRel,
364 atomic::Ordering::Acquire,
365 ) {
366 Ok(_) => break,
367 Err(_) => continue,
368 }
369 }
370
371 let layout = create_layout(required_bytes);
372 let pointer = allocate_layout(layout);
373 self.active_allocations.fetch_add(1, atomic::Ordering::Relaxed);
374
375 BufPoolAllocation {
376 layout,
377 pointer,
378 required_bytes,
379 buffers: required,
380 pool: ptr::NonNull::from(self),
381 }
382 }
383
384 /// Applies backpressure until enough memory budget is available for the allocation
385 ///
386 /// ## Why we ignore [`std::sync::PoisonError`]
387 ///
388 /// The mutex used for lock, is solely used as a parking primitive for [`Condvar`] and does not
389 /// protect any mutable state. All the pool invariants and accounting are maintained via
390 /// atomics and are completely seperated from the mutex.
391 ///
392 /// A poisoned mutex only indicates that another tx panicked while holding the lock, and
393 /// indicates an inconsistent state of the protected value. Since no state can be left
394 /// partially modified under this lock, there is no possible consistency risk to recover from
395 /// and propagating the poison error would only introduce unnecessary failures into the
396 /// allocation path.
397 ///
398 /// Therefore, as best effort, we consume the [`std::sync::PoisonError`] and continue operating
399 /// with the recovered guard.
400 #[inline]
401 fn backpressure(&self, required_bytes: usize) {
402 let mut guard = self.allocation_lock.lock().unwrap_or_else(|e| e.into_inner());
403 while self.allocated_memory.load(atomic::Ordering::Acquire) + required_bytes
404 > self.cfg.max_memory
405 {
406 guard = self.allocation_cv.wait(guard).unwrap_or_else(|e| e.into_inner());
407 }
408 }
409}
410
411impl Drop for BufPool {
412 fn drop(&mut self) {
413 // NOTE: See [`BufPool::backpressure`] implementation for rationale behind poison recovery
414
415 let mut guard = self.shutdown_lock.lock().unwrap_or_else(|e| e.into_inner());
416 while self.active_allocations.load(atomic::Ordering::Acquire) != 0 {
417 guard = self.shutdown_cv.wait(guard).unwrap_or_else(|e| e.into_inner());
418 }
419 }
420}
421
422/// A RAII safe allocation object containing allocated buffers
423///
424/// ## Lifetime
425///
426/// The object can/may outlive the scope that created it, while also being able to transfer across
427/// threads. As, internally the [`BufPool`] tracks all the active allocations and delays the drop
428/// until every allocation and all there references are dropped from memory.
429///
430/// ## Example
431///
432/// ```
433/// use frozen_core::{
434/// bufpool::{BufPool, BufPoolCfg, BufferPointer},
435/// utils::BufferSize,
436/// };
437///
438/// const BUF_SIZE: BufferSize = BufferSize::S16;
439///
440/// let pool = BufPool::new(BufPoolCfg {
441/// buffer_size: BUF_SIZE,
442/// max_memory: BUF_SIZE as usize * 0x10,
443/// });
444///
445/// let alloc = pool.allocate(0x0A);
446///
447/// assert_eq!(alloc.length(), 0x0A);
448/// assert!(!alloc.first().is_null());
449/// assert_eq!(alloc.allocated_bytes(), BUF_SIZE as usize * 0x0A);
450///
451/// let ptrs: Vec<BufferPointer> = alloc.iter().collect();
452/// assert_eq!(ptrs.len(), 0x0A);
453/// ```
454#[derive(Debug)]
455pub struct BufPoolAllocation {
456 buffers: usize,
457 layout: alloc::Layout,
458 pointer: ptr::NonNull<u8>,
459 pool: ptr::NonNull<BufPool>,
460 required_bytes: usize,
461}
462
463unsafe impl Send for BufPoolAllocation {}
464
465impl BufPoolAllocation {
466 /// Returns a [`BufferPointer`] to the first buffer from the allocated list of buffers
467 ///
468 /// _NOTE:_ The returned [`BufferPointer`] can also be used as a _base_pointer_ to operate on
469 /// the entire allocated memory slice.
470 ///
471 /// ## Example
472 ///
473 /// ```
474 /// use frozen_core::{
475 /// bufpool::{BufPool, BufPoolCfg},
476 /// utils::BufferSize,
477 /// };
478 ///
479 /// const BUF_SIZE: BufferSize = BufferSize::S32;
480 ///
481 /// let pool = BufPool::new(BufPoolCfg {
482 /// buffer_size: BUF_SIZE,
483 /// max_memory: BUF_SIZE as usize * 0x0A,
484 /// });
485 ///
486 /// let alloc = pool.allocate(0x0A);
487 /// assert!(!alloc.first().is_null());
488 /// ```
489 #[inline]
490 pub const fn first(&self) -> BufferPointer {
491 self.pointer.as_ptr()
492 }
493
494 /// Returns the total number of allocated buffers
495 ///
496 /// _IMPORTANT:_ The returned value is always equal to the `required` value using while
497 /// calling [`BufPool::allocate`].
498 ///
499 /// ## Example
500 ///
501 /// ```
502 /// use frozen_core::{
503 /// bufpool::{BufPool, BufPoolCfg},
504 /// utils::BufferSize,
505 /// };
506 ///
507 /// const BUF_SIZE: BufferSize = BufferSize::S64;
508 ///
509 /// let pool = BufPool::new(BufPoolCfg {
510 /// buffer_size: BUF_SIZE,
511 /// max_memory: BUF_SIZE as usize * 0x0A,
512 /// });
513 ///
514 /// let alloc = pool.allocate(0x0A);
515 /// assert_eq!(alloc.length(), 0x0A);
516 /// ```
517 #[inline]
518 pub const fn length(&self) -> usize {
519 self.buffers
520 }
521
522 /// Returns the total number of bytes of memory allocated
523 ///
524 /// ## Example
525 ///
526 /// ```
527 /// use frozen_core::{
528 /// bufpool::{BufPool, BufPoolCfg},
529 /// utils::BufferSize,
530 /// };
531 ///
532 /// const BUF_SIZE: BufferSize = BufferSize::S16;
533 ///
534 /// let pool = BufPool::new(BufPoolCfg {
535 /// buffer_size: BUF_SIZE,
536 /// max_memory: BUF_SIZE as usize * 0x0A,
537 /// });
538 ///
539 /// let alloc = pool.allocate(0x0A);
540 /// assert_eq!(alloc.allocated_bytes(), BUF_SIZE as usize * 0x0A);
541 /// ```
542 #[inline]
543 pub const fn allocated_bytes(&self) -> usize {
544 self.required_bytes
545 }
546
547 /// A custom [`Iterator`] implementation to enable iteration over the list of allocated buffers
548 /// from [`BufPoolAllocation`]
549 ///
550 /// _NOTE:_ Each yielded pointer refers to a unique individual buffer each of size
551 /// [`BufPoolCfg::buffer_size`].
552 ///
553 /// ## Example
554 ///
555 /// ```
556 /// use frozen_core::{
557 /// bufpool::{BufPool, BufPoolCfg, BufferPointer},
558 /// utils::BufferSize,
559 /// };
560 ///
561 /// const BUF_SIZE: BufferSize = BufferSize::S16;
562 ///
563 /// let pool = BufPool::new(BufPoolCfg {
564 /// buffer_size: BUF_SIZE,
565 /// max_memory: BUF_SIZE as usize * 0x14,
566 /// });
567 ///
568 /// let alloc = pool.allocate(0x0A);
569 /// let ptrs: Vec<BufferPointer> = alloc.iter().collect();
570 ///
571 /// assert_eq!(ptrs.len(), 0x0A);
572 /// ```
573 #[inline]
574 pub fn iter(&self) -> BufPoolAllocationIter {
575 let pool = unsafe { self.pool.as_ref() };
576 BufPoolAllocationIter {
577 pointer: self.pointer,
578 buffer_size: pool.cfg.buffer_size.bytes(),
579 remaining: self.buffers,
580 }
581 }
582}
583
584impl Drop for BufPoolAllocation {
585 fn drop(&mut self) {
586 let pool = unsafe { self.pool.as_ref() };
587 deallocate_memory(self.pointer, self.layout);
588
589 pool.allocated_memory.fetch_sub(self.required_bytes, atomic::Ordering::Release);
590 pool.allocation_cv.notify_one();
591
592 if pool.active_allocations.fetch_sub(1, atomic::Ordering::Release) == 1 {
593 pool.shutdown_cv.notify_one();
594 }
595 }
596}
597
/// An [`Iterator`] walking over the buffers of a [`BufPoolAllocation`]
///
/// _NOTE:_ Buffers are yielded in allocation order and are backed by a single contiguous memory
/// region.
///
/// ## Example
///
/// ```
/// use frozen_core::{
///     bufpool::{BufPool, BufPoolCfg, BufferPointer},
///     utils::BufferSize,
/// };
///
/// const BUF_SIZE: BufferSize = BufferSize::S32;
///
/// let pool = BufPool::new(BufPoolCfg {
///     buffer_size: BUF_SIZE,
///     max_memory: BUF_SIZE as usize * 0x14,
/// });
///
/// let alloc = pool.allocate(0x0A);
/// let ptrs: Vec<BufferPointer> = alloc.iter().collect();
///
/// assert_eq!(ptrs.len(), 0x0A);
/// ```
#[derive(Debug)]
pub struct BufPoolAllocationIter {
    /// Address of the buffer that is yielded next
    pointer: ptr::NonNull<u8>,
    /// Distance (in bytes) between two consecutive buffers
    buffer_size: usize,
    /// Number of buffers that are yet to be yielded
    remaining: usize,
}
629
630impl Iterator for BufPoolAllocationIter {
631 type Item = BufferPointer;
632
633 #[inline(always)]
634 fn next(&mut self) -> Option<Self::Item> {
635 if self.remaining == 0 {
636 return None;
637 }
638
639 let curr_ptr = self.pointer;
640
641 self.pointer = unsafe { self.pointer.add(self.buffer_size) };
642 self.remaining -= 1;
643
644 Some(curr_ptr.as_ptr())
645 }
646}
647
/// Builds the byte-array layout describing an allocation of `required_bytes` bytes
///
/// ## Panics
///
/// Panics with an `Invalid Layout` message when `required_bytes` cannot be represented as a
/// valid [`alloc::Layout`].
///
/// _NOTE:_ Such a panic points at an incorrect caller (a size no allocator could ever serve)
/// and not at a recoverable runtime failure.
#[inline]
fn create_layout(required_bytes: usize) -> alloc::Layout {
    alloc::Layout::array::<u8>(required_bytes).unwrap_or_else(|e| panic!("Invalid Layout: {e}"))
}
659
/// Allocates a memory slice described by the given allocation `layout`
///
/// ## Panics
///
/// Panics when `layout` has a size of zero. Passing a zero-sized layout to [`alloc::alloc`] is
/// undefined behaviour, so it is rejected up-front instead of being forwarded.
///
/// ## Allocation Failure
///
/// If the allocator is unable to satisfy the request (typically due to an OOM condition),
/// [`alloc::alloc`] returns a null pointer.
///
/// In such cases we delegate to [`alloc::handle_alloc_error`], matching the behavior of std
/// library types such as [`Vec`], [`Box`] and [`String`].
///
/// This path never returns. Allocation failures are therefore treated as fatal runtime
/// conditions rather than recoverable errors.
///
/// Under normal operation this path should never be reached, as memory usage is expected to be
/// bounded by the buffer pool's memory budget and backpressure mechanisms.
///
/// ## Why not return `FrozenErr`
///
/// A null return from [`alloc::alloc`] indicates that the global allocator itself was unable to
/// satisfy the request.
///
/// Delegating to [`alloc::handle_alloc_error`] matches the behavior of standard library
/// containers and avoids continuing execution after a catastrophic allocator failure, where
/// further allocations required for error handling, logging or recovery may also fail.
#[inline]
fn allocate_layout(layout: alloc::Layout) -> ptr::NonNull<u8> {
    assert!(layout.size() != 0, "Invalid Layout: zero-sized allocations are not supported");

    // SAFETY: `layout` has a non-zero size, which is the only precondition of `alloc::alloc`.
    let pointer = unsafe { alloc::alloc(layout) };
    match ptr::NonNull::new(pointer) {
        Some(p) => p,
        None => alloc::handle_alloc_error(layout),
    }
}
692
/// Releases a manually allocated memory slice back to the global allocator
///
/// `pointer` must have been obtained from the global allocator using exactly the given `layout`,
/// and must not be used again afterwards.
#[inline]
fn deallocate_memory(pointer: ptr::NonNull<u8>, layout: alloc::Layout) {
    let raw = pointer.as_ptr();

    // SAFETY: upheld by the callers, see the contract described above.
    unsafe { alloc::dealloc(raw, layout) }
}
698
#[cfg(test)]
mod tests {
    use super::*;

    const BUF_SIZE: crate::utils::BufferSize = crate::utils::BufferSize::S32;

    /// Builds a pool of `BUF_SIZE` buffers with a memory budget of `max_mem` bytes
    #[inline]
    fn create_bufpool(max_mem: usize) -> BufPool {
        BufPool::new(BufPoolCfg { buffer_size: BUF_SIZE, max_memory: max_mem })
    }

    #[test]
    #[should_panic(expected = "larger than the BUFFER_SIZE")]
    #[cfg(debug_assertions)]
    fn err_new_with_invalid_cfg() {
        create_bufpool(BUF_SIZE.bytes() >> 1);
    }

    // NOTE: The pools of the two tests below use a valid configuration (budget larger than one
    // buffer), so that the panic originates from `allocate` and not from the constructor.

    #[test]
    #[should_panic(expected = "must never be 0")]
    #[cfg(debug_assertions)]
    fn err_alloc_zero() {
        let bpool = create_bufpool(BUF_SIZE.bytes() * 2);
        let _ = bpool.allocate(0);
    }

    #[test]
    #[should_panic(expected = "avoid deadlock")]
    #[cfg(debug_assertions)]
    fn err_alloc_more_then_max_memory() {
        let bpool = create_bufpool(BUF_SIZE.bytes() * 2);
        let _ = bpool.allocate(3);
    }

    #[test]
    fn ok_alloc_single() {
        let bpool = create_bufpool(BUF_SIZE.bytes() * 2);
        let alloc = bpool.allocate(1);

        assert_eq!(alloc.buffers, 1);
        assert_eq!(alloc.required_bytes, BUF_SIZE.bytes());
    }

    #[test]
    fn ok_alloc_multiple() {
        let bpool = create_bufpool(BUF_SIZE.bytes() * 0x14);
        let alloc = bpool.allocate(0x10);

        assert_eq!(alloc.buffers, 0x10);
        assert_eq!(alloc.required_bytes, BUF_SIZE.bytes() * 0x10);
    }

    #[test]
    fn ok_alloc_max_memory() {
        let bpool = create_bufpool(BUF_SIZE.bytes() * 0x0A);
        let alloc = bpool.allocate(0x0A);

        assert_eq!(alloc.buffers, 0x0A);
        assert_eq!(alloc.required_bytes, BUF_SIZE.bytes() * 0x0A);
    }

    #[test]
    fn ok_alloc_updates_memory_accounting() {
        let bpool = create_bufpool(BUF_SIZE.bytes() * 0x14);
        let alloc = bpool.allocate(0x10);

        assert_eq!(bpool.allocated_memory.load(atomic::Ordering::Acquire), BUF_SIZE.bytes() * 0x10);
        drop(alloc);
        assert_eq!(bpool.allocated_memory.load(atomic::Ordering::Acquire), 0);
    }

    #[test]
    fn ok_alloc_updates_active_allocation_tracking() {
        let bpool = create_bufpool(BUF_SIZE.bytes() * 0x2A);

        let alloc1 = bpool.allocate(0x10);
        let alloc2 = bpool.allocate(0x10);

        assert_eq!(bpool.active_allocations.load(atomic::Ordering::Acquire), 2);
        drop(alloc1);
        drop(alloc2);
        assert_eq!(bpool.active_allocations.load(atomic::Ordering::Acquire), 0);
    }

    #[test]
    fn ok_alloc_decrments_allocated_memory_after_deallocations() {
        let bpool = create_bufpool(BUF_SIZE.bytes() * 0x80);
        let allocations: Vec<_> = (0..0x20).map(|_| bpool.allocate(2)).collect();

        // 0x20 allocations of 2 buffers each
        assert_eq!(
            bpool.allocated_memory.load(atomic::Ordering::Acquire),
            BUF_SIZE.bytes() * 2 * 0x20
        );
        drop(allocations);
        assert_eq!(bpool.allocated_memory.load(atomic::Ordering::Acquire), 0);
    }

    #[test]
    fn ok_backpressure_blocks_till_memory_is_deallocated() {
        let bpool = sync::Arc::new(create_bufpool(BUF_SIZE.bytes() * 2));
        let alloc = bpool.allocate(1);

        let pool2 = bpool.clone();
        let barrier = sync::Arc::new(sync::Barrier::new(2));
        let barrier2 = barrier.clone();

        let handle = std::thread::spawn(move || {
            // The clock is started before the barrier, i.e. never later than the moment the main
            // thread begins to sleep. Starting it after the barrier would let a late-scheduled
            // thread measure less than the sleep duration and fail spuriously.
            let start = std::time::Instant::now();
            barrier2.wait();

            let _alloc = pool2.allocate(2);

            start.elapsed()
        });

        barrier.wait();

        std::thread::sleep(std::time::Duration::from_millis(100));
        drop(alloc);

        let elapsed = handle.join().expect("allocation thread should not panic");
        assert!(elapsed >= std::time::Duration::from_millis(100));
    }

    #[test]
    fn ok_concurrent_allocations() {
        let pool = sync::Arc::new(create_bufpool(BUF_SIZE.bytes() * 0x1000));

        let mut handles = Vec::new();
        for _ in 0..0x0A {
            let pool = pool.clone();

            handles.push(std::thread::spawn(move || {
                for _ in 0..0x64 {
                    drop(pool.allocate(1));
                }
            }));
        }

        for h in handles {
            h.join().unwrap();
        }

        assert_eq!(pool.allocated_memory.load(atomic::Ordering::Acquire), 0);
        assert_eq!(pool.active_allocations.load(atomic::Ordering::Acquire), 0);
    }

    mod drop {
        use super::*;

        #[test]
        fn ok_partial_drop_updates_accounting() {
            let bpool = create_bufpool(BUF_SIZE.bytes() * 0x0A);

            let alloc1 = bpool.allocate(2);
            let alloc2 = bpool.allocate(2);

            assert_eq!(
                bpool.allocated_memory.load(atomic::Ordering::Acquire),
                BUF_SIZE.bytes() * 4
            );
            drop(alloc1);

            assert_eq!(
                bpool.allocated_memory.load(atomic::Ordering::Acquire),
                BUF_SIZE.bytes() * 2
            );
            drop(alloc2);

            assert_eq!(bpool.allocated_memory.load(atomic::Ordering::Acquire), 0);
        }

        #[test]
        fn ok_drop_waits_for_active_allocations() {
            let bpool = sync::Arc::new(create_bufpool(BUF_SIZE.bytes() * 0x1A));
            let alloc = bpool.allocate(0x10);

            let handle = std::thread::spawn(move || {
                drop(bpool);
            });

            std::thread::sleep(std::time::Duration::from_millis(0x64));
            assert!(!handle.is_finished());
            drop(alloc);

            handle.join().unwrap();
        }
    }

    mod memory_tests {
        use super::*;

        #[test]
        fn ok_create_layout() {
            let layout = create_layout(0x1000);

            assert_eq!(layout.align(), 1);
            assert_eq!(layout.size(), 0x1000);
        }

        #[test]
        #[should_panic(expected = "Invalid Layout")]
        fn err_create_layout() {
            create_layout(usize::MAX);
        }

        #[test]
        fn ok_allocate_layout() {
            let layout = create_layout(0x10);
            let pointer = allocate_layout(layout);
            let raw_ptr = pointer.as_ptr();

            assert!(!raw_ptr.is_null());
            deallocate_memory(pointer, layout);
        }

        #[test]
        fn ok_allocate_layout_allows_write() {
            let layout = create_layout(0x80);
            let pointer = allocate_layout(layout);

            unsafe {
                pointer.as_ptr().write(0x40);
                assert_eq!(pointer.as_ptr().read(), 0x40);
            }

            deallocate_memory(pointer, layout);
        }

        #[test]
        fn ok_allocate_layout_allows_write_to_entire_slice() {
            let layout = create_layout(0x200);
            let pointer = allocate_layout(layout);

            unsafe {
                for i in 0..0x200 {
                    pointer.as_ptr().add(i).write((i % 0xFF) as u8);
                }

                for i in 0..0x200 {
                    assert_eq!(pointer.as_ptr().add(i).read(), (i % 0xFF) as u8);
                }
            }

            deallocate_memory(pointer, layout);
        }
    }

    mod alloc_struct {
        use super::*;

        #[test]
        fn ok_first_returns_ptr_to_first_buf_from_alloc() {
            let bpool = create_bufpool(BUF_SIZE.bytes() * 0x20);
            let alloc = bpool.allocate(0x10);

            assert_eq!(alloc.first(), alloc.pointer.as_ptr());
        }

        #[test]
        fn ok_length_returns_length_of_alloc() {
            let bpool = create_bufpool(BUF_SIZE.bytes() * 0x20);
            let alloc = bpool.allocate(0x10);

            assert_eq!(alloc.length(), alloc.buffers);
        }

        #[test]
        fn ok_allocated_bytes_return_total_allocated_bytes() {
            let bpool = create_bufpool(BUF_SIZE.bytes() * 0x20);
            let alloc = bpool.allocate(0x10);

            assert_eq!(alloc.allocated_bytes(), alloc.buffers * BUF_SIZE.bytes());
        }

        #[test]
        fn ok_alloc_can_be_shared_across_threads() {
            let pool = sync::Arc::new(create_bufpool(BUF_SIZE.bytes() * 2));
            let alloc = pool.allocate(1);

            std::thread::spawn(move || {
                drop(alloc);
            })
            .join()
            .unwrap();
        }
    }

    mod iterator {
        use super::*;

        #[test]
        fn ok_iter_yeilds_all_buffers() {
            let bpool = create_bufpool(BUF_SIZE.bytes() * 0x0A);
            let alloc = bpool.allocate(4);

            let ptrs: Vec<_> = alloc.iter().collect();
            assert_eq!(ptrs.len(), 4);

            // consecutive buffers are exactly one buffer size apart
            for pair in ptrs.windows(2) {
                assert_eq!(pair[1] as usize - pair[0] as usize, BUF_SIZE.bytes());
            }
        }

        #[test]
        fn ok_iter_yeilds_none_when_exhausted() {
            let bpool = create_bufpool(BUF_SIZE.bytes() * 0x0A);
            let alloc = bpool.allocate(2);
            let mut iter = alloc.iter();

            assert!(iter.next().is_some());
            assert!(iter.next().is_some());
            assert!(iter.next().is_none());
            assert!(iter.next().is_none());
        }
    }
}