frozen_core/bufpool.rs
1//! A low-latency memory-budgeted buffer pool to manage fixed-sized buffer allocations
2//!
3//! ## Memory Allocations
4//!
5//! All the allocations are allocated using the global memory allocator as requested (on-the-fly).
6//!
//! A single allocation, returned as [`BufPoolAllocation`], is one large contiguous slice of
//! memory of size `BufPoolCfg::buffer_size.bytes() * n_buffers`.
9//!
10//! Memory layout structure,
11//!
12//! ```text
13//! allocation = [[buf0][buf1][buf2]]
14//! where,
15//! - every buffer is of size `buffer_size`
16//! - each buffer is pointed using `*mut u8`
17//! ```
18//!
19//! ## Backpressure
20//!
21//! Every allocation reserves a memory budget and is only allowed to allocate memory if enough
22//! budget (i.e. memory space) is available. Otherwise, the caller is blocked/polled till enough
23//! space is available.
24//!
25//! When the [`BufPoolAllocation`] and all its references are dropped, the underlying memory is
26//! deallocated while relaxing the budget and dropping the backpressure (if any).
27//!
//! _NOTE:_ There is no fairness guarantee for callers that are blocked by backpressure, as the
//! waiting callers are woken opportunistically.
30//!
31//! ## Benchmarks
32//!
33//! Observed measurements of latency and throughput,
34//!
35//! ```md
36//! | Metric | Value |
37//! |:-----------------------------|:-------------------|
38//! | Allocation Latency (avg) | ~254 nanosecond |
39//! | Allocation Throughput (avg) | ~3.94 million/sec |
40//! ```
41//!
//! _NOTE:_ All measurements include the complete RAII lifecycle (i.e. allocation + deallocation).
43//!
44//! Observed allocation latency for `N` buffers,
45//!
46//! ```md
47//! | Buffers | Latency |
48//! |:---------|:---------|
49//! | 0x01 | 246 ns |
50//! | 0x10 | 251 ns |
51//! | 0x400 | 300 ns |
52//! ```
53//!
//! _INFO:_ As seen, the allocation latency stays nearly constant irrespective of the number of
//! buffers and the allocated bytes.
56//!
57//! Environment used for benching,
58//!
59//! * OS: NixOS (WSL2)
60//! * Architecture: x86_64
61//! * Memory: 8 GiB RAM (DDR4)
62//! * Rust: rustc 1.86.0 w/ cargo 1.86.0
63//! * Kernel: Linux 6.6.87.2-microsoft-standard-WSL2
64//! * CPU: Intel® Core™ i5-10300H @ 2.50GHz (4C / 8T)
65//!
66//! ## Example
67//!
68//! ```
69//! use frozen_core::{
70//! bufpool::{BufPool, BufPoolCfg, BufferPointer},
71//! utils::BufferSize,
72//! };
73//!
74//! const BUF_SIZE: BufferSize = BufferSize::S16;
75//!
76//! let pool = BufPool::new(BufPoolCfg {
77//! buffer_size: BUF_SIZE,
78//! max_memory: BUF_SIZE as usize * 0x40,
79//! });
80//!
81//! let alloc = pool.allocate(0x2A);
82//!
83//! assert_eq!(alloc.length(), 0x2A);
84//! assert!(!alloc.first().is_null());
85//! assert_eq!(alloc.allocated_bytes(), BUF_SIZE as usize * 0x2A);
86//!
87//! let ptrs: Vec<BufferPointer> = alloc.iter().collect();
88//! assert_eq!(ptrs.len(), 0x2A);
89//! ```
90
91use std::{alloc, ptr, sync, sync::atomic};
92
/// An unsafe pointer to an individual in-memory buffer
///
/// ## Safety
///
/// The pointed-to memory is untyped and uninitialized. The caller is responsible for ensuring:
///
/// * Writes stay within the bounds, while the size of each buffer is [`BufPoolCfg::buffer_size`]
/// * Reads only occur after initialization/write is completed on the buffer
/// * The pointer does not outlive the [`BufPoolAllocation`] object it was obtained from
///
/// ## Example
///
/// ```
/// use frozen_core::{
///     bufpool::{BufPool, BufPoolCfg},
///     utils::BufferSize,
/// };
///
/// const BUF_SIZE: BufferSize = BufferSize::S16;
/// const BUFFER: [u8; BUF_SIZE as usize] = [1u8; BUF_SIZE as usize];
///
/// let pool = BufPool::new(BufPoolCfg {
///     buffer_size: BUF_SIZE,
///     max_memory: BUF_SIZE as usize * 0x0A,
/// });
///
/// let alloc = pool.allocate(1);
/// unsafe {
///     std::ptr::copy_nonoverlapping(BUFFER.as_ptr(), alloc.first(), BUF_SIZE as usize);
/// }
/// ```
pub type BufferPointer = *mut u8;
125
/// All the available configurations for [`BufPool`]
///
/// ## Example
///
/// ```
/// use frozen_core::{bufpool::BufPoolCfg, utils::BufferSize};
///
/// const BUF_SIZE: BufferSize = BufferSize::S64;
///
/// let cfg = BufPoolCfg {
///     buffer_size: BUF_SIZE,
///     max_memory: BUF_SIZE as usize * 0x1000,
/// };
///
/// assert_ne!(cfg.max_memory, 0);
/// assert!(cfg.max_memory > cfg.buffer_size.bytes());
/// ```
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct BufPoolCfg {
    /// Size (in bytes) of an individual buffer unit allocated
    pub buffer_size: crate::utils::BufferSize,

    /// Maximum allowed memory (in bytes) to be simultaneously allocated by [`BufPool`]
    ///
    /// In debug builds, [`BufPool::new`] asserts that this value is strictly larger than the size
    /// of a single buffer.
    ///
    /// _IMPORTANT:_ When trying to allocate more memory than [`BufPoolCfg::max_memory`] via
    /// [`BufPool::allocate`], a deadlock will happen due to memory budgeting in place. The caller
    /// must make sure the `max_memory` is high enough to avoid this scenario.
    ///
    /// _NOTE:_ To avoid backpressure, set the `max_memory` to an arbitrarily large value. This
    /// would not have any direct impact on performance or resource usage, and will avoid
    /// backpressure under heavy workload.
    pub max_memory: usize,
}
159
/// An implementation of a low-latency memory-budgeted buffer pool managing fixed-sized buffer
/// allocations
///
/// ## Blocking `drop`
///
/// Dropping the [`BufPool`] instance blocks until all the allocated instances of
/// [`BufPoolAllocation`] and all their references are dropped from memory.
///
/// This is in place to avoid memory leaks, as well as to enable sending [`BufPoolAllocation`]
/// objects across threads while being tied to the lifecycle of [`BufPool`].
///
/// ## Memory Allocations
///
/// All the allocations are allocated using the global memory allocator as requested (on-the-fly).
///
/// A single allocation, returned as [`BufPoolAllocation`], is one large contiguous slice of
/// memory of size `BufPoolCfg::buffer_size.bytes() * n_buffers`.
///
/// Memory layout structure,
///
/// ```text
/// allocation = [[buf0][buf1][buf2]]
/// where,
/// - every buffer is of size `buffer_size`
/// - each buffer is pointed using `*mut u8`
/// ```
///
/// ## Backpressure
///
/// Every allocation reserves a memory budget and is only allowed to allocate memory if enough
/// budget (i.e. memory space) is available. Otherwise, the caller is blocked/polled till enough
/// space is available.
///
/// When the [`BufPoolAllocation`] and all its references are dropped, the underlying memory is
/// deallocated while relaxing the budget and dropping the backpressure (if any).
///
/// _NOTE:_ There is no fairness guarantee for callers that are blocked by backpressure, as the
/// waiting callers are woken opportunistically.
///
/// ## Example
///
/// ```
/// use frozen_core::{
///     bufpool::{BufPool, BufPoolCfg, BufferPointer},
///     utils::BufferSize,
/// };
///
/// const BUF_SIZE: BufferSize = BufferSize::S16;
///
/// let pool = BufPool::new(BufPoolCfg {
///     buffer_size: BUF_SIZE,
///     max_memory: BUF_SIZE as usize * 0x40,
/// });
///
/// let alloc = pool.allocate(0x30);
///
/// assert_eq!(alloc.length(), 0x30);
/// assert!(!alloc.first().is_null());
/// assert_eq!(alloc.allocated_bytes(), BUF_SIZE as usize * 0x30);
///
/// let ptrs: Vec<BufferPointer> = alloc.iter().collect();
/// assert_eq!(ptrs.len(), 0x30);
/// ```
#[derive(Debug)]
pub struct BufPool {
    // Number of live `BufPoolAllocation` objects; the pool's `Drop` waits for this to reach zero
    active_allocations: atomic::AtomicUsize,
    // Waited on by callers parked in `backpressure`, signalled when an allocation is dropped
    allocation_cv: sync::Condvar,
    // Parking mutex paired with `allocation_cv`; it guards no data
    allocation_lock: sync::Mutex<()>,
    // Bytes currently reserved against `cfg.max_memory`
    allocated_memory: atomic::AtomicUsize,
    // Configuration the pool was created with
    cfg: BufPoolCfg,
    // Waited on by the pool's `Drop`, signalled when the last allocation is dropped
    shutdown_cv: sync::Condvar,
    // Parking mutex paired with `shutdown_cv`; it guards no data
    shutdown_lock: sync::Mutex<()>,
}
233
// SAFETY: all mutable pool state lives in atomics, and the remaining fields are std's `Mutex<()>`
// and `Condvar` plus the `Copy` configuration, which is only ever read in this file.
// NOTE(review): these impls look redundant with the auto traits, provided
// `crate::utils::BufferSize` is itself `Send + Sync` — confirm before removing them.
unsafe impl Send for BufPool {}
unsafe impl Sync for BufPool {}
236
237impl BufPool {
238 /// Create a new instance of [`BufPool`]
239 ///
240 /// ## Debug Assertions
241 ///
242 /// In debug builds, this function uses `debug_assertion` to prevent invalid configurations.
243 /// Caller must refer to [`BufPoolCfg`] for details about the config params.
244 ///
245 /// ## Example
246 ///
247 /// ```
248 /// use frozen_core::{
249 /// bufpool::{BufPool, BufPoolCfg},
250 /// utils::BufferSize,
251 /// };
252 ///
253 /// const BUF_SIZE: BufferSize = BufferSize::S16;
254 ///
255 /// let pool = BufPool::new(BufPoolCfg {
256 /// buffer_size: BUF_SIZE,
257 /// max_memory: BUF_SIZE as usize * 0x0A,
258 /// });
259 ///
260 /// let alloc = pool.allocate(1);
261 ///
262 /// assert_eq!(alloc.length(), 1);
263 /// assert_eq!(alloc.allocated_bytes(), BUF_SIZE as usize);
264 /// ```
265 #[inline]
266 pub fn new(cfg: BufPoolCfg) -> Self {
267 // sanity check
268 debug_assert!(
269 cfg.buffer_size.bytes() < cfg.max_memory,
270 "MAX_MEMORY must always be larger than the BUFFER_SIZE"
271 );
272
273 Self {
274 cfg,
275 active_allocations: atomic::AtomicUsize::new(0),
276 allocated_memory: atomic::AtomicUsize::new(0),
277 allocation_cv: sync::Condvar::new(),
278 allocation_lock: sync::Mutex::new(()),
279 shutdown_cv: sync::Condvar::new(),
280 shutdown_lock: sync::Mutex::new(()),
281 }
282 }
283
284 /// Allocate `required` number of buffers each of [`BufPoolCfg::buffer_size`] size
285 ///
286 /// ## Allocation Layout
287 ///
288 /// The allocation is a large contineous slice of memory w/ size as
289 ///
290 /// ```text
291 /// BufPoolCfg::buffer_size.bytes() * n_buffers
292 /// ```
293 ///
294 /// Memory layout structure,
295 ///
296 /// ```text
297 /// allocation = [[buf0][buf1][buf2]]
298 /// where,
299 /// - every buffer is of size `buffer_size`
300 /// - each buffer is pointed using `*mut u8`
301 /// ```
302 ///
303 /// ## RAII Safety
304 ///
305 /// The allocation object, i.e. [`BufPoolAllocation`], is RAII safe. The allocated memory is
306 /// automatically deallocated as soon as the last reference to the object is dropped, while also
307 /// relaxing the memory budget to eliminate backpressure (if any).
308 ///
309 /// ## Important Considerations
310 ///
311 /// The number of buffers required should never exceed `u16::MAX`. This is an abstract soft
312 /// limit and should be enforced by the public interface to avoid any weird exhaustion issues
313 /// or unknown bugs across the storage system.
314 ///
315 /// As `u16::MAX` is large enough value to almost never cause any problems for a single write
316 /// operation, this soft limit acts as a guidline to safely operate with arithmatic operations
317 /// across storage engine(s), including but not limited to [`frozen_core`].
318 ///
319 /// ## Example
320 ///
321 /// ```
322 /// use frozen_core::{
323 /// bufpool::{BufPool, BufPoolCfg, BufferPointer},
324 /// utils::BufferSize,
325 /// };
326 ///
327 /// const BUF_SIZE: BufferSize = BufferSize::S16;
328 ///
329 /// let pool = BufPool::new(BufPoolCfg {
330 /// buffer_size: BUF_SIZE,
331 /// max_memory: BUF_SIZE as usize * 0x14,
332 /// });
333 ///
334 /// let alloc = pool.allocate(0x0A);
335 ///
336 /// assert_eq!(alloc.length(), 0x0A);
337 /// assert!(!alloc.first().is_null());
338 /// assert_eq!(alloc.allocated_bytes(), BUF_SIZE as usize * 0x0A);
339 ///
340 /// let ptrs: Vec<BufferPointer> = alloc.iter().collect();
341 /// assert_eq!(ptrs.len(), 0x0A);
342 /// ```
343 #[inline(always)]
344 pub fn allocate(&self, required: usize) -> BufPoolAllocation {
345 // sanity checks
346 debug_assert!(required > 0, "required buffers must never be 0");
347 debug_assert!(
348 required * self.cfg.buffer_size.bytes() <= self.cfg.max_memory,
349 "Total required bytes must be smaller then the MAX_MEMORY allowed to avoid deadlock"
350 );
351 debug_assert!(
352 required * self.cfg.buffer_size.bytes() <= self.cfg.max_memory,
353 "Total required bytes must never exceed `u16::MAX` to avoid arithmatic overflows"
354 );
355
356 let required_bytes = self.cfg.buffer_size.bytes() * required;
357 loop {
358 let current_bytes = self.allocated_memory.load(atomic::Ordering::Acquire);
359 if current_bytes + required_bytes > self.cfg.max_memory {
360 self.backpressure(required_bytes);
361 continue;
362 }
363
364 match self.allocated_memory.compare_exchange(
365 current_bytes,
366 current_bytes + required_bytes,
367 atomic::Ordering::AcqRel,
368 atomic::Ordering::Acquire,
369 ) {
370 Ok(_) => break,
371 Err(_) => continue,
372 }
373 }
374
375 let layout = create_layout(required_bytes);
376 let pointer = allocate_layout(layout);
377 self.active_allocations.fetch_add(1, atomic::Ordering::Relaxed);
378
379 BufPoolAllocation { layout, pointer, required_bytes, buffers: required, pool: ptr::NonNull::from(self) }
380 }
381
382 /// Applies backpressure until enough memory budget is available for the allocation
383 ///
384 /// ## Why we ignore [`std::sync::PoisonError`]
385 ///
386 /// The mutex used for lock, is solely used as a parking primitive for [`Condvar`] and does not
387 /// protect any mutable state. All the pool invariants and accounting are maintained via
388 /// atomics and are completely seperated from the mutex.
389 ///
390 /// A poisoned mutex only indicates that another tx panicked while holding the lock, and
391 /// indicates an inconsistent state of the protected value. Since no state can be left
392 /// partially modified under this lock, there is no possible consistency risk to recover from
393 /// and propagating the poison error would only introduce unnecessary failures into the
394 /// allocation path.
395 ///
396 /// Therefore, as best effort, we consume the [`std::sync::PoisonError`] and continue operating
397 /// with the recovered guard.
398 #[inline]
399 fn backpressure(&self, required_bytes: usize) {
400 let mut guard = self.allocation_lock.lock().unwrap_or_else(|e| e.into_inner());
401 while self.allocated_memory.load(atomic::Ordering::Acquire) + required_bytes > self.cfg.max_memory {
402 guard = self.allocation_cv.wait(guard).unwrap_or_else(|e| e.into_inner());
403 }
404 }
405}
406
407impl Drop for BufPool {
408 fn drop(&mut self) {
409 // NOTE: See [`BufPool::backpressure`] implementation for rationale behind poison recovery
410
411 let mut guard = self.shutdown_lock.lock().unwrap_or_else(|e| e.into_inner());
412 while self.active_allocations.load(atomic::Ordering::Acquire) != 0 {
413 guard = self.shutdown_cv.wait(guard).unwrap_or_else(|e| e.into_inner());
414 }
415 }
416}
417
/// A RAII safe allocation object containing allocated buffers
///
/// ## Lifetime
///
/// The object can/may outlive the scope that created it, while also being able to transfer across
/// threads, as internally the [`BufPool`] tracks all the active allocations and delays its own
/// drop until every allocation and all their references are dropped from memory.
///
/// ## Example
///
/// ```
/// use frozen_core::{
///     bufpool::{BufPool, BufPoolCfg, BufferPointer},
///     utils::BufferSize,
/// };
///
/// const BUF_SIZE: BufferSize = BufferSize::S16;
///
/// let pool = BufPool::new(BufPoolCfg {
///     buffer_size: BUF_SIZE,
///     max_memory: BUF_SIZE as usize * 0x10,
/// });
///
/// let alloc = pool.allocate(0x0A);
///
/// assert_eq!(alloc.length(), 0x0A);
/// assert!(!alloc.first().is_null());
/// assert_eq!(alloc.allocated_bytes(), BUF_SIZE as usize * 0x0A);
///
/// let ptrs: Vec<BufferPointer> = alloc.iter().collect();
/// assert_eq!(ptrs.len(), 0x0A);
/// ```
#[derive(Debug)]
pub struct BufPoolAllocation {
    // Number of buffers requested from `BufPool::allocate`
    buffers: usize,
    // Layout the memory was allocated with; required again for deallocation
    layout: alloc::Layout,
    // Base address of the allocated memory, i.e. the address of the first buffer
    pointer: ptr::NonNull<u8>,
    // Address of the pool that created this allocation, used for accounting on drop
    pool: ptr::NonNull<BufPool>,
    // Total bytes reserved from the pool's memory budget for this allocation
    required_bytes: usize,
}
458
// SAFETY: the allocation is the only owner of the memory behind `pointer`, and `pool` is only
// used to reach the pool's atomics and condvars, while the pool's `Drop` blocks until every
// allocation is released.
// NOTE(review): this relies on the pool not being moved while allocations are alive — confirm
// that callers uphold it.
unsafe impl Send for BufPoolAllocation {}
460
461impl BufPoolAllocation {
462 /// Returns a [`BufferPointer`] to the first buffer from the allocated list of buffers
463 ///
464 /// _NOTE:_ The returned [`BufferPointer`] can also be used as a _base_pointer_ to operate on
465 /// the entire allocated memory slice.
466 ///
467 /// ## Example
468 ///
469 /// ```
470 /// use frozen_core::{
471 /// bufpool::{BufPool, BufPoolCfg},
472 /// utils::BufferSize,
473 /// };
474 ///
475 /// const BUF_SIZE: BufferSize = BufferSize::S32;
476 ///
477 /// let pool = BufPool::new(BufPoolCfg {
478 /// buffer_size: BUF_SIZE,
479 /// max_memory: BUF_SIZE as usize * 0x0A,
480 /// });
481 ///
482 /// let alloc = pool.allocate(0x0A);
483 /// assert!(!alloc.first().is_null());
484 /// ```
485 #[inline]
486 pub const fn first(&self) -> BufferPointer {
487 self.pointer.as_ptr()
488 }
489
490 /// Returns the total number of allocated buffers
491 ///
492 /// _IMPORTANT:_ The returned value is always equal to the `required` value using while
493 /// calling [`BufPool::allocate`].
494 ///
495 /// ## Example
496 ///
497 /// ```
498 /// use frozen_core::{
499 /// bufpool::{BufPool, BufPoolCfg},
500 /// utils::BufferSize,
501 /// };
502 ///
503 /// const BUF_SIZE: BufferSize = BufferSize::S64;
504 ///
505 /// let pool = BufPool::new(BufPoolCfg {
506 /// buffer_size: BUF_SIZE,
507 /// max_memory: BUF_SIZE as usize * 0x0A,
508 /// });
509 ///
510 /// let alloc = pool.allocate(0x0A);
511 /// assert_eq!(alloc.length(), 0x0A);
512 /// ```
513 #[inline]
514 pub const fn length(&self) -> usize {
515 self.buffers
516 }
517
518 /// Returns the total number of bytes of memory allocated
519 ///
520 /// ## Example
521 ///
522 /// ```
523 /// use frozen_core::{
524 /// bufpool::{BufPool, BufPoolCfg},
525 /// utils::BufferSize,
526 /// };
527 ///
528 /// const BUF_SIZE: BufferSize = BufferSize::S16;
529 ///
530 /// let pool = BufPool::new(BufPoolCfg {
531 /// buffer_size: BUF_SIZE,
532 /// max_memory: BUF_SIZE as usize * 0x0A,
533 /// });
534 ///
535 /// let alloc = pool.allocate(0x0A);
536 /// assert_eq!(alloc.allocated_bytes(), BUF_SIZE as usize * 0x0A);
537 /// ```
538 #[inline]
539 pub const fn allocated_bytes(&self) -> usize {
540 self.required_bytes
541 }
542
543 /// A custom [`Iterator`] implementation to enable iteration over the list of allocated buffers
544 /// from [`BufPoolAllocation`]
545 ///
546 /// _NOTE:_ Each yielded pointer refers to a unique individual buffer each of size
547 /// [`BufPoolCfg::buffer_size`].
548 ///
549 /// ## Example
550 ///
551 /// ```
552 /// use frozen_core::{
553 /// bufpool::{BufPool, BufPoolCfg, BufferPointer},
554 /// utils::BufferSize,
555 /// };
556 ///
557 /// const BUF_SIZE: BufferSize = BufferSize::S16;
558 ///
559 /// let pool = BufPool::new(BufPoolCfg {
560 /// buffer_size: BUF_SIZE,
561 /// max_memory: BUF_SIZE as usize * 0x14,
562 /// });
563 ///
564 /// let alloc = pool.allocate(0x0A);
565 /// let ptrs: Vec<BufferPointer> = alloc.iter().collect();
566 ///
567 /// assert_eq!(ptrs.len(), 0x0A);
568 /// ```
569 #[inline]
570 pub fn iter(&self) -> BufPoolAllocationIter {
571 let pool = unsafe { self.pool.as_ref() };
572 BufPoolAllocationIter {
573 pointer: self.pointer,
574 buffer_size: pool.cfg.buffer_size.bytes(),
575 remaining: self.buffers,
576 }
577 }
578}
579
580impl Drop for BufPoolAllocation {
581 fn drop(&mut self) {
582 let pool = unsafe { self.pool.as_ref() };
583 deallocate_memory(self.pointer, self.layout);
584
585 pool.allocated_memory.fetch_sub(self.required_bytes, atomic::Ordering::Release);
586 pool.allocation_cv.notify_one();
587
588 if pool.active_allocations.fetch_sub(1, atomic::Ordering::Release) == 1 {
589 pool.shutdown_cv.notify_one();
590 }
591 }
592}
593
594/// A custom [`Iterator`] object to iterate over the list of allocated buffers
595///
596/// _NOTE:_ Buffers are yielded in allocation order and are backed by a single contiguous memory
597/// region.
598///
599/// ## Example
600///
601/// ```rs
602/// use frozen_core::{
603/// bufpool::{BufPool, BufPoolCfg, BufferPointer},
604/// utils::BufferSize,
605/// };
606///
607/// const BUF_SIZE: BufferSize = BufferSize::S32;
608///
609/// let pool = BufPool::new(BufPoolCfg {
610/// buffer_size: BUF_SIZE,
611/// max_memory: BUF_SIZE as usize * 0x14,
612/// });
613///
614/// let alloc = pool.allocate(0x0A);
615/// let ptrs: Vec<BufferPointer> = alloc.iter().collect();
616///
617/// assert_eq!(ptrs.len(), 0x0A);
618/// ```
619#[derive(Debug)]
620pub struct BufPoolAllocationIter {
621 pointer: ptr::NonNull<u8>,
622 buffer_size: usize,
623 remaining: usize,
624}
625
626impl Iterator for BufPoolAllocationIter {
627 type Item = BufferPointer;
628
629 #[inline(always)]
630 fn next(&mut self) -> Option<Self::Item> {
631 if self.remaining == 0 {
632 return None;
633 }
634
635 let curr_ptr = self.pointer;
636
637 self.pointer = unsafe { self.pointer.add(self.buffer_size) };
638 self.remaining -= 1;
639
640 Some(curr_ptr.as_ptr())
641 }
642}
643
/// Builds the byte-array [`alloc::Layout`] describing a memory slice of `required_bytes` bytes
///
/// ## Panics
///
/// Panics w/ an `Invalid Layout` message when no valid layout exists for `required_bytes`.
///
/// _NOTE:_ Panicking here is acceptable, as such a panic, if any, would be caught by unit tests
/// and would be the indication of an incorrect impl and not of any runtime failure.
#[inline]
fn create_layout(required_bytes: usize) -> alloc::Layout {
    alloc::Layout::array::<u8>(required_bytes).unwrap_or_else(|e| panic!("Invalid Layout: {e}"))
}
655
/// Allocate a memory slice with given allocation `layout`
///
/// ## Panics
///
/// Panics if `layout` is zero-sized, as passing a zero-sized layout to [`alloc::alloc`] is
/// undefined behaviour.
///
/// ## Allocation Failure
///
/// If the allocator is unable to satisfy the request (typically due to an OOM condition),
/// [`alloc::alloc`] returns a null pointer.
///
/// In such cases we delegate to [`alloc::handle_alloc_error`], matching the behavior of std library
/// types such as [`Vec`], [`Box`] and [`String`].
///
/// This path aborts the process and never returns. Allocation failures are therefore treated as
/// fatal runtime conditions rather than recoverable errors.
///
/// Under normal operation this path should never be reached, as memory usage is expected to be
/// bounded by the buffer pool's memory budget and backpressure mechanisms.
///
/// ## Why not return `FrozenErr`
///
/// A null return from [`alloc::alloc`] indicates that the global allocator itself was unable to
/// satisfy the request.
///
/// Delegating to [`alloc::handle_alloc_error`] matches the behavior of standard library containers
/// and avoids continuing execution after a catastrophic allocator failure, where further
/// allocations required for error handling, logging or recovery may also fail.
#[inline]
fn allocate_layout(layout: alloc::Layout) -> ptr::NonNull<u8> {
    assert!(layout.size() != 0, "Invalid Layout: zero-sized allocations are not supported");

    // SAFETY: `layout` was just checked to have a non-zero size, which is the only requirement
    // of `alloc::alloc`
    let pointer = unsafe { alloc::alloc(layout) };
    match ptr::NonNull::new(pointer) {
        Some(p) => p,
        None => alloc::handle_alloc_error(layout),
    }
}
688
/// Returns a manually allocated memory slice back to the global allocator
///
/// The `pointer` and the `layout` are handed over to [`alloc::dealloc`] as they are.
#[inline]
fn deallocate_memory(pointer: ptr::NonNull<u8>, layout: alloc::Layout) {
    let raw = pointer.as_ptr();

    // SAFETY: assumes `pointer` was obtained from the global allocator using this very `layout`
    // and was not deallocated before, which must be upheld by the callers
    unsafe {
        alloc::dealloc(raw, layout);
    }
}
694
#[cfg(test)]
mod tests {
    use super::*;

    const BUF_SIZE: crate::utils::BufferSize = crate::utils::BufferSize::S32;

    /// Creates a pool of `BUF_SIZE` buffers with a memory budget of `max_mem` bytes
    #[inline]
    fn create_bufpool(max_mem: usize) -> BufPool {
        BufPool::new(BufPoolCfg { buffer_size: BUF_SIZE, max_memory: max_mem })
    }

    #[test]
    #[should_panic(expected = "MAX_MEMORY must always be larger")]
    #[cfg(debug_assertions)]
    fn err_new_with_invalid_cfg() {
        create_bufpool(BUF_SIZE.bytes() >> 1);
    }

    // NOTE: The pool must be created w/ a valid config (budget strictly larger than one buffer),
    // otherwise the panic originates from `BufPool::new` and `allocate` is never exercised

    #[test]
    #[should_panic(expected = "required buffers must never be 0")]
    #[cfg(debug_assertions)]
    fn err_alloc_zero() {
        let bpool = create_bufpool(BUF_SIZE.bytes() * 2);
        let _ = bpool.allocate(0);
    }

    #[test]
    #[should_panic(expected = "MAX_MEMORY allowed to avoid deadlock")]
    #[cfg(debug_assertions)]
    fn err_alloc_more_then_max_memory() {
        let bpool = create_bufpool(BUF_SIZE.bytes() * 2);
        let _ = bpool.allocate(3);
    }

    #[test]
    fn ok_alloc_single() {
        let bpool = create_bufpool(BUF_SIZE.bytes() * 2);
        let alloc = bpool.allocate(1);

        assert_eq!(alloc.buffers, 1);
        assert_eq!(alloc.required_bytes, BUF_SIZE.bytes());
    }

    #[test]
    fn ok_alloc_multiple() {
        let bpool = create_bufpool(BUF_SIZE.bytes() * 0x14);
        let alloc = bpool.allocate(0x10);

        assert_eq!(alloc.buffers, 0x10);
        assert_eq!(alloc.required_bytes, BUF_SIZE.bytes() * 0x10);
    }

    #[test]
    fn ok_alloc_max_memory() {
        let bpool = create_bufpool(BUF_SIZE.bytes() * 0x0A);
        let alloc = bpool.allocate(0x0A);

        assert_eq!(alloc.buffers, 0x0A);
        assert_eq!(alloc.required_bytes, BUF_SIZE.bytes() * 0x0A);
    }

    #[test]
    fn ok_alloc_updates_memory_accounting() {
        let bpool = create_bufpool(BUF_SIZE.bytes() * 0x14);
        let alloc = bpool.allocate(0x10);

        assert_eq!(bpool.allocated_memory.load(atomic::Ordering::Acquire), BUF_SIZE.bytes() * 0x10);
        drop(alloc);
        assert_eq!(bpool.allocated_memory.load(atomic::Ordering::Acquire), 0);
    }

    #[test]
    fn ok_alloc_updates_active_allocation_tracking() {
        let bpool = create_bufpool(BUF_SIZE.bytes() * 0x2A);

        let alloc1 = bpool.allocate(0x10);
        let alloc2 = bpool.allocate(0x10);

        assert_eq!(bpool.active_allocations.load(atomic::Ordering::Acquire), 2);
        drop(alloc1);
        drop(alloc2);
        assert_eq!(bpool.active_allocations.load(atomic::Ordering::Acquire), 0);
    }

    #[test]
    fn ok_alloc_decrments_allocated_memory_after_deallocations() {
        let bpool = create_bufpool(BUF_SIZE.bytes() * 0x80);
        let allocations: Vec<_> = (0..0x20).map(|_| bpool.allocate(2)).collect();

        // 0x20 allocations, each holding 2 buffers
        assert_eq!(bpool.allocated_memory.load(atomic::Ordering::Acquire), BUF_SIZE.bytes() * 2 * 0x20);
        drop(allocations);
        assert_eq!(bpool.allocated_memory.load(atomic::Ordering::Acquire), 0);
    }

    #[test]
    fn ok_backpressure_blocks_till_memory_is_deallocated() {
        let bpool = sync::Arc::new(create_bufpool(BUF_SIZE.bytes() * 2));
        let alloc = bpool.allocate(1);

        let pool2 = bpool.clone();
        let barrier = sync::Arc::new(sync::Barrier::new(2));
        let barrier2 = barrier.clone();

        let handle = std::thread::spawn(move || {
            barrier2.wait();

            let _alloc = pool2.allocate(2);

            // instant at which the blocked allocation went through
            std::time::Instant::now()
        });

        barrier.wait();

        std::thread::sleep(std::time::Duration::from_millis(100));
        assert!(!handle.is_finished(), "allocation must block while the budget is exhausted");

        // Both instants are compared on the same monotonic clock, which (unlike comparing an
        // elapsed duration measured from inside the spawned thread) is independent of the moment
        // the spawned thread gets scheduled
        let released_at = std::time::Instant::now();
        drop(alloc);

        let acquired_at = handle.join().expect("allocation thread should not panic");
        assert!(acquired_at >= released_at);
    }

    #[test]
    fn ok_concurrent_allocations() {
        let pool = sync::Arc::new(create_bufpool(BUF_SIZE.bytes() * 0x1000));

        let mut handles = Vec::new();
        for _ in 0..0x0A {
            let pool = pool.clone();

            handles.push(std::thread::spawn(move || {
                for _ in 0..0x64 {
                    drop(pool.allocate(1));
                }
            }));
        }

        for h in handles {
            h.join().unwrap();
        }

        assert_eq!(pool.allocated_memory.load(atomic::Ordering::Acquire), 0);
        assert_eq!(pool.active_allocations.load(atomic::Ordering::Acquire), 0);
    }

    mod drop {
        use super::*;

        #[test]
        fn ok_partial_drop_updates_accounting() {
            let bpool = create_bufpool(BUF_SIZE.bytes() * 0x0A);

            let alloc1 = bpool.allocate(2);
            let alloc2 = bpool.allocate(2);

            assert_eq!(bpool.allocated_memory.load(atomic::Ordering::Acquire), BUF_SIZE.bytes() * 4);
            drop(alloc1);

            assert_eq!(bpool.allocated_memory.load(atomic::Ordering::Acquire), BUF_SIZE.bytes() * 2);
            drop(alloc2);

            assert_eq!(bpool.allocated_memory.load(atomic::Ordering::Acquire), 0);
        }

        #[test]
        fn ok_drop_waits_for_active_allocations() {
            let bpool = sync::Arc::new(create_bufpool(BUF_SIZE.bytes() * 0x1A));
            let alloc = bpool.allocate(0x10);

            // The only `Arc` is moved into the thread, so the pool itself is dropped there
            let handle = std::thread::spawn(move || {
                drop(bpool);
            });

            std::thread::sleep(std::time::Duration::from_millis(0x64));
            assert!(!handle.is_finished());
            drop(alloc);

            handle.join().unwrap();
        }
    }

    mod memory_tests {
        use super::*;

        #[test]
        fn ok_create_layout() {
            let layout = create_layout(0x1000);

            assert_eq!(layout.align(), 1);
            assert_eq!(layout.size(), 0x1000);
        }

        #[test]
        #[should_panic(expected = "Invalid Layout")]
        fn err_create_layout() {
            create_layout(usize::MAX);
        }

        #[test]
        fn ok_allocate_layout() {
            let layout = create_layout(0x10);
            let pointer = allocate_layout(layout);
            let raw_ptr = pointer.as_ptr();

            assert!(!raw_ptr.is_null());
            deallocate_memory(pointer, layout);
        }

        #[test]
        fn ok_allocate_layout_allows_write() {
            let layout = create_layout(0x80);
            let pointer = allocate_layout(layout);

            unsafe {
                pointer.as_ptr().write(0x40);
                assert_eq!(pointer.as_ptr().read(), 0x40);
            }

            deallocate_memory(pointer, layout);
        }

        #[test]
        fn ok_allocate_layout_allows_write_to_entire_slice() {
            let layout = create_layout(0x200);
            let pointer = allocate_layout(layout);

            unsafe {
                for i in 0..0x200 {
                    pointer.as_ptr().add(i).write((i % 0xFF) as u8);
                }

                for i in 0..0x200 {
                    assert_eq!(pointer.as_ptr().add(i).read(), (i % 0xFF) as u8);
                }
            }

            deallocate_memory(pointer, layout);
        }
    }

    mod alloc_struct {
        use super::*;

        #[test]
        fn ok_first_returns_ptr_to_first_buf_from_alloc() {
            let bpool = create_bufpool(BUF_SIZE.bytes() * 0x20);
            let alloc = bpool.allocate(0x10);

            assert_eq!(alloc.first(), alloc.pointer.as_ptr());
        }

        #[test]
        fn ok_length_returns_length_of_alloc() {
            let bpool = create_bufpool(BUF_SIZE.bytes() * 0x20);
            let alloc = bpool.allocate(0x10);

            assert_eq!(alloc.length(), alloc.buffers);
        }

        #[test]
        fn ok_allocated_bytes_return_total_allocated_bytes() {
            let bpool = create_bufpool(BUF_SIZE.bytes() * 0x20);
            let alloc = bpool.allocate(0x10);

            assert_eq!(alloc.allocated_bytes(), alloc.buffers * BUF_SIZE.bytes());
        }

        #[test]
        fn ok_alloc_can_be_shared_across_threads() {
            let pool = sync::Arc::new(create_bufpool(BUF_SIZE.bytes() * 2));
            let alloc = pool.allocate(1);

            std::thread::spawn(move || {
                drop(alloc);
            })
            .join()
            .unwrap();
        }
    }

    mod iterator {
        use super::*;

        #[test]
        fn ok_iter_yeilds_all_buffers() {
            let bpool = create_bufpool(BUF_SIZE.bytes() * 0x0A);
            let alloc = bpool.allocate(4);

            let ptrs: Vec<_> = alloc.iter().collect();
            assert_eq!(ptrs.len(), 4);

            // consecutive buffers must be exactly one buffer apart
            for pair in ptrs.windows(2) {
                assert_eq!(pair[1] as usize - pair[0] as usize, BUF_SIZE.bytes());
            }
        }

        #[test]
        fn ok_iter_yeilds_none_when_exhausted() {
            let bpool = create_bufpool(BUF_SIZE.bytes() * 0x0A);
            let alloc = bpool.allocate(2);
            let mut iter = alloc.iter();

            assert!(iter.next().is_some());
            assert!(iter.next().is_some());
            assert!(iter.next().is_none());
            assert!(iter.next().is_none());
        }
    }
}