frozen_core/bufpool.rs
1//! A low-latency memory-budgeted buffer pool to manage fixed-sized buffer allocations
2//!
3//! ## Memory Allocations
4//!
5//! All the allocations are allocated using the global memory allocator as requested (on-the-fly).
6//!
//! Each allocation, returned as a [`BufPoolAllocation`], is one large contiguous slice of
//! memory w/ size as `BufPoolCfg::buffer_size.bytes() * n_buffers`.
9//!
10//! Memory layout structure,
11//!
12//! ```text
13//! allocation = [[buf0][buf1][buf2]]
14//! where,
15//! - every buffer is of size `buffer_size`
16//! - each buffer is pointed using `*mut u8`
17//! ```
18//!
19//! ## Backpressure
20//!
21//! Every allocation reserves a memory budget and is only allowed to allocate memory if enough
22//! budget (i.e. memory space) is available. Otherwise, the caller is blocked/polled till enough
23//! space is available.
24//!
25//! When the [`BufPoolAllocation`] and all its references are dropped, the underlying memory is
26//! deallocated while relaxing the budget and dropping the backpressure (if any).
27//!
//! _NOTE:_ There is no fairness guarantee for callers that are blocked by backpressure, as the
//! waiting callers are woken opportunistically.
30//!
31//! ## Benchmarks
32//!
33//! Observed measurements of latency and throughput,
34//!
35//! | Metric | Value |
36//! |:-----------------------------|:-------------------|
37//! | Allocation Latency (avg) | ~254 nanosecond |
38//! | Allocation Throughput (avg) | ~3.94 million/sec |
39//!
//! _NOTE:_ All measurements include the complete RAII lifecycle (i.e. allocation + deallocation).
41//!
42//! Observed allocation latency for `N` buffers,
43//!
44//! | Buffers | Latency |
45//! |:---------|:---------|
46//! | 0x01 | 246 ns |
47//! | 0x10 | 251 ns |
48//! | 0x400 | 300 ns |
49//!
//! _INFO:_ As seen, the allocation latency stays nearly constant irrespective of the number of
//! buffers and the allocated bytes.
52//!
53//! Environment used for benching,
54//!
55//! * OS: NixOS (WSL2)
56//! * Architecture: x86_64
57//! * Memory: 8 GiB RAM (DDR4)
58//! * Rust: rustc 1.86.0 w/ cargo 1.86.0
59//! * Kernel: Linux 6.6.87.2-microsoft-standard-WSL2
60//! * CPU: Intel® Core™ i5-10300H @ 2.50GHz (4C / 8T)
61//!
62//! ## Example
63//!
64//! ```
65//! use frozen_core::{
66//! bufpool::{BufPool, BufPoolCfg, BufferPointer},
67//! utils::BufferSize,
68//! };
69//!
70//! const BUF_SIZE: BufferSize = BufferSize::S16;
71//!
72//! let pool = BufPool::new(BufPoolCfg {
73//! buffer_size: BUF_SIZE,
74//! max_memory: BUF_SIZE as usize * 0x40,
75//! });
76//!
77//! let alloc = pool.allocate(0x2A);
78//!
79//! assert_eq!(alloc.length(), 0x2A);
80//! assert!(!alloc.first().is_null());
81//! assert_eq!(alloc.allocated_bytes(), BUF_SIZE as usize * 0x2A);
82//!
83//! let ptrs: Vec<BufferPointer> = alloc.iter().collect();
84//! assert_eq!(ptrs.len(), 0x2A);
85//! ```
86
87use std::{alloc, ptr, sync, sync::atomic};
88
/// Raw, untyped handle to one fixed-size buffer inside a [`BufPoolAllocation`]
///
/// ## Safety
///
/// The memory behind the pointer is untyped and starts out uninitialized. Whoever dereferences
/// it is responsible for the following:
///
/// * Every write stays within the buffer, whose size is [`BufPoolCfg::buffer_size`]
/// * A buffer is only read after it has been initialized (i.e. written)
/// * The pointer is never used after the owning [`BufPoolAllocation`] has been dropped
///
/// ## Example
///
/// ```
/// use frozen_core::{
///     bufpool::{BufPool, BufPoolCfg},
///     utils::BufferSize,
/// };
///
/// const BUF_SIZE: BufferSize = BufferSize::S16;
/// const BUFFER: [u8; BUF_SIZE as usize] = [1u8; BUF_SIZE as usize];
///
/// let pool = BufPool::new(BufPoolCfg {
///     buffer_size: BUF_SIZE,
///     max_memory: BUF_SIZE as usize * 0x0A,
/// });
///
/// let alloc = pool.allocate(1);
/// unsafe {
///     std::ptr::copy_nonoverlapping(BUFFER.as_ptr(), alloc.first(), BUF_SIZE as usize);
/// }
/// ```
pub type BufferPointer = *mut u8;
121
122/// All the available configrations for [`BufPool`]
123///
124/// ## Example
125///
126/// ```
127/// use frozen_core::{bufpool::BufPoolCfg, utils::BufferSize};
128///
129/// const BUF_SIZE: BufferSize = BufferSize::S64;
130///
131/// let cfg = BufPoolCfg {
132/// buffer_size: BUF_SIZE,
133/// max_memory: BUF_SIZE as usize * 0x1000,
134/// };
135///
136/// assert_ne!(cfg.max_memory, 0);
137/// assert!(cfg.max_memory > cfg.buffer_size.bytes());
138/// ```
139#[derive(Clone, Copy, Debug, Eq, PartialEq)]
140pub struct BufPoolCfg {
141 /// Size (in bytes) of an indivdual buffer unit allocated
142 pub buffer_size: crate::utils::BufferSize,
143
144 /// Maximum allowed memory (in bytes) to be simultaneosuly allocated by [`BufPool`]
145 ///
146 /// _IMPORTANT:_ When trying to allocate more memory then [`BufPoolCfg::max_memory`] via
147 /// [`BufPool::allocate`], a deadlock will happen due to memory budgeting in place. The caller
148 /// must make sure the `max_meory` is high enough to avoid this scenerio.
149 ///
150 /// _NOTE:_ To avoid backpressure, set the `max_memory` to an arbitrary large value. This
151 /// would not have any direct impact on performance or resource usage, and will avoid
152 /// backpressure under heavy workload.
153 pub max_memory: usize,
154}
155
156/// An implementation of a low-latency memory-budgeted buffer pool managing fixed-sized buffer
157/// allocations
158///
159/// ## Blocking `drop`
160///
161/// Dropping the [`BufPool`] instance from memory blocks unitl all the allocated instances of
162/// [`BufPoolAllocations`] and all there references are dropped from memory.
163///
164/// This is in place to avoid memory leaks, as well as to enable sending [`BufPoolAllocation`]
165/// objects across threads while being tied to the lifecyle of [`BufPool`].
166///
167/// ## Memory Allocations
168///
169/// All the allocations are allocated using the global memory allocator as requested (on-the-fly).
170///
171/// While a single allocation retunred as [`BufPoolAllocation`] is a large contineous slice of
172/// memory w/ size as `BufPoolCfg::buffer_size.bytes() * n_buffers`.
173///
174/// Memory layout structure,
175///
176/// ```text
177/// allocation = [[buf0][buf1][buf2]]
178/// where,
179/// - every buffer is of size `buffer_size`
180/// - each buffer is pointed using `*mut u8`
181/// ```
182///
183/// ## Backpressure
184///
185/// Every allocation reserves a memory budget and is only allowed to allocate memory if enough
186/// budget (i.e. memory space) is available. Otherwise, the caller is blocked/polled till enough
187/// space is available.
188///
189/// When the [`BufPoolAllocation`] and all its references are dropped, the underlying memory is
190/// deallocated while relaxing the budget and dropping the backpressure (if any).
191///
192/// _NOTE:_ There is no faireness guarantee for the caller's who are polled when faced with
193/// backpressure, as the waiting callers are awaken opportunistically.
194///
195/// ## Example
196///
197/// ```
198/// use frozen_core::{
199/// bufpool::{BufPool, BufPoolCfg, BufferPointer},
200/// utils::BufferSize,
201/// };
202///
203/// const BUF_SIZE: BufferSize = BufferSize::S16;
204///
205/// let pool = BufPool::new(BufPoolCfg {
206/// buffer_size: BUF_SIZE,
207/// max_memory: BUF_SIZE as usize * 0x40,
208/// });
209///
210/// let alloc = pool.allocate(0x30);
211///
212/// assert_eq!(alloc.length(), 0x30);
213/// assert!(!alloc.first().is_null());
214/// assert_eq!(alloc.allocated_bytes(), BUF_SIZE as usize * 0x30);
215///
216/// let ptrs: Vec<BufferPointer> = alloc.iter().collect();
217/// assert_eq!(ptrs.len(), 0x30);
218/// ```
219#[derive(Debug)]
220pub struct BufPool {
221 active_allocations: atomic::AtomicUsize,
222 allocation_cv: sync::Condvar,
223 allocation_lock: sync::Mutex<()>,
224 allocated_memory: atomic::AtomicUsize,
225 cfg: BufPoolCfg,
226 shutdown_cv: sync::Condvar,
227 shutdown_lock: sync::Mutex<()>,
228}
229
230unsafe impl Send for BufPool {}
231unsafe impl Sync for BufPool {}
232
233impl BufPool {
234 /// Create a new instance of [`BufPool`]
235 ///
236 /// ## Debug Assertions
237 ///
238 /// In debug builds, this function uses `debug_assertion` to prevent invalid configurations.
239 /// Caller must refer to [`BufPoolCfg`] for details about the config params.
240 ///
241 /// ## Example
242 ///
243 /// ```
244 /// use frozen_core::{
245 /// bufpool::{BufPool, BufPoolCfg},
246 /// utils::BufferSize,
247 /// };
248 ///
249 /// const BUF_SIZE: BufferSize = BufferSize::S16;
250 ///
251 /// let pool = BufPool::new(BufPoolCfg {
252 /// buffer_size: BUF_SIZE,
253 /// max_memory: BUF_SIZE as usize * 0x0A,
254 /// });
255 ///
256 /// let alloc = pool.allocate(1);
257 ///
258 /// assert_eq!(alloc.length(), 1);
259 /// assert_eq!(alloc.allocated_bytes(), BUF_SIZE as usize);
260 /// ```
261 #[inline]
262 pub fn new(cfg: BufPoolCfg) -> Self {
263 // sanity check
264 debug_assert!(
265 cfg.buffer_size.bytes() < cfg.max_memory,
266 "MAX_MEMORY must always be larger than the BUFFER_SIZE"
267 );
268
269 Self {
270 cfg,
271 active_allocations: atomic::AtomicUsize::new(0),
272 allocated_memory: atomic::AtomicUsize::new(0),
273 allocation_cv: sync::Condvar::new(),
274 allocation_lock: sync::Mutex::new(()),
275 shutdown_cv: sync::Condvar::new(),
276 shutdown_lock: sync::Mutex::new(()),
277 }
278 }
279
280 /// Allocate `required` number of buffers each of [`BufPoolCfg::buffer_size`] size
281 ///
282 /// ## Allocation Layout
283 ///
284 /// The allocation is a large contineous slice of memory w/ size as
285 ///
286 /// ```text
287 /// BufPoolCfg::buffer_size.bytes() * n_buffers
288 /// ```
289 ///
290 /// Memory layout structure,
291 ///
292 /// ```text
293 /// allocation = [[buf0][buf1][buf2]]
294 /// where,
295 /// - every buffer is of size `buffer_size`
296 /// - each buffer is pointed using `*mut u8`
297 /// ```
298 ///
299 /// ## RAII Safety
300 ///
301 /// The allocation object, i.e. [`BufPoolAllocation`], is RAII safe. The allocated memory is
302 /// automatically deallocated as soon as the last reference to the object is dropped, while also
303 /// relaxing the memory budget to eliminate backpressure (if any).
304 ///
305 /// ## Important Considerations
306 ///
307 /// The number of buffers required should never exceed `u16::MAX`. This is an abstract soft
308 /// limit and should be enforced by the public interface to avoid any weird exhaustion issues
309 /// or unknown bugs across the storage system.
310 ///
311 /// As `u16::MAX` is large enough value to almost never cause any problems for a single write
312 /// operation, this soft limit acts as a guidline to safely operate with arithmatic operations
313 /// across storage engine(s), including but not limited to [`frozen_core`].
314 ///
315 /// ## Example
316 ///
317 /// ```
318 /// use frozen_core::{
319 /// bufpool::{BufPool, BufPoolCfg, BufferPointer},
320 /// utils::BufferSize,
321 /// };
322 ///
323 /// const BUF_SIZE: BufferSize = BufferSize::S16;
324 ///
325 /// let pool = BufPool::new(BufPoolCfg {
326 /// buffer_size: BUF_SIZE,
327 /// max_memory: BUF_SIZE as usize * 0x14,
328 /// });
329 ///
330 /// let alloc = pool.allocate(0x0A);
331 ///
332 /// assert_eq!(alloc.length(), 0x0A);
333 /// assert!(!alloc.first().is_null());
334 /// assert_eq!(alloc.allocated_bytes(), BUF_SIZE as usize * 0x0A);
335 ///
336 /// let ptrs: Vec<BufferPointer> = alloc.iter().collect();
337 /// assert_eq!(ptrs.len(), 0x0A);
338 /// ```
339 #[inline(always)]
340 pub fn allocate(&self, required: usize) -> BufPoolAllocation {
341 // sanity checks
342 debug_assert!(required > 0, "required buffers must never be 0");
343 debug_assert!(
344 required * self.cfg.buffer_size.bytes() <= self.cfg.max_memory,
345 "Total required bytes must be smaller then the MAX_MEMORY allowed to avoid deadlock"
346 );
347 debug_assert!(
348 required * self.cfg.buffer_size.bytes() <= self.cfg.max_memory,
349 "Total required bytes must never exceed `u16::MAX` to avoid arithmatic overflows"
350 );
351
352 let required_bytes = self.cfg.buffer_size.bytes() * required;
353 loop {
354 let current_bytes = self.allocated_memory.load(atomic::Ordering::Acquire);
355 if current_bytes + required_bytes > self.cfg.max_memory {
356 self.backpressure(required_bytes);
357 continue;
358 }
359
360 match self.allocated_memory.compare_exchange(
361 current_bytes,
362 current_bytes + required_bytes,
363 atomic::Ordering::AcqRel,
364 atomic::Ordering::Acquire,
365 ) {
366 Ok(_) => break,
367 Err(_) => continue,
368 }
369 }
370
371 let layout = create_layout(required_bytes);
372 let pointer = allocate_layout(layout);
373 self.active_allocations.fetch_add(1, atomic::Ordering::Relaxed);
374
375 BufPoolAllocation { layout, pointer, required_bytes, buffers: required, pool: ptr::NonNull::from(self) }
376 }
377
378 /// Applies backpressure until enough memory budget is available for the allocation
379 ///
380 /// ## Why we ignore [`std::sync::PoisonError`]
381 ///
382 /// The mutex used for lock, is solely used as a parking primitive for [`Condvar`] and does not
383 /// protect any mutable state. All the pool invariants and accounting are maintained via
384 /// atomics and are completely seperated from the mutex.
385 ///
386 /// A poisoned mutex only indicates that another tx panicked while holding the lock, and
387 /// indicates an inconsistent state of the protected value. Since no state can be left
388 /// partially modified under this lock, there is no possible consistency risk to recover from
389 /// and propagating the poison error would only introduce unnecessary failures into the
390 /// allocation path.
391 ///
392 /// Therefore, as best effort, we consume the [`std::sync::PoisonError`] and continue operating
393 /// with the recovered guard.
394 #[inline]
395 fn backpressure(&self, required_bytes: usize) {
396 let mut guard = self.allocation_lock.lock().unwrap_or_else(|e| e.into_inner());
397 while self.allocated_memory.load(atomic::Ordering::Acquire) + required_bytes > self.cfg.max_memory {
398 guard = self.allocation_cv.wait(guard).unwrap_or_else(|e| e.into_inner());
399 }
400 }
401}
402
403impl Drop for BufPool {
404 fn drop(&mut self) {
405 // NOTE: See [`BufPool::backpressure`] implementation for rationale behind poison recovery
406
407 let mut guard = self.shutdown_lock.lock().unwrap_or_else(|e| e.into_inner());
408 while self.active_allocations.load(atomic::Ordering::Acquire) != 0 {
409 guard = self.shutdown_cv.wait(guard).unwrap_or_else(|e| e.into_inner());
410 }
411 }
412}
413
414/// A RAII safe allocation object containing allocated buffers
415///
416/// ## Lifetime
417///
418/// The object can/may outlive the scope that created it, while also being able to transfer across
419/// threads. As, internally the [`BufPool`] tracks all the active allocations and delays the drop
420/// until every allocation and all there references are dropped from memory.
421///
422/// ## Example
423///
424/// ```
425/// use frozen_core::{
426/// bufpool::{BufPool, BufPoolCfg, BufferPointer},
427/// utils::BufferSize,
428/// };
429///
430/// const BUF_SIZE: BufferSize = BufferSize::S16;
431///
432/// let pool = BufPool::new(BufPoolCfg {
433/// buffer_size: BUF_SIZE,
434/// max_memory: BUF_SIZE as usize * 0x10,
435/// });
436///
437/// let alloc = pool.allocate(0x0A);
438///
439/// assert_eq!(alloc.length(), 0x0A);
440/// assert!(!alloc.first().is_null());
441/// assert_eq!(alloc.allocated_bytes(), BUF_SIZE as usize * 0x0A);
442///
443/// let ptrs: Vec<BufferPointer> = alloc.iter().collect();
444/// assert_eq!(ptrs.len(), 0x0A);
445/// ```
446#[derive(Debug)]
447pub struct BufPoolAllocation {
448 buffers: usize,
449 layout: alloc::Layout,
450 pointer: ptr::NonNull<u8>,
451 pool: ptr::NonNull<BufPool>,
452 required_bytes: usize,
453}
454
455unsafe impl Send for BufPoolAllocation {}
456
457impl BufPoolAllocation {
458 /// Returns a [`BufferPointer`] to the first buffer from the allocated list of buffers
459 ///
460 /// _NOTE:_ The returned [`BufferPointer`] can also be used as a _base_pointer_ to operate on
461 /// the entire allocated memory slice.
462 ///
463 /// ## Example
464 ///
465 /// ```
466 /// use frozen_core::{
467 /// bufpool::{BufPool, BufPoolCfg},
468 /// utils::BufferSize,
469 /// };
470 ///
471 /// const BUF_SIZE: BufferSize = BufferSize::S32;
472 ///
473 /// let pool = BufPool::new(BufPoolCfg {
474 /// buffer_size: BUF_SIZE,
475 /// max_memory: BUF_SIZE as usize * 0x0A,
476 /// });
477 ///
478 /// let alloc = pool.allocate(0x0A);
479 /// assert!(!alloc.first().is_null());
480 /// ```
481 #[inline]
482 pub const fn first(&self) -> BufferPointer {
483 self.pointer.as_ptr()
484 }
485
486 /// Returns the total number of allocated buffers
487 ///
488 /// _IMPORTANT:_ The returned value is always equal to the `required` value using while
489 /// calling [`BufPool::allocate`].
490 ///
491 /// ## Example
492 ///
493 /// ```
494 /// use frozen_core::{
495 /// bufpool::{BufPool, BufPoolCfg},
496 /// utils::BufferSize,
497 /// };
498 ///
499 /// const BUF_SIZE: BufferSize = BufferSize::S64;
500 ///
501 /// let pool = BufPool::new(BufPoolCfg {
502 /// buffer_size: BUF_SIZE,
503 /// max_memory: BUF_SIZE as usize * 0x0A,
504 /// });
505 ///
506 /// let alloc = pool.allocate(0x0A);
507 /// assert_eq!(alloc.length(), 0x0A);
508 /// ```
509 #[inline]
510 pub const fn length(&self) -> usize {
511 self.buffers
512 }
513
514 /// Returns the total number of bytes of memory allocated
515 ///
516 /// ## Example
517 ///
518 /// ```
519 /// use frozen_core::{
520 /// bufpool::{BufPool, BufPoolCfg},
521 /// utils::BufferSize,
522 /// };
523 ///
524 /// const BUF_SIZE: BufferSize = BufferSize::S16;
525 ///
526 /// let pool = BufPool::new(BufPoolCfg {
527 /// buffer_size: BUF_SIZE,
528 /// max_memory: BUF_SIZE as usize * 0x0A,
529 /// });
530 ///
531 /// let alloc = pool.allocate(0x0A);
532 /// assert_eq!(alloc.allocated_bytes(), BUF_SIZE as usize * 0x0A);
533 /// ```
534 #[inline]
535 pub const fn allocated_bytes(&self) -> usize {
536 self.required_bytes
537 }
538
539 /// A custom [`Iterator`] implementation to enable iteration over the list of allocated buffers
540 /// from [`BufPoolAllocation`]
541 ///
542 /// _NOTE:_ Each yielded pointer refers to a unique individual buffer each of size
543 /// [`BufPoolCfg::buffer_size`].
544 ///
545 /// ## Example
546 ///
547 /// ```
548 /// use frozen_core::{
549 /// bufpool::{BufPool, BufPoolCfg, BufferPointer},
550 /// utils::BufferSize,
551 /// };
552 ///
553 /// const BUF_SIZE: BufferSize = BufferSize::S16;
554 ///
555 /// let pool = BufPool::new(BufPoolCfg {
556 /// buffer_size: BUF_SIZE,
557 /// max_memory: BUF_SIZE as usize * 0x14,
558 /// });
559 ///
560 /// let alloc = pool.allocate(0x0A);
561 /// let ptrs: Vec<BufferPointer> = alloc.iter().collect();
562 ///
563 /// assert_eq!(ptrs.len(), 0x0A);
564 /// ```
565 #[inline]
566 pub fn iter(&self) -> BufPoolAllocationIter {
567 let pool = unsafe { self.pool.as_ref() };
568 BufPoolAllocationIter {
569 pointer: self.pointer,
570 buffer_size: pool.cfg.buffer_size.bytes(),
571 remaining: self.buffers,
572 }
573 }
574}
575
576impl Drop for BufPoolAllocation {
577 fn drop(&mut self) {
578 let pool = unsafe { self.pool.as_ref() };
579 deallocate_memory(self.pointer, self.layout);
580
581 pool.allocated_memory.fetch_sub(self.required_bytes, atomic::Ordering::Release);
582 pool.allocation_cv.notify_one();
583
584 if pool.active_allocations.fetch_sub(1, atomic::Ordering::Release) == 1 {
585 pool.shutdown_cv.notify_one();
586 }
587 }
588}
589
590/// A custom [`Iterator`] object to iterate over the list of allocated buffers
591///
592/// _NOTE:_ Buffers are yielded in allocation order and are backed by a single contiguous memory
593/// region.
594///
595/// ## Example
596///
597/// ```rs
598/// use frozen_core::{
599/// bufpool::{BufPool, BufPoolCfg, BufferPointer},
600/// utils::BufferSize,
601/// };
602///
603/// const BUF_SIZE: BufferSize = BufferSize::S32;
604///
605/// let pool = BufPool::new(BufPoolCfg {
606/// buffer_size: BUF_SIZE,
607/// max_memory: BUF_SIZE as usize * 0x14,
608/// });
609///
610/// let alloc = pool.allocate(0x0A);
611/// let ptrs: Vec<BufferPointer> = alloc.iter().collect();
612///
613/// assert_eq!(ptrs.len(), 0x0A);
614/// ```
615#[derive(Debug)]
616pub struct BufPoolAllocationIter {
617 pointer: ptr::NonNull<u8>,
618 buffer_size: usize,
619 remaining: usize,
620}
621
622impl Iterator for BufPoolAllocationIter {
623 type Item = BufferPointer;
624
625 #[inline(always)]
626 fn next(&mut self) -> Option<Self::Item> {
627 if self.remaining == 0 {
628 return None;
629 }
630
631 let curr_ptr = self.pointer;
632
633 self.pointer = unsafe { self.pointer.add(self.buffer_size) };
634 self.remaining -= 1;
635
636 Some(curr_ptr.as_ptr())
637 }
638}
639
/// Builds the byte-array layout (alignment `1`) used for an allocation of `required_bytes`
///
/// ## Panics
///
/// Panics with an `Invalid Layout` message when `required_bytes` can not be represented as a
/// valid layout. Such a panic points at an incorrect caller rather than at a runtime failure.
#[inline]
fn create_layout(required_bytes: usize) -> alloc::Layout {
    alloc::Layout::array::<u8>(required_bytes).unwrap_or_else(|e| panic!("Invalid Layout: {e}"))
}
651
/// Allocates an uninitialized memory slice described by `layout` from the global allocator
///
/// ## Allocation Failure
///
/// When the allocator is unable to satisfy the request (typically due to an OOM condition),
/// [`alloc::alloc`] returns a null pointer.
///
/// In that case the failure is handed over to [`alloc::handle_alloc_error`], matching the
/// behavior of std library types such as [`Vec`], [`Box`] and [`String`]. That call never
/// returns, i.e. allocation failures are treated as fatal runtime conditions rather than
/// recoverable errors.
///
/// Under normal operation this path should never be reached, as memory usage is expected to be
/// bounded by the buffer pool's memory budget and backpressure mechanisms.
///
/// ## Why not return `FrozenErr`
///
/// A null return from [`alloc::alloc`] means the global allocator itself could not satisfy the
/// request. Continuing after such a catastrophic failure is pointless, as further allocations
/// required for error handling, logging or recovery may fail as well.
#[inline]
fn allocate_layout(layout: alloc::Layout) -> ptr::NonNull<u8> {
    // SAFETY: presumably the caller passes a layout with a non-zero size, as required by
    // `alloc::alloc` (verify against the callers).
    let raw = unsafe { alloc::alloc(layout) };

    let Some(pointer) = ptr::NonNull::new(raw) else {
        alloc::handle_alloc_error(layout);
    };

    pointer
}
684
/// Hands the memory slice behind `pointer` back to the global allocator
///
/// `pointer` and `layout` are expected to be the exact pair obtained when the slice was
/// allocated.
#[inline]
fn deallocate_memory(pointer: ptr::NonNull<u8>, layout: alloc::Layout) {
    let raw = pointer.as_ptr();

    // SAFETY: relies on the caller passing a pointer that was allocated with this very `layout`
    // and has not been freed yet.
    unsafe { alloc::dealloc(raw, layout) };
}
690
#[cfg(test)]
mod tests {
    use super::*;

    const BUF_SIZE: crate::utils::BufferSize = crate::utils::BufferSize::S32;

    /// Creates a pool of `BUF_SIZE` buffers which is limited to `max_mem` bytes
    #[inline]
    fn create_bufpool(max_mem: usize) -> BufPool {
        BufPool::new(BufPoolCfg { buffer_size: BUF_SIZE, max_memory: max_mem })
    }

    #[test]
    #[cfg(debug_assertions)]
    #[should_panic(expected = "MAX_MEMORY must always be larger than the BUFFER_SIZE")]
    fn err_new_with_invalid_cfg() {
        create_bufpool(BUF_SIZE.bytes() >> 1);
    }

    // NOTE: The pool below must be a valid one (i.e. `max_memory > buffer_size`), otherwise the
    // panic would already be raised by `BufPool::new` and the test would pass without ever
    // reaching `allocate`. The expected message pins the panic to the intended assertion.
    #[test]
    #[cfg(debug_assertions)]
    #[should_panic(expected = "required buffers must never be 0")]
    fn err_alloc_zero() {
        let bpool = create_bufpool(BUF_SIZE.bytes() * 2);
        let _ = bpool.allocate(0);
    }

    // NOTE: See `err_alloc_zero` for why a valid pool and an expected message are used
    #[test]
    #[cfg(debug_assertions)]
    #[should_panic(expected = "MAX_MEMORY allowed to avoid deadlock")]
    fn err_alloc_more_then_max_memory() {
        let bpool = create_bufpool(BUF_SIZE.bytes() * 2);
        let _ = bpool.allocate(3);
    }

    #[test]
    fn ok_alloc_single() {
        let bpool = create_bufpool(BUF_SIZE.bytes() * 2);
        let alloc = bpool.allocate(1);

        assert_eq!(alloc.buffers, 1);
        assert_eq!(alloc.required_bytes, BUF_SIZE.bytes());
    }

    #[test]
    fn ok_alloc_multiple() {
        let bpool = create_bufpool(BUF_SIZE.bytes() * 0x14);
        let alloc = bpool.allocate(0x10);

        assert_eq!(alloc.buffers, 0x10);
        assert_eq!(alloc.required_bytes, BUF_SIZE.bytes() * 0x10);
    }

    #[test]
    fn ok_alloc_max_memory() {
        let bpool = create_bufpool(BUF_SIZE.bytes() * 0x0A);
        let alloc = bpool.allocate(0x0A);

        assert_eq!(alloc.buffers, 0x0A);
        assert_eq!(alloc.required_bytes, BUF_SIZE.bytes() * 0x0A);
    }

    #[test]
    fn ok_alloc_updates_memory_accounting() {
        let bpool = create_bufpool(BUF_SIZE.bytes() * 0x14);
        let alloc = bpool.allocate(0x10);

        assert_eq!(bpool.allocated_memory.load(atomic::Ordering::Acquire), BUF_SIZE.bytes() * 0x10);
        drop(alloc);
        assert_eq!(bpool.allocated_memory.load(atomic::Ordering::Acquire), 0);
    }

    #[test]
    fn ok_alloc_updates_active_allocation_tracking() {
        let bpool = create_bufpool(BUF_SIZE.bytes() * 0x2A);

        let alloc1 = bpool.allocate(0x10);
        let alloc2 = bpool.allocate(0x10);

        assert_eq!(bpool.active_allocations.load(atomic::Ordering::Acquire), 2);
        drop(alloc1);
        drop(alloc2);
        assert_eq!(bpool.active_allocations.load(atomic::Ordering::Acquire), 0);
    }

    #[test]
    fn ok_alloc_decrments_allocated_memory_after_deallocations() {
        const ALLOCATIONS: usize = 0x20;
        const BUFFERS_PER_ALLOCATION: usize = 2;

        let bpool = create_bufpool(BUF_SIZE.bytes() * 0x80);
        let allocations: Vec<_> = (0..ALLOCATIONS).map(|_| bpool.allocate(BUFFERS_PER_ALLOCATION)).collect();

        assert_eq!(
            bpool.allocated_memory.load(atomic::Ordering::Acquire),
            BUF_SIZE.bytes() * BUFFERS_PER_ALLOCATION * ALLOCATIONS
        );
        drop(allocations);
        assert_eq!(bpool.allocated_memory.load(atomic::Ordering::Acquire), 0);
    }

    #[test]
    fn ok_backpressure_blocks_till_memory_is_deallocated() {
        const HOLD: std::time::Duration = std::time::Duration::from_millis(100);

        let bpool = sync::Arc::new(create_bufpool(BUF_SIZE.bytes() * 2));
        let alloc = bpool.allocate(1);

        let pool2 = bpool.clone();
        let barrier = sync::Arc::new(sync::Barrier::new(2));
        let barrier2 = barrier.clone();

        // The clock is started before the barrier is released. If it was started inside of the
        // spawned thread, a late scheduled thread would measure less than `HOLD` even though
        // the allocation was correctly blocked.
        let start = std::time::Instant::now();

        let handle = std::thread::spawn(move || {
            barrier2.wait();
            let _alloc = pool2.allocate(2);

            start.elapsed()
        });

        barrier.wait();

        std::thread::sleep(HOLD);
        drop(alloc);

        let elapsed = handle.join().expect("allocation thread should not panic");
        assert!(elapsed >= HOLD);
    }

    #[test]
    fn ok_concurrent_allocations() {
        let pool = sync::Arc::new(create_bufpool(BUF_SIZE.bytes() * 0x1000));

        let handles: Vec<_> = (0..0x0A)
            .map(|_| {
                let pool = pool.clone();

                std::thread::spawn(move || {
                    for _ in 0..0x64 {
                        drop(pool.allocate(1));
                    }
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }

        assert_eq!(pool.allocated_memory.load(atomic::Ordering::Acquire), 0);
        assert_eq!(pool.active_allocations.load(atomic::Ordering::Acquire), 0);
    }

    mod drop {
        use super::*;

        #[test]
        fn ok_partial_drop_updates_accounting() {
            let bpool = create_bufpool(BUF_SIZE.bytes() * 0x0A);

            let alloc1 = bpool.allocate(2);
            let alloc2 = bpool.allocate(2);

            assert_eq!(bpool.allocated_memory.load(atomic::Ordering::Acquire), BUF_SIZE.bytes() * 4);
            drop(alloc1);

            assert_eq!(bpool.allocated_memory.load(atomic::Ordering::Acquire), BUF_SIZE.bytes() * 2);
            drop(alloc2);

            assert_eq!(bpool.allocated_memory.load(atomic::Ordering::Acquire), 0);
        }

        #[test]
        fn ok_drop_waits_for_active_allocations() {
            let bpool = sync::Arc::new(create_bufpool(BUF_SIZE.bytes() * 0x1A));
            let alloc = bpool.allocate(0x10);

            // The only `Arc` is moved into the thread, so the thread drops the pool itself
            let handle = std::thread::spawn(move || {
                drop(bpool);
            });

            std::thread::sleep(std::time::Duration::from_millis(0x64));
            assert!(!handle.is_finished());
            drop(alloc);

            handle.join().unwrap();
        }
    }

    mod memory_tests {
        use super::*;

        #[test]
        fn ok_create_layout() {
            let layout = create_layout(0x1000);

            assert_eq!(layout.align(), 1);
            assert_eq!(layout.size(), 0x1000);
        }

        #[test]
        #[should_panic(expected = "Invalid Layout")]
        fn err_create_layout() {
            create_layout(usize::MAX);
        }

        #[test]
        fn ok_allocate_layout() {
            let layout = create_layout(0x10);
            let pointer = allocate_layout(layout);
            let raw_ptr = pointer.as_ptr();

            assert!(!raw_ptr.is_null());
            deallocate_memory(pointer, layout);
        }

        #[test]
        fn ok_allocate_layout_allows_write() {
            let layout = create_layout(0x80);
            let pointer = allocate_layout(layout);

            unsafe {
                pointer.as_ptr().write(0x40);
                assert_eq!(pointer.as_ptr().read(), 0x40);
            }

            deallocate_memory(pointer, layout);
        }

        #[test]
        fn ok_allocate_layout_allows_write_to_entire_slice() {
            const SIZE: usize = 0x200;

            let layout = create_layout(SIZE);
            let pointer = allocate_layout(layout);

            unsafe {
                for i in 0..SIZE {
                    pointer.as_ptr().add(i).write((i % 0xFF) as u8);
                }

                for i in 0..SIZE {
                    assert_eq!(pointer.as_ptr().add(i).read(), (i % 0xFF) as u8);
                }
            }

            deallocate_memory(pointer, layout);
        }
    }

    mod alloc_struct {
        use super::*;

        #[test]
        fn ok_first_returns_ptr_to_first_buf_from_alloc() {
            let bpool = create_bufpool(BUF_SIZE.bytes() * 0x20);
            let alloc = bpool.allocate(0x10);

            assert_eq!(alloc.first(), alloc.pointer.as_ptr());
        }

        #[test]
        fn ok_length_returns_length_of_alloc() {
            let bpool = create_bufpool(BUF_SIZE.bytes() * 0x20);
            let alloc = bpool.allocate(0x10);

            assert_eq!(alloc.length(), alloc.buffers);
        }

        #[test]
        fn ok_allocated_bytes_return_total_allocated_bytes() {
            let bpool = create_bufpool(BUF_SIZE.bytes() * 0x20);
            let alloc = bpool.allocate(0x10);

            assert_eq!(alloc.allocated_bytes(), alloc.buffers * BUF_SIZE.bytes());
        }

        #[test]
        fn ok_alloc_can_be_shared_across_threads() {
            let pool = sync::Arc::new(create_bufpool(BUF_SIZE.bytes() * 2));
            let alloc = pool.allocate(1);

            std::thread::spawn(move || {
                drop(alloc);
            })
            .join()
            .unwrap();
        }
    }

    mod iterator {
        use super::*;

        #[test]
        fn ok_iter_yeilds_all_buffers() {
            let bpool = create_bufpool(BUF_SIZE.bytes() * 0x0A);
            let alloc = bpool.allocate(4);

            let ptrs: Vec<_> = alloc.iter().collect();
            assert_eq!(ptrs.len(), 4);
            assert_eq!(ptrs[0], alloc.first());

            // consecutive buffers must be exactly one buffer apart
            for pair in ptrs.windows(2) {
                assert_eq!(pair[1] as usize - pair[0] as usize, BUF_SIZE.bytes());
            }
        }

        #[test]
        fn ok_iter_yeilds_none_when_exhausted() {
            let bpool = create_bufpool(BUF_SIZE.bytes() * 0x0A);
            let alloc = bpool.allocate(2);
            let mut iter = alloc.iter();

            assert!(iter.next().is_some());
            assert!(iter.next().is_some());
            assert!(iter.next().is_none());
            assert!(iter.next().is_none());
        }
    }
}
997}