// commonware_runtime/iobuf/pool.rs

//! Buffer pool for efficient I/O operations.
//!
//! Provides pooled, aligned buffers that can be reused to reduce allocation
//! overhead. Buffer alignment is configurable: use page alignment for storage I/O
//! (required for direct I/O and DMA), or cache-line alignment for network I/O
//! (reduces internal fragmentation).
//!
//! # Thread Safety
//!
//! [`BufferPool`] is `Send + Sync` and can be safely shared across threads.
//! Allocation and deallocation are lock-free operations using atomic counters
//! and a lock-free queue ([`crossbeam_queue::ArrayQueue`]).
//!
//! # Pool Lifecycle
//!
//! The pool uses reference counting internally. Buffers hold a weak reference
//! to the pool, so:
//! - If a buffer is returned after the pool is dropped, it is deallocated
//!   directly instead of being returned to the freelist.
//! - The pool can be dropped while buffers are still in use; those buffers
//!   remain valid and will be deallocated when they are dropped.
//!
//! # Size Classes
//!
//! Buffers are organized into power-of-two size classes from `min_size` to
//! `max_size`. For example, with `min_size = 4096` and `max_size = 32768`:
//! - Class 0: 4096 bytes
//! - Class 1: 8192 bytes
//! - Class 2: 16384 bytes
//! - Class 3: 32768 bytes
//!
//! Allocation requests are rounded up to the smallest size class that fits.
//! For requests larger than `max_size`, [`BufferPool::try_alloc`] returns
//! [`PoolError::Oversized`], while [`BufferPool::alloc`] falls back to an
//! untracked aligned heap allocation.
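//!
//! # Example
//!
//! A minimal usage sketch (illustrative only: [`BufferPool::new`] is
//! crate-internal, so this assumes crate-level access and a caller-provided
//! `prometheus_client` `Registry`):
//!
//! ```ignore
//! use bytes::BufMut;
//! use prometheus_client::registry::Registry;
//!
//! let mut registry = Registry::default();
//! let pool = BufferPool::new(BufferPoolConfig::for_network(), &mut registry);
//!
//! // Allocate from the pool, write into the buffer, then freeze it.
//! let mut buf = pool.try_alloc(1024).expect("pool not exhausted");
//! buf.put_slice(b"hello");
//! let frozen = buf.freeze();
//! assert_eq!(frozen.as_ref(), b"hello");
//!
//! // Dropping the frozen buffer (and every clone of it) returns the underlying
//! // allocation to the pool.
//! drop(frozen);
//! ```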

use super::{IoBuf, IoBufMut};
use bytes::{Buf, BufMut, Bytes};
use commonware_utils::NZUsize;
use crossbeam_queue::ArrayQueue;
use prometheus_client::{
    encoding::EncodeLabelSet,
    metrics::{counter::Counter, family::Family, gauge::Gauge},
    registry::Registry,
};
use std::{
    alloc::{alloc, dealloc, Layout},
    mem::ManuallyDrop,
    num::NonZeroUsize,
    ptr::NonNull,
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc, Weak,
    },
};

/// Error returned when buffer pool allocation fails.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PoolError {
    /// The requested capacity exceeds the maximum buffer size.
    Oversized,
    /// The pool is exhausted for the required size class.
    Exhausted,
}

impl std::fmt::Display for PoolError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Oversized => write!(f, "requested capacity exceeds maximum buffer size"),
            Self::Exhausted => write!(f, "pool exhausted for required size class"),
        }
    }
}

impl std::error::Error for PoolError {}

/// Returns the system page size.
///
/// On Unix systems, queries the actual page size via `sysconf`.
/// On other systems (Windows), defaults to 4KB.
#[cfg(unix)]
fn page_size() -> usize {
    // SAFETY: sysconf is safe to call.
    let size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) };
    if size <= 0 {
        4096 // Safe fallback if sysconf fails
    } else {
        size as usize
    }
}

#[cfg(not(unix))]
#[allow(clippy::missing_const_for_fn)]
fn page_size() -> usize {
    4096
}

/// Returns the cache line size for the current architecture.
///
/// Uses 128 bytes for x86_64 and aarch64 as a conservative estimate that
/// accounts for spatial prefetching. Uses 64 bytes for other architectures.
///
/// See: <https://github.com/crossbeam-rs/crossbeam/blob/983d56b6007ca4c22b56a665a7785f40f55c2a53/crossbeam-utils/src/cache_padded.rs>
const fn cache_line_size() -> usize {
    cfg_if::cfg_if! {
        if #[cfg(any(target_arch = "x86_64", target_arch = "aarch64"))] {
            128
        } else {
            64
        }
    }
}

/// Configuration for a buffer pool.
#[derive(Debug, Clone)]
pub struct BufferPoolConfig {
    /// Minimum buffer size. Must be >= alignment and a power of two.
    pub min_size: NonZeroUsize,
    /// Maximum buffer size. Must be a power of two and >= min_size.
    pub max_size: NonZeroUsize,
    /// Maximum number of buffers per size class.
    pub max_per_class: NonZeroUsize,
    /// Whether to pre-allocate all buffers on pool creation.
    pub prefill: bool,
    /// Buffer alignment. Must be a power of two.
    /// Use `page_size()` for storage I/O, `cache_line_size()` for network I/O.
    pub alignment: NonZeroUsize,
}

impl Default for BufferPoolConfig {
    fn default() -> Self {
        Self::for_network()
    }
}

impl BufferPoolConfig {
    /// Network I/O preset: cache-line aligned buffers from the cache-line size up
    /// to 64KB, 4096 per class, not prefilled.
    ///
    /// Network operations typically need multiple concurrent buffers per connection
    /// (message, encoding, encryption), so we allow 4096 buffers per size class.
    /// Cache-line alignment is used because network buffers don't require page
    /// alignment for DMA, and smaller alignment reduces internal fragmentation.
    pub const fn for_network() -> Self {
        let cache_line = NZUsize!(cache_line_size());
        Self {
            min_size: cache_line,
            max_size: NZUsize!(64 * 1024),
            max_per_class: NZUsize!(4096),
            prefill: false,
            alignment: cache_line,
        }
    }

    /// Storage I/O preset: page-aligned buffers from the page size up to 64KB,
    /// 32 per class, not prefilled.
    ///
    /// Page alignment is required for direct I/O and efficient DMA transfers.
    pub fn for_storage() -> Self {
        let page = NZUsize!(page_size());
        Self {
            min_size: page,
            max_size: NZUsize!(64 * 1024),
            max_per_class: NZUsize!(32),
            prefill: false,
            alignment: page,
        }
    }

    /// Validates the configuration, panicking on invalid values.
    ///
    /// # Panics
    ///
    /// - `alignment` is not a power of two
    /// - `min_size` is not a power of two
    /// - `max_size` is not a power of two
    /// - `min_size < alignment`
    /// - `max_size < min_size`
    fn validate(&self) {
        assert!(
            self.alignment.is_power_of_two(),
            "alignment must be a power of two"
        );
        assert!(
            self.min_size.is_power_of_two(),
            "min_size must be a power of two"
        );
        assert!(
            self.max_size.is_power_of_two(),
            "max_size must be a power of two"
        );
        assert!(
            self.min_size >= self.alignment,
            "min_size ({}) must be >= alignment ({})",
            self.min_size,
            self.alignment
        );
        assert!(
            self.max_size >= self.min_size,
            "max_size must be >= min_size"
        );
    }

    /// Returns the number of size classes.
    fn num_classes(&self) -> usize {
        if self.max_size < self.min_size {
            return 0;
        }
        // Classes are: min_size, min_size*2, min_size*4, ..., max_size
        (self.max_size.get() / self.min_size.get()).trailing_zeros() as usize + 1
    }

    /// Returns the size class index for a given size.
    /// Returns None if size > max_size.
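    ///
    /// For example, with `min_size = 4096` and `max_size = 32768` (the values
    /// used in the module-level example):
    ///
    /// ```text
    /// class_index(1)     == Some(0)  // rounds up to 4096
    /// class_index(4096)  == Some(0)
    /// class_index(4097)  == Some(1)  // rounds up to 8192
    /// class_index(32768) == Some(3)
    /// class_index(32769) == None     // exceeds max_size
    /// ```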
    fn class_index(&self, size: usize) -> Option<usize> {
        if size > self.max_size.get() {
            return None;
        }
        if size <= self.min_size.get() {
            return Some(0);
        }
        // Find the smallest power-of-two class that fits
        let size_class = size.next_power_of_two();
        let index = (size_class / self.min_size.get()).trailing_zeros() as usize;
        if index < self.num_classes() {
            Some(index)
        } else {
            None
        }
    }

    /// Returns the buffer size for a given class index.
    const fn class_size(&self, index: usize) -> usize {
        self.min_size.get() << index
    }
}

/// Label for buffer pool metrics, identifying the size class.
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
struct SizeClassLabel {
    size_class: u64,
}

/// Metrics for the buffer pool.
struct PoolMetrics {
    /// Number of buffers currently allocated (out of pool).
    allocated: Family<SizeClassLabel, Gauge>,
    /// Number of buffers available in the pool.
    available: Family<SizeClassLabel, Gauge>,
    /// Total number of successful allocations.
    allocations_total: Family<SizeClassLabel, Counter>,
    /// Total number of failed allocations (pool exhausted).
    exhausted_total: Family<SizeClassLabel, Counter>,
    /// Total number of oversized allocation requests.
    oversized_total: Counter,
}

impl PoolMetrics {
    fn new(registry: &mut Registry) -> Self {
        let metrics = Self {
            allocated: Family::default(),
            available: Family::default(),
            allocations_total: Family::default(),
            exhausted_total: Family::default(),
            oversized_total: Counter::default(),
        };

        registry.register(
            "buffer_pool_allocated",
            "Number of buffers currently allocated from the pool",
            metrics.allocated.clone(),
        );
        registry.register(
            "buffer_pool_available",
            "Number of buffers available in the pool",
            metrics.available.clone(),
        );
        registry.register(
            "buffer_pool_allocations_total",
            "Total number of successful buffer allocations",
            metrics.allocations_total.clone(),
        );
        registry.register(
            "buffer_pool_exhausted_total",
            "Total number of failed allocations due to pool exhaustion",
            metrics.exhausted_total.clone(),
        );
        registry.register(
            "buffer_pool_oversized_total",
            "Total number of allocation requests exceeding max buffer size",
            metrics.oversized_total.clone(),
        );

        metrics
    }
}

/// An aligned buffer.
///
/// The buffer is allocated with the specified alignment for efficient I/O operations.
/// Deallocates itself on drop using the stored layout.
pub(crate) struct AlignedBuffer {
    ptr: NonNull<u8>,
    layout: Layout,
}

// SAFETY: AlignedBuffer owns its memory and can be sent between threads.
unsafe impl Send for AlignedBuffer {}
// SAFETY: AlignedBuffer's memory is not shared (no interior mutability of pointer).
unsafe impl Sync for AlignedBuffer {}

impl AlignedBuffer {
    /// Allocates a new buffer with the given capacity and alignment.
    ///
    /// # Panics
    ///
    /// Panics if allocation fails or alignment is not a power of two.
    fn new(capacity: usize, alignment: usize) -> Self {
        let layout = Layout::from_size_align(capacity, alignment).expect("invalid layout");

        // SAFETY: Layout is valid (non-zero size, power-of-two alignment).
        let ptr = unsafe { alloc(layout) };
        let ptr = NonNull::new(ptr).expect("allocation failed");

        Self { ptr, layout }
    }

    /// Returns the capacity of the buffer.
    #[inline]
    const fn capacity(&self) -> usize {
        self.layout.size()
    }

    /// Returns a raw pointer to the buffer.
    #[inline]
    const fn as_ptr(&self) -> *mut u8 {
        self.ptr.as_ptr()
    }
}

impl Drop for AlignedBuffer {
    fn drop(&mut self) {
        // SAFETY: ptr was allocated with this layout.
        unsafe { dealloc(self.ptr.as_ptr(), self.layout) };
    }
}

/// Per-size-class state.
///
/// The freelist stores `Option<AlignedBuffer>` where:
/// - `Some(buf)` = a reusable buffer
/// - `None` = an available slot for creating a new buffer
struct SizeClass {
    /// The buffer size for this class.
    size: usize,
    /// Buffer alignment.
    alignment: usize,
    /// Free list storing either reusable buffers or empty slots.
    freelist: ArrayQueue<Option<AlignedBuffer>>,
    /// Number of buffers currently allocated (out of pool).
    allocated: AtomicUsize,
}

impl SizeClass {
    fn new(size: usize, alignment: usize, max_buffers: usize, prefill: bool) -> Self {
        let freelist = ArrayQueue::new(max_buffers);
        for _ in 0..max_buffers {
            let entry = if prefill {
                Some(AlignedBuffer::new(size, alignment))
            } else {
                None
            };
            let _ = freelist.push(entry);
        }
        Self {
            size,
            alignment,
            freelist,
            allocated: AtomicUsize::new(0),
        }
    }
}

/// Internal state of the buffer pool.
pub(crate) struct BufferPoolInner {
    config: BufferPoolConfig,
    classes: Vec<SizeClass>,
    metrics: PoolMetrics,
}

impl BufferPoolInner {
    /// Try to allocate a buffer from the given size class.
    fn try_alloc(&self, class_index: usize) -> Option<AlignedBuffer> {
        let class = &self.classes[class_index];
        let label = SizeClassLabel {
            size_class: class.size as u64,
        };

        match class.freelist.pop() {
            Some(Some(buffer)) => {
                // Reuse existing buffer
                class.allocated.fetch_add(1, Ordering::Relaxed);
                self.metrics.allocations_total.get_or_create(&label).inc();
                self.metrics.allocated.get_or_create(&label).inc();
                self.metrics.available.get_or_create(&label).dec();
                Some(buffer)
            }
            Some(None) => {
                // Create new buffer (we have a slot)
                class.allocated.fetch_add(1, Ordering::Relaxed);
                self.metrics.allocations_total.get_or_create(&label).inc();
                self.metrics.allocated.get_or_create(&label).inc();
                Some(AlignedBuffer::new(class.size, class.alignment))
            }
            None => {
                // Pool exhausted (no slots available)
                self.metrics.exhausted_total.get_or_create(&label).inc();
                None
            }
        }
    }

    /// Return a buffer to the pool.
    fn return_buffer(&self, buffer: AlignedBuffer) {
        // Find the class for this buffer size
        if let Some(class_index) = self.config.class_index(buffer.capacity()) {
            let class = &self.classes[class_index];
            let label = SizeClassLabel {
                size_class: class.size as u64,
            };

            class.allocated.fetch_sub(1, Ordering::Relaxed);
            self.metrics.allocated.get_or_create(&label).dec();

            // Try to return to freelist
            match class.freelist.push(Some(buffer)) {
                Ok(()) => {
                    self.metrics.available.get_or_create(&label).inc();
                }
                Err(_buffer) => {
                    // Freelist full, buffer is dropped and deallocated
                }
            }
        }
        // Buffer doesn't match any class (or freelist full) - it's dropped and deallocated
    }
}

/// A pool of reusable, aligned buffers.
///
/// Buffers are organized into power-of-two size classes. When a buffer is requested,
/// the smallest size class that fits is used. Buffers are automatically returned to
/// the pool when dropped.
///
/// # Alignment
///
/// Buffer alignment is guaranteed only at the base pointer (when `cursor == 0`).
/// After calling `Buf::advance()`, the pointer returned by `as_mut_ptr()` may
/// no longer be aligned. For direct I/O operations that require alignment,
/// do not advance the buffer before use.
#[derive(Clone)]
pub struct BufferPool {
    inner: Arc<BufferPoolInner>,
}

impl std::fmt::Debug for BufferPool {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BufferPool")
            .field("config", &self.inner.config)
            .field("num_classes", &self.inner.classes.len())
            .finish()
    }
}

impl BufferPool {
    /// Creates a new buffer pool with the given configuration.
    ///
    /// # Panics
    ///
    /// Panics if the configuration is invalid.
    pub(crate) fn new(config: BufferPoolConfig, registry: &mut Registry) -> Self {
        config.validate();

        let metrics = PoolMetrics::new(registry);

        let mut classes = Vec::with_capacity(config.num_classes());
        for i in 0..config.num_classes() {
            let size = config.class_size(i);
            let class = SizeClass::new(
                size,
                config.alignment.get(),
                config.max_per_class.get(),
                config.prefill,
            );
            classes.push(class);
        }

        // Update available metrics after prefill
        if config.prefill {
            for class in &classes {
                let label = SizeClassLabel {
                    size_class: class.size as u64,
                };
                let available = class.freelist.len() as i64;
                metrics.available.get_or_create(&label).set(available);
            }
        }

        Self {
            inner: Arc::new(BufferPoolInner {
                config,
                classes,
                metrics,
            }),
        }
    }

    /// Allocates a buffer with the given capacity.
    ///
    /// The returned buffer has `len() == 0` and `capacity() >= capacity`,
    /// matching the semantics of [`IoBufMut::with_capacity`] and
    /// `BytesMut::with_capacity`. Use `put_slice` or other `BufMut` methods
    /// to write data to the buffer.
    ///
    /// If the pool can provide a buffer (capacity within limits and pool not
    /// exhausted), returns a pooled buffer that will be returned to the pool
    /// when dropped. Otherwise, falls back to an untracked aligned heap
    /// allocation that is deallocated when dropped.
    ///
    /// Use [`Self::try_alloc`] if you need to distinguish between pooled and
    /// untracked allocations.
    ///
    /// # Initialization
    ///
    /// The returned buffer contains **uninitialized memory**. Do not read from
    /// it until data has been written.
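    ///
    /// # Example
    ///
    /// A sketch of typical use (`pool` is an assumed, already-constructed
    /// [`BufferPool`]):
    ///
    /// ```ignore
    /// let mut buf = pool.alloc(1024);
    /// assert_eq!(buf.len(), 0);
    /// assert!(buf.capacity() >= 1024);
    /// buf.put_slice(b"payload");
    /// assert_eq!(buf.as_ref(), b"payload");
    /// ```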
    pub fn alloc(&self, capacity: usize) -> IoBufMut {
        self.try_alloc(capacity).unwrap_or_else(|_| {
            let size = capacity.max(self.inner.config.min_size.get());
            let buffer = AlignedBuffer::new(size, self.inner.config.alignment.get());
            // Using Weak::new() means the buffer won't be returned to the pool on drop.
            IoBufMut::from_pooled(PooledBufMut::new(buffer, Weak::new()))
        })
    }

    /// Attempts to allocate a pooled buffer, returning an error on failure.
    ///
    /// Unlike [`Self::alloc`], this method does not fall back to untracked
    /// allocation. Use this when you need to know whether the buffer came
    /// from the pool.
    ///
    /// # Errors
    ///
    /// - [`PoolError::Oversized`]: `capacity` exceeds `max_size`
    /// - [`PoolError::Exhausted`]: Pool exhausted for required size class
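    ///
    /// # Example
    ///
    /// A sketch of distinguishing the failure modes (the handling shown is
    /// illustrative, not prescriptive):
    ///
    /// ```ignore
    /// match pool.try_alloc(len) {
    ///     Ok(buf) => { /* use the pooled buffer */ }
    ///     // The request can never be satisfied by the pool; fall back explicitly.
    ///     Err(PoolError::Oversized) => { let buf = pool.alloc(len); /* ... */ }
    ///     // The pool is temporarily out of buffers for this size class.
    ///     Err(PoolError::Exhausted) => { /* retry later or apply backpressure */ }
    /// }
    /// ```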
    pub fn try_alloc(&self, capacity: usize) -> Result<IoBufMut, PoolError> {
        let class_index = match self.inner.config.class_index(capacity) {
            Some(idx) => idx,
            None => {
                self.inner.metrics.oversized_total.inc();
                return Err(PoolError::Oversized);
            }
        };

        let buffer = self
            .inner
            .try_alloc(class_index)
            .ok_or(PoolError::Exhausted)?;
        let pooled = PooledBufMut::new(buffer, Arc::downgrade(&self.inner));
        Ok(IoBufMut::from_pooled(pooled))
    }

    /// Returns the pool configuration.
    pub fn config(&self) -> &BufferPoolConfig {
        &self.inner.config
    }
}

/// A mutable aligned buffer.
///
/// When dropped, the underlying buffer is returned to the pool if tracked,
/// or deallocated directly if untracked (e.g. fallback allocations).
///
/// # Buffer Layout
///
/// ```text
/// [0................cursor..............len.............raw_capacity]
///  ^                 ^                   ^                 ^
///  |                 |                   |                 |
///  allocation start  read position       write position    allocation end
///                    (consumed prefix)   (initialized)
///
/// Regions:
/// - [0..cursor]:        consumed (via Buf::advance), no longer accessible
/// - [cursor..len]:      readable bytes (as_ref returns this slice)
/// - [len..raw_capacity]: uninitialized, writable via BufMut
/// ```
///
/// # Invariants
///
/// - `cursor <= len <= raw_capacity`
/// - Bytes in `0..len` have been initialized (safe to read)
/// - Bytes in `len..raw_capacity` are uninitialized (write-only via `BufMut`)
///
/// # Computed Values
///
/// - `len()` = readable bytes = `self.len - cursor`
/// - `capacity()` = view capacity = `raw_capacity - cursor` (shrinks after advance)
/// - `remaining_mut()` = writable bytes = `raw_capacity - self.len`
///
/// This matches `BytesMut` semantics.
///
/// # Fixed Capacity
///
/// Unlike `BytesMut`, pooled buffers have **fixed capacity** and do NOT grow
/// automatically. Calling `put_slice()` or other `BufMut` methods that would
/// exceed capacity will panic (per the `BufMut` trait contract).
///
/// Always check `remaining_mut()` before writing variable-length data.
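///
/// A sketch of that check (`pool` and `payload` are assumed bindings, shown for
/// illustration only):
///
/// ```ignore
/// if payload.len() <= buf.remaining_mut() {
///     buf.put_slice(&payload);
/// } else {
///     // Too large for the remaining capacity: use a fresh (possibly unpooled)
///     // allocation instead of panicking in put_slice.
///     let mut bigger = pool.alloc(payload.len());
///     bigger.put_slice(&payload);
/// }
/// ```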
pub struct PooledBufMut {
    buffer: ManuallyDrop<AlignedBuffer>,
    /// Read cursor position (for `Buf` trait).
    cursor: usize,
    /// Number of bytes written (initialized).
    len: usize,
    /// Reference to the pool.
    pool: Weak<BufferPoolInner>,
}

impl std::fmt::Debug for PooledBufMut {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PooledBufMut")
            .field("cursor", &self.cursor)
            .field("len", &self.len)
            .field("capacity", &self.capacity())
            .finish()
    }
}

impl PooledBufMut {
    const fn new(buffer: AlignedBuffer, pool: Weak<BufferPoolInner>) -> Self {
        Self {
            buffer: ManuallyDrop::new(buffer),
            cursor: 0,
            len: 0,
            pool,
        }
    }

    /// Returns `true` if this buffer is tracked by a pool.
    ///
    /// Tracked buffers will be returned to their pool when dropped. Untracked
    /// buffers (from fallback allocations) are deallocated directly.
    #[inline]
    pub(crate) fn is_tracked(&self) -> bool {
        self.pool.strong_count() > 0
    }

    /// Returns the number of readable bytes remaining in the buffer.
    ///
    /// This is `len - cursor`, matching `BytesMut` semantics.
    #[inline]
    pub const fn len(&self) -> usize {
        self.len - self.cursor
    }

    /// Returns true if no readable bytes remain.
    #[inline]
    pub const fn is_empty(&self) -> bool {
        self.cursor == self.len
    }

    /// Returns the number of bytes the buffer can hold without reallocating.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.buffer.capacity() - self.cursor
    }

    /// Returns the raw allocation capacity (internal use only).
    #[inline]
    fn raw_capacity(&self) -> usize {
        self.buffer.capacity()
    }

    /// Returns an unsafe mutable pointer to the buffer's data.
    #[inline]
    pub fn as_mut_ptr(&mut self) -> *mut u8 {
        // SAFETY: cursor is always <= raw capacity
        unsafe { self.buffer.as_ptr().add(self.cursor) }
    }

    /// Sets the length of the buffer (view-relative).
    ///
    /// This will explicitly set the size of the buffer without actually
    /// modifying the data, so it is up to the caller to ensure that the data
    /// has been initialized.
    ///
    /// The `len` parameter is relative to the current view (after any `advance`
    /// calls), matching `BytesMut::set_len` semantics.
    ///
    /// # Safety
    ///
    /// Caller must ensure:
    /// - All bytes in the range `[cursor, cursor + len)` are initialized
    /// - `len <= capacity()` (where capacity is view-relative)
    #[inline]
    pub const unsafe fn set_len(&mut self, len: usize) {
        self.len = self.cursor + len;
    }

    /// Clears the buffer, removing all data. Existing capacity is preserved.
    #[inline]
    pub const fn clear(&mut self) {
        self.len = self.cursor;
    }

    /// Truncates the buffer to at most `len` readable bytes.
    ///
    /// If `len` is greater than the current readable length, this has no effect.
    /// This operates on readable bytes (after cursor), matching `BytesMut::truncate`
    /// semantics for buffers that have been advanced.
    #[inline]
    pub const fn truncate(&mut self, len: usize) {
        if len < self.len() {
            self.len = self.cursor + len;
        }
    }

    /// Freezes the buffer into an immutable `IoBuf`.
    ///
    /// Only the readable portion (`cursor..len`) is included in the result.
    /// The underlying buffer will be returned to the pool when all references
    /// to the `IoBuf` (including slices) are dropped.
    pub fn freeze(self) -> IoBuf {
        // Wrap self in ManuallyDrop first to prevent Drop from running
        // if any subsequent code panics.
        let mut me = ManuallyDrop::new(self);
        // SAFETY: me is wrapped in ManuallyDrop so its Drop impl won't run.
        // ManuallyDrop::take moves the inner buffer out, leaving the wrapper empty.
        let buffer = unsafe { ManuallyDrop::take(&mut me.buffer) };
        let cursor = me.cursor;
        let len = me.len;
        let pool = std::mem::take(&mut me.pool);

        Bytes::from_owner(PooledOwner::new(buffer, cursor, len, pool)).into()
    }
}

impl AsRef<[u8]> for PooledBufMut {
    #[inline]
    fn as_ref(&self) -> &[u8] {
        // SAFETY: bytes from cursor..len have been initialized.
        unsafe { std::slice::from_raw_parts(self.buffer.as_ptr().add(self.cursor), self.len()) }
    }
}

impl AsMut<[u8]> for PooledBufMut {
    #[inline]
    fn as_mut(&mut self) -> &mut [u8] {
        let len = self.len();
        // SAFETY: bytes from cursor..len have been initialized.
        unsafe { std::slice::from_raw_parts_mut(self.buffer.as_ptr().add(self.cursor), len) }
    }
}

impl Drop for PooledBufMut {
    fn drop(&mut self) {
        // SAFETY: Drop is only called once. freeze() wraps self in ManuallyDrop
        // to prevent this Drop impl from running after ownership is transferred.
        let buffer = unsafe { ManuallyDrop::take(&mut self.buffer) };
        if let Some(pool) = self.pool.upgrade() {
            pool.return_buffer(buffer);
        }
        // else: buffer is dropped here, which deallocates it
    }
}

impl Buf for PooledBufMut {
    #[inline]
    fn remaining(&self) -> usize {
        self.len - self.cursor
    }

    #[inline]
    fn chunk(&self) -> &[u8] {
        // SAFETY: bytes from cursor..len have been initialized.
        unsafe {
            std::slice::from_raw_parts(
                self.buffer.as_ptr().add(self.cursor),
                self.len - self.cursor,
            )
        }
    }

    #[inline]
    fn advance(&mut self, cnt: usize) {
        let remaining = self.len - self.cursor;
        assert!(cnt <= remaining, "cannot advance past end of buffer");
        self.cursor += cnt;
    }
}

// SAFETY: BufMut implementation for PooledBufMut.
// - `remaining_mut()` reports bytes available for writing (raw_capacity - len)
// - `chunk_mut()` returns uninitialized memory from len to raw_capacity
// - `advance_mut()` advances len within bounds
unsafe impl BufMut for PooledBufMut {
    #[inline]
    fn remaining_mut(&self) -> usize {
        self.raw_capacity() - self.len
    }

    #[inline]
    unsafe fn advance_mut(&mut self, cnt: usize) {
        assert!(
            cnt <= self.remaining_mut(),
            "cannot advance past end of buffer"
        );
        self.len += cnt;
    }

    #[inline]
    fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
        let raw_cap = self.raw_capacity();
        let len = self.len;
        // SAFETY: We have exclusive access and the slice is within raw capacity.
        unsafe {
            let ptr = self.buffer.as_ptr().add(len);
            bytes::buf::UninitSlice::from_raw_parts_mut(ptr, raw_cap - len)
        }
    }
}

/// Owner for pooled bytes that returns the buffer to the pool on drop.
struct PooledOwner {
    buffer: ManuallyDrop<AlignedBuffer>,
    /// Start offset of the data.
    cursor: usize,
    /// End offset of the data (exclusive).
    len: usize,
    pool: Weak<BufferPoolInner>,
}

impl PooledOwner {
    const fn new(
        buffer: AlignedBuffer,
        cursor: usize,
        len: usize,
        pool: Weak<BufferPoolInner>,
    ) -> Self {
        Self {
            buffer: ManuallyDrop::new(buffer),
            cursor,
            len,
            pool,
        }
    }
}

// Required for Bytes::from_owner
impl AsRef<[u8]> for PooledOwner {
    fn as_ref(&self) -> &[u8] {
        // SAFETY: bytes from cursor..len have been initialized.
        unsafe {
            std::slice::from_raw_parts(
                self.buffer.as_ptr().add(self.cursor),
                self.len - self.cursor,
            )
        }
    }
}

impl Drop for PooledOwner {
    fn drop(&mut self) {
        // SAFETY: Drop is only called once.
        let buffer = unsafe { ManuallyDrop::take(&mut self.buffer) };
        if let Some(pool) = self.pool.upgrade() {
            pool.return_buffer(buffer);
        }
        // else: buffer is dropped here, which deallocates it
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::IoBufs;
    use bytes::BytesMut;
    use std::{sync::mpsc, thread};

    fn test_registry() -> Registry {
        Registry::default()
    }

    /// Creates a test config with page alignment.
    fn test_config(min_size: usize, max_size: usize, max_per_class: usize) -> BufferPoolConfig {
        BufferPoolConfig {
            min_size: NZUsize!(min_size),
            max_size: NZUsize!(max_size),
            max_per_class: NZUsize!(max_per_class),
            prefill: false,
            alignment: NZUsize!(page_size()),
        }
    }

    #[test]
    fn test_page_size() {
        let size = page_size();
        assert!(size >= 4096);
        assert!(size.is_power_of_two());
    }

    #[test]
    fn test_aligned_buffer() {
        let page = page_size();
        let buf = AlignedBuffer::new(4096, page);
        assert_eq!(buf.capacity(), 4096);
        assert!((buf.as_ptr() as usize).is_multiple_of(page));

        // Test with cache-line alignment
        let cache_line = cache_line_size();
        let buf2 = AlignedBuffer::new(4096, cache_line);
        assert_eq!(buf2.capacity(), 4096);
        assert!((buf2.as_ptr() as usize).is_multiple_of(cache_line));
    }

    #[test]
    fn test_config_validation() {
        let page = page_size();
        let config = test_config(page, page * 4, 10);
        config.validate();
    }

    #[test]
    #[should_panic(expected = "min_size must be a power of two")]
    fn test_config_invalid_min_size() {
        let config = BufferPoolConfig {
            min_size: NZUsize!(3000),
            max_size: NZUsize!(8192),
            max_per_class: NZUsize!(10),
            prefill: false,
            alignment: NZUsize!(page_size()),
        };
        config.validate();
    }

    #[test]
    fn test_config_class_index() {
        let page = page_size();
        let config = test_config(page, page * 8, 10);

        // Classes: page, page*2, page*4, page*8
        assert_eq!(config.num_classes(), 4);

        assert_eq!(config.class_index(1), Some(0));
        assert_eq!(config.class_index(page), Some(0));
        assert_eq!(config.class_index(page + 1), Some(1));
        assert_eq!(config.class_index(page * 2), Some(1));
        assert_eq!(config.class_index(page * 8), Some(3));
        assert_eq!(config.class_index(page * 8 + 1), None);
    }

    #[test]
    fn test_pool_alloc_and_return() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page * 4, 2), &mut registry);

        // Allocate a buffer - returns buffer with len=0, capacity >= requested
        let buf = pool.try_alloc(100).unwrap();
        assert!(buf.capacity() >= page);
        assert_eq!(buf.len(), 0);

        // Drop returns to pool
        drop(buf);

        // Can allocate again
        let buf2 = pool.try_alloc(100).unwrap();
        assert!(buf2.capacity() >= page);
        assert_eq!(buf2.len(), 0);
    }

    #[test]
    fn test_pool_exhaustion() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);

        // Allocate max buffers
        let _buf1 = pool.try_alloc(100).expect("first alloc should succeed");
        let _buf2 = pool.try_alloc(100).expect("second alloc should succeed");

        // Third allocation should fail
        assert!(pool.try_alloc(100).is_err());
    }

    #[test]
    fn test_pool_oversized() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page * 2, 10), &mut registry);

        // Request larger than max_size
        assert!(pool.try_alloc(page * 4).is_err());
    }

    #[test]
    fn test_pool_size_classes() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page * 4, 10), &mut registry);

        // Small request gets smallest class
        let buf1 = pool.try_alloc(100).unwrap();
        assert_eq!(buf1.capacity(), page);

        // Larger request gets appropriate class
        let buf2 = pool.try_alloc(page + 1).unwrap();
        assert_eq!(buf2.capacity(), page * 2);

        let buf3 = pool.try_alloc(page * 3).unwrap();
        assert_eq!(buf3.capacity(), page * 4);
    }

    #[test]
    fn test_pooled_buf_mut_freeze() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);

        // Allocate and initialize a buffer
        let mut buf = pool.try_alloc(11).unwrap();
        buf.put_slice(&[0u8; 11]);
        assert_eq!(buf.len(), 11);

        // Write some data
        buf.as_mut()[..5].copy_from_slice(&[1, 2, 3, 4, 5]);

        // Freeze preserves the content
        let iobuf = buf.freeze();
        assert_eq!(iobuf.len(), 11);
        assert_eq!(&iobuf.as_ref()[..5], &[1, 2, 3, 4, 5]);

        // IoBuf can be sliced
        let slice = iobuf.slice(0..5);
        assert_eq!(slice.len(), 5);
    }

    #[test]
    fn test_prefill() {
        let page = NZUsize!(page_size());
        let mut registry = test_registry();
        let pool = BufferPool::new(
            BufferPoolConfig {
                min_size: page,
                max_size: page,
                max_per_class: NZUsize!(5),
                prefill: true,
                alignment: page,
            },
            &mut registry,
        );

        // Should be able to allocate max_per_class buffers immediately
        let mut bufs = Vec::new();
        for _ in 0..5 {
            bufs.push(pool.try_alloc(100).expect("alloc should succeed"));
        }

        // Next allocation should fail
        assert!(pool.try_alloc(100).is_err());
    }

    #[test]
    fn test_config_for_network() {
        let config = BufferPoolConfig::for_network();
        config.validate();
        assert_eq!(config.min_size.get(), cache_line_size());
        assert_eq!(config.max_size.get(), 64 * 1024);
        assert_eq!(config.max_per_class.get(), 4096);
        assert!(!config.prefill);
        assert_eq!(config.alignment.get(), cache_line_size());
    }

    #[test]
    fn test_config_for_storage() {
        let config = BufferPoolConfig::for_storage();
        config.validate();
        assert_eq!(config.min_size.get(), page_size());
        assert_eq!(config.max_size.get(), 64 * 1024);
        assert_eq!(config.max_per_class.get(), 32);
        assert!(!config.prefill);
        assert_eq!(config.alignment.get(), page_size());
    }

    /// Helper to get the number of allocated buffers for a size class.
    fn get_allocated(pool: &BufferPool, size: usize) -> usize {
        let class_index = pool.inner.config.class_index(size).unwrap();
        pool.inner.classes[class_index]
            .allocated
            .load(Ordering::Relaxed)
    }

    /// Helper to get the number of available buffers in freelist for a size class.
    fn get_available(pool: &BufferPool, size: usize) -> i64 {
        let class_index = pool.inner.config.class_index(size).unwrap();
        let label = SizeClassLabel {
            size_class: pool.inner.classes[class_index].size as u64,
        };
        pool.inner.metrics.available.get_or_create(&label).get()
    }

    #[test]
    fn test_freeze_returns_buffer_to_pool() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);

        // Initially: 0 allocated, 0 available
        assert_eq!(get_allocated(&pool, page), 0);
        assert_eq!(get_available(&pool, page), 0);

        // Allocate and freeze
        let buf = pool.try_alloc(100).unwrap();
        assert_eq!(get_allocated(&pool, page), 1);
        assert_eq!(get_available(&pool, page), 0);

        let iobuf = buf.freeze();
        // Still allocated (held by IoBuf)
        assert_eq!(get_allocated(&pool, page), 1);

        // Drop the IoBuf - buffer should return to pool
        drop(iobuf);
        assert_eq!(get_allocated(&pool, page), 0);
        assert_eq!(get_available(&pool, page), 1);
    }

    #[test]
    fn test_cloned_iobuf_returns_buffer_when_all_dropped() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);

        let buf = pool.try_alloc(100).unwrap();
        let iobuf = buf.freeze();

        // Clone the IoBuf multiple times (this clones the inner Bytes via Arc)
        let clone1 = iobuf.clone();
        let clone2 = iobuf.clone();
        let clone3 = iobuf.clone();

        assert_eq!(get_allocated(&pool, page), 1);

        // Drop original and some clones - buffer should NOT return yet
        drop(iobuf);
        drop(clone1);
        assert_eq!(get_allocated(&pool, page), 1);
        assert_eq!(get_available(&pool, page), 0);

        // Drop remaining clones - buffer should return
        drop(clone2);
        assert_eq!(get_allocated(&pool, page), 1); // Still held by clone3

        drop(clone3);
        assert_eq!(get_allocated(&pool, page), 0);
        assert_eq!(get_available(&pool, page), 1);
    }

    #[test]
    fn test_slice_holds_buffer_reference() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);

        let mut buf = pool.try_alloc(100).unwrap();
        buf.put_slice(&[0u8; 100]);
        let iobuf = buf.freeze();

        // Create a slice - this should hold a reference to the underlying buffer
        let slice = iobuf.slice(10..50);

        // Drop original - slice should keep buffer alive
        drop(iobuf);
        assert_eq!(get_allocated(&pool, page), 1);
        assert_eq!(get_available(&pool, page), 0);

        // Drop slice - buffer should return
        drop(slice);
        assert_eq!(get_allocated(&pool, page), 0);
        assert_eq!(get_available(&pool, page), 1);
    }

    #[test]
    fn test_copy_to_bytes_on_pooled_buffer() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);

        let mut buf = pool.try_alloc(100).unwrap();
        buf.put_slice(&[0x42u8; 100]);
        let mut iobuf = buf.freeze();

        // copy_to_bytes should create a slice sharing the same buffer
        let extracted = iobuf.copy_to_bytes(50);
        assert_eq!(extracted.len(), 50);
        assert!(extracted.iter().all(|&b| b == 0x42));

        // Both should hold references
        assert_eq!(get_allocated(&pool, page), 1);

        // Drop original
        drop(iobuf);
        assert_eq!(get_allocated(&pool, page), 1); // extracted holds it

        // Drop extracted
        drop(extracted);
        assert_eq!(get_allocated(&pool, page), 0);
        assert_eq!(get_available(&pool, page), 1);
    }

    #[test]
    fn test_concurrent_clones_and_drops() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 4), &mut registry);

        // Simulate the pattern in Messenger::content where we clone for multiple recipients
        for _ in 0..100 {
            let buf = pool.try_alloc(100).unwrap();
            let iobuf = buf.freeze();

            // Simulate sending to 10 recipients (clone for each)
            let clones: Vec<_> = (0..10).map(|_| iobuf.clone()).collect();
            drop(iobuf);

            // Drop clones one by one
            for clone in clones {
                drop(clone);
            }
        }

        // All buffers should be returned
        assert_eq!(get_allocated(&pool, page), 0);
    }

1245    #[test]
1246    fn test_iobuf_to_iobufmut_conversion_returns_pooled_buffer() {
1247        // This tests the IoBuf -> IoBufMut conversion that happens
1248        // when send() takes impl Into<IoBufMut>
1249        let page = page_size();
1250        let mut registry = test_registry();
1251        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
1252
1253        let buf = pool.try_alloc(100).unwrap();
1254        assert_eq!(get_allocated(&pool, page), 1);
1255
1256        let iobuf = buf.freeze();
1257        assert_eq!(get_allocated(&pool, page), 1);
1258
1259        // This is what happens when you call send(iobuf) where send takes impl Into<IoBufMut>
1260        // The IoBuf is converted to IoBufMut via From<IoBuf> for IoBufMut
1261        let iobufmut: IoBufMut = iobuf.into();
1262
1263        // The conversion copies data to a new BytesMut and drops the original Bytes
1264        // So the pooled buffer should be returned!
1265        assert_eq!(
1266            get_allocated(&pool, page),
1267            0,
1268            "pooled buffer should be returned after IoBuf->IoBufMut conversion"
1269        );
1270        assert_eq!(get_available(&pool, page), 1);
1271
1272        // The IoBufMut is now backed by BytesMut, not the pool
1273        drop(iobufmut);
1274        // Pool state unchanged
1275        assert_eq!(get_allocated(&pool, page), 0);
1276        assert_eq!(get_available(&pool, page), 1);
1277    }
1278
1279    #[test]
1280    fn test_stream_send_pattern() {
1281        // Simulates what stream::Sender::send does:
1282        // 1. Takes impl Into<IoBufs>
1283        // 2. Allocates encryption buffer from pool
1284        // 3. Copies plaintext into encryption buffer
1285        // 4. Encrypts in place
1286        // 5. Freezes and sends
1287        let page = page_size();
1288        let mut registry = test_registry();
1289        let pool = BufferPool::new(test_config(page, page, 4), &mut registry);
1290
1291        for _ in 0..100 {
1292            // Incoming data (could be IoBuf or IoBufMut)
1293            let mut incoming = pool.try_alloc(100).unwrap();
1294            incoming.put_slice(&[0x42u8; 100]);
1295            let incoming_iobuf = incoming.freeze();
1296
1297            // Convert to IoBufs (what send() does)
1298            let mut bufs: IoBufs = incoming_iobuf.into();
1299            let plaintext_len = bufs.remaining();
1300
1301            // Allocate encryption buffer with capacity (no init needed, we write to it)
1302            let ciphertext_len = plaintext_len + 16; // +16 for tag
1303            let mut encryption_buf = pool.try_alloc(ciphertext_len).unwrap();
1304            // SAFETY: We fill the entire buffer before reading
1305            unsafe { encryption_buf.set_len(ciphertext_len) };
1306
1307            // Copy plaintext into encryption buffer
1308            let mut offset = 0;
1309            while bufs.has_remaining() {
1310                let chunk = bufs.chunk();
1311                let chunk_len = chunk.len();
1312                encryption_buf.as_mut()[offset..offset + chunk_len].copy_from_slice(chunk);
1313                offset += chunk_len;
1314                bufs.advance(chunk_len);
1315            }
1316
1317            // At this point, bufs (which holds the incoming IoBuf) should be fully consumed
1318            // but the underlying buffer is still referenced until bufs is dropped
1319            drop(bufs);
1320
1321            // Simulate encryption (just modify in place)
1322            encryption_buf.as_mut()[plaintext_len..].fill(0xAA);
1323
1324            // Freeze and "send"
1325            let ciphertext = encryption_buf.freeze();
1326
1327            // Simulate network send completing
1328            drop(ciphertext);
1329        }
1330
1331        // All buffers should be returned
1332        assert_eq!(get_allocated(&pool, page), 0);
1333    }
1334
1335    #[test]
1336    fn test_multithreaded_alloc_freeze_return() {
1337        let page = page_size();
1338        let mut registry = test_registry();
1339        let pool = Arc::new(BufferPool::new(test_config(page, page, 100), &mut registry));
1340
1341        let mut handles = vec![];
1342
1343        // Reduce iterations under miri (atomics are slow)
1344        cfg_if::cfg_if! {
1345            if #[cfg(miri)] {
1346                let iterations = 100;
1347            } else {
1348                let iterations = 1000;
1349            }
1350        }
1351
1352        // Spawn multiple threads that allocate, freeze, clone, and drop
1353        for _ in 0..10 {
1354            let pool = pool.clone();
1355            let handle = thread::spawn(move || {
1356                for _ in 0..iterations {
1357                    let buf = pool.try_alloc(100).unwrap();
1358                    let iobuf = buf.freeze();
1359
1360                    // Clone a few times
1361                    let clones: Vec<_> = (0..5).map(|_| iobuf.clone()).collect();
1362                    drop(iobuf);
1363
1364                    // Drop clones
1365                    for clone in clones {
1366                        drop(clone);
1367                    }
1368                }
1369            });
1370            handles.push(handle);
1371        }
1372
1373        // Wait for all threads
1374        for handle in handles {
1375            handle.join().unwrap();
1376        }
1377
1378        // All buffers should be returned
1379        let class_index = pool.inner.config.class_index(page).unwrap();
1380        let allocated = pool.inner.classes[class_index]
1381            .allocated
1382            .load(Ordering::Relaxed);
1383        assert_eq!(
1384            allocated, 0,
1385            "all buffers should be returned after multithreaded test"
1386        );
1387    }
1388
1389    #[test]
1390    fn test_cross_thread_buffer_return() {
1391        // Allocate on one thread, freeze, send to another thread, drop there
1392        let page = page_size();
1393        let mut registry = test_registry();
1394        let pool = BufferPool::new(test_config(page, page, 100), &mut registry);
1395
1396        let (tx, rx) = mpsc::channel();
1397
1398        // Allocate and freeze on main thread
1399        for _ in 0..50 {
1400            let buf = pool.try_alloc(100).unwrap();
1401            let iobuf = buf.freeze();
1402            tx.send(iobuf).unwrap();
1403        }
1404        drop(tx);
1405
1406        // Receive and drop on another thread
1407        let handle = thread::spawn(move || {
1408            while let Ok(iobuf) = rx.recv() {
1409                drop(iobuf);
1410            }
1411        });
1412
1413        handle.join().unwrap();
1414
1415        // All buffers should be returned
1416        assert_eq!(get_allocated(&pool, page), 0);
1417    }
1418
1419    #[test]
1420    fn test_pool_dropped_before_buffer() {
1421        // What happens if the pool is dropped while buffers are still in use?
1422        // The Weak reference should fail to upgrade, and the buffer should just be deallocated.
1423
1424        let page = page_size();
1425        let mut registry = test_registry();
1426        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
1427
1428        let mut buf = pool.try_alloc(100).unwrap();
1429        buf.put_slice(&[0u8; 100]);
1430        let iobuf = buf.freeze();
1431
1432        // Drop the pool while buffer is still alive
1433        drop(pool);
1434
1435        // Buffer should still be usable
1436        assert_eq!(iobuf.len(), 100);
1437
1438        // Dropping the buffer should not panic (Weak upgrade fails, buffer is deallocated)
1439        drop(iobuf);
1440        // No assertion here - we just want to make sure it doesn't panic
1441    }
1442
1443    /// Verify PooledBufMut matches BytesMut semantics for Buf trait.
1444    #[test]
1445    fn test_bytesmut_parity_buf_trait() {
1446        let page = page_size();
1447        let mut registry = test_registry();
1448        let pool = BufferPool::new(test_config(page, page, 10), &mut registry);
1449
1450        let mut bytes = BytesMut::with_capacity(100);
1451        bytes.put_slice(&[0xAAu8; 50]);
1452
1453        let mut pooled = pool.try_alloc(100).unwrap();
1454        pooled.put_slice(&[0xAAu8; 50]);
1455
1456        // remaining()
1457        assert_eq!(Buf::remaining(&bytes), Buf::remaining(&pooled));
1458
1459        // chunk()
1460        assert_eq!(Buf::chunk(&bytes), Buf::chunk(&pooled));
1461
1462        // advance()
1463        Buf::advance(&mut bytes, 10);
1464        Buf::advance(&mut pooled, 10);
1465        assert_eq!(Buf::remaining(&bytes), Buf::remaining(&pooled));
1466        assert_eq!(Buf::chunk(&bytes), Buf::chunk(&pooled));
1467
1468        // advance to end
1469        let remaining = Buf::remaining(&bytes);
1470        Buf::advance(&mut bytes, remaining);
1471        Buf::advance(&mut pooled, remaining);
1472        assert_eq!(Buf::remaining(&bytes), 0);
1473        assert_eq!(Buf::remaining(&pooled), 0);
1474        assert!(!Buf::has_remaining(&bytes));
1475        assert!(!Buf::has_remaining(&pooled));
1476    }
1477
1478    /// Verify PooledBufMut matches BytesMut semantics for BufMut trait.
1479    #[test]
1480    fn test_bytesmut_parity_bufmut_trait() {
1481        let page = page_size();
1482        let mut registry = test_registry();
1483        let pool = BufferPool::new(test_config(page, page, 10), &mut registry);
1484
1485        let mut bytes = BytesMut::with_capacity(100);
1486        let mut pooled = pool.try_alloc(100).unwrap();
1487
1488        // remaining_mut()
1489        assert!(BufMut::remaining_mut(&bytes) >= 100);
1490        assert!(BufMut::remaining_mut(&pooled) >= 100);
1491
1492        // put_slice()
1493        BufMut::put_slice(&mut bytes, b"hello");
1494        BufMut::put_slice(&mut pooled, b"hello");
1495        assert_eq!(bytes.as_ref(), pooled.as_ref());
1496
1497        // put_u8()
1498        BufMut::put_u8(&mut bytes, 0x42);
1499        BufMut::put_u8(&mut pooled, 0x42);
1500        assert_eq!(bytes.as_ref(), pooled.as_ref());
1501
1502        // chunk_mut() - verify we can write to it
1503        let bytes_chunk = BufMut::chunk_mut(&mut bytes);
1504        let pooled_chunk = BufMut::chunk_mut(&mut pooled);
1505        assert!(bytes_chunk.len() > 0);
1506        assert!(pooled_chunk.len() > 0);
1507    }
1508
1509    /// Verify truncate works correctly after advance.
1510    #[test]
1511    fn test_bytesmut_parity_truncate_after_advance() {
1512        let page = page_size();
1513        let mut registry = test_registry();
1514        let pool = BufferPool::new(test_config(page, page, 10), &mut registry);
1515
1516        let mut bytes = BytesMut::with_capacity(100);
1517        bytes.put_slice(&[0xAAu8; 50]);
1518        Buf::advance(&mut bytes, 10);
1519
1520        let mut pooled = pool.try_alloc(100).unwrap();
1521        pooled.put_slice(&[0xAAu8; 50]);
1522        Buf::advance(&mut pooled, 10);
1523
1524        // Both should have 40 bytes remaining
1525        assert_eq!(bytes.len(), 40);
1526        assert_eq!(pooled.len(), 40);
1527
1528        // Truncate to 20 readable bytes
1529        bytes.truncate(20);
1530        pooled.truncate(20);
1531
1532        assert_eq!(bytes.len(), pooled.len(), "len after truncate");
1533        assert_eq!(bytes.as_ref(), pooled.as_ref(), "content after truncate");
1534    }

    /// Verify clear works correctly after advance.
    #[test]
    fn test_bytesmut_parity_clear_after_advance() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 10), &mut registry);

        let mut bytes = BytesMut::with_capacity(100);
        bytes.put_slice(&[0xAAu8; 50]);
        Buf::advance(&mut bytes, 10);

        let mut pooled = pool.try_alloc(100).unwrap();
        pooled.put_slice(&[0xAAu8; 50]);
        Buf::advance(&mut pooled, 10);

        bytes.clear();
        pooled.clear();

        assert_eq!(bytes.len(), 0);
        assert_eq!(pooled.len(), 0);
        assert!(bytes.is_empty());
        assert!(pooled.is_empty());
    }

    /// Test pool exhaustion and recovery.
    #[test]
    fn test_pool_exhaustion_and_recovery() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 3), &mut registry);

        // Exhaust the pool
        let buf1 = pool.try_alloc(100).expect("first alloc");
        let buf2 = pool.try_alloc(100).expect("second alloc");
        let buf3 = pool.try_alloc(100).expect("third alloc");
        assert!(pool.try_alloc(100).is_err(), "pool should be exhausted");

        // Return one buffer
        drop(buf1);

        // Should be able to allocate again
        let buf4 = pool.try_alloc(100).expect("alloc after return");
        assert!(pool.try_alloc(100).is_err(), "pool exhausted again");

        // Return all and verify freelist reuse
        drop(buf2);
        drop(buf3);
        drop(buf4);

        assert_eq!(get_allocated(&pool, page), 0);
        assert_eq!(get_available(&pool, page), 3);

        // Allocate again - should reuse from freelist
        let _buf5 = pool.try_alloc(100).expect("reuse from freelist");
        assert_eq!(get_available(&pool, page), 2);
    }

    /// Test try_alloc error variants.
    #[test]
    fn test_try_alloc_errors() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);

        // Oversized request
        let result = pool.try_alloc(page * 10);
        assert_eq!(result.unwrap_err(), PoolError::Oversized);

        // Exhaust pool
        let _buf1 = pool.try_alloc(100).unwrap();
        let _buf2 = pool.try_alloc(100).unwrap();
        let result = pool.try_alloc(100);
        assert_eq!(result.unwrap_err(), PoolError::Exhausted);
    }

    /// Test fallback allocation when pool is exhausted or oversized.
    #[test]
    fn test_fallback_allocation() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);

        // Exhaust the pool
        let buf1 = pool.try_alloc(100).unwrap();
        let buf2 = pool.try_alloc(100).unwrap();
        assert!(buf1.is_pooled());
        assert!(buf2.is_pooled());

        // Fallback via alloc() when exhausted - still aligned, but untracked
        let mut fallback_exhausted = pool.alloc(100);
        assert!(!fallback_exhausted.is_pooled());
        assert!((fallback_exhausted.as_mut_ptr() as usize).is_multiple_of(page));

        // Fallback via alloc() when oversized - still aligned, but untracked
        let mut fallback_oversized = pool.alloc(page * 10);
        assert!(!fallback_oversized.is_pooled());
        assert!((fallback_oversized.as_mut_ptr() as usize).is_multiple_of(page));

        // Verify pool counters unchanged by fallback allocations
        assert_eq!(get_allocated(&pool, page), 2);

        // Drop fallback buffers - should not affect pool counters
        drop(fallback_exhausted);
        drop(fallback_oversized);
        assert_eq!(get_allocated(&pool, page), 2);

        // Drop tracked buffers - counters should decrease
        drop(buf1);
        drop(buf2);
        assert_eq!(get_allocated(&pool, page), 0);
    }

    /// Test is_pooled method.
    #[test]
    fn test_is_pooled() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 10), &mut registry);

        let pooled = pool.try_alloc(100).unwrap();
        assert!(pooled.is_pooled());

        let owned = IoBufMut::with_capacity(100);
        assert!(!owned.is_pooled());
    }

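    /// Verify capacity and len match BytesMut after advancing the read cursor.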
    #[test]
    fn test_bytesmut_parity_capacity_after_advance() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page * 4, 10), &mut registry);

        let mut bytes = BytesMut::with_capacity(page);
        bytes.put_slice(&[0xAAu8; 50]);

        let mut pooled = pool.try_alloc(page).unwrap();
        pooled.put_slice(&[0xAAu8; 50]);

        // Before advance
        assert_eq!(bytes.len(), pooled.len(), "len before advance");

        Buf::advance(&mut bytes, 20);
        Buf::advance(&mut pooled, 20);

        // After advance: capacity shrinks, len shrinks
        assert_eq!(bytes.len(), pooled.len(), "len after advance");
        assert_eq!(
            bytes.capacity(),
            pooled.capacity(),
            "capacity after advance"
        );
    }

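    /// Verify set_len matches BytesMut semantics after advance.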
    #[test]
    fn test_bytesmut_parity_set_len_after_advance() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page * 4, 10), &mut registry);

        let mut bytes = BytesMut::with_capacity(page);
        bytes.resize(50, 0xBB);
        Buf::advance(&mut bytes, 20);

        let mut pooled = pool.try_alloc(page).unwrap();
        pooled.put_slice(&[0xBB; 50]);
        Buf::advance(&mut pooled, 20);

        // After put_slice(50) and advance(20): cursor=20, len=50, readable=30 bytes (20..50)
        // set_len(25) shrinks readable region to 25 bytes (20..45), which is within initialized range
        // SAFETY: We're shrinking the readable region, all bytes in range are initialized.
        unsafe {
            bytes.set_len(25);
            pooled.set_len(25);
        }

        assert_eq!(bytes.len(), pooled.len(), "len after set_len");
        assert_eq!(bytes.as_ref(), pooled.as_ref(), "content after set_len");
    }

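    /// Verify clear matches BytesMut after advance: len resets to 0 while the
    /// post-advance capacity is preserved.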
    #[test]
    fn test_bytesmut_parity_clear_preserves_view() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page * 4, 10), &mut registry);

        let mut bytes = BytesMut::with_capacity(page);
        bytes.resize(50, 0xCC);
        Buf::advance(&mut bytes, 20);
        let cap_before_clear = bytes.capacity();
        bytes.clear();

        let mut pooled = pool.try_alloc(page).unwrap();
        pooled.put_slice(&[0xCC; 50]);
        Buf::advance(&mut pooled, 20);
        let pooled_cap_before = pooled.capacity();
        pooled.clear();

        // clear() sets len to 0 but preserves capacity (doesn't resurrect prefix)
        assert_eq!(bytes.len(), pooled.len(), "len after clear");
        assert_eq!(bytes.capacity(), cap_before_clear, "bytes cap unchanged");
        assert_eq!(pooled.capacity(), pooled_cap_before, "pooled cap unchanged");
    }

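    /// Verify writes after advance append to the readable region, matching BytesMut.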
    #[test]
    fn test_bytesmut_parity_put_after_advance() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page * 4, 10), &mut registry);

        let mut bytes = BytesMut::with_capacity(100);
        bytes.resize(30, 0xAA);
        Buf::advance(&mut bytes, 10);
        bytes.put_slice(&[0xBB; 10]);

        let mut pooled = pool.try_alloc(100).unwrap();
        pooled.put_slice(&[0xAA; 30]);
        Buf::advance(&mut pooled, 10);
        pooled.put_slice(&[0xBB; 10]);

        assert_eq!(bytes.as_ref(), pooled.as_ref(), "content after put_slice");
    }

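    /// Verify the storage preset yields page-aligned buffers and the network
    /// preset yields cache-line-aligned buffers.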
    #[test]
    fn test_buffer_alignment() {
        let page = page_size();
        let cache_line = cache_line_size();
        let mut registry = test_registry();

        // Reduce max_per_class under miri (atomics are slow)
        cfg_if::cfg_if! {
            if #[cfg(miri)] {
                let storage_config = BufferPoolConfig {
                    max_per_class: NZUsize!(32),
                    ..BufferPoolConfig::for_storage()
                };
                let network_config = BufferPoolConfig {
                    max_per_class: NZUsize!(32),
                    ..BufferPoolConfig::for_network()
                };
            } else {
                let storage_config = BufferPoolConfig::for_storage();
                let network_config = BufferPoolConfig::for_network();
            }
        }

        // Storage preset - page aligned
        let storage_buffer_pool = BufferPool::new(storage_config, &mut registry);
        let mut buf = storage_buffer_pool.try_alloc(100).unwrap();
        assert_eq!(
            buf.as_mut_ptr() as usize % page,
            0,
            "storage buffer not page-aligned"
        );

        // Network preset - cache-line aligned
        let network_buffer_pool = BufferPool::new(network_config, &mut registry);
        let mut buf = network_buffer_pool.try_alloc(100).unwrap();
        assert_eq!(
            buf.as_mut_ptr() as usize % cache_line,
            0,
            "network buffer not cache-line aligned"
        );
    }

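    /// Verify freezing a fully consumed buffer yields an empty buffer.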
    #[test]
    fn test_freeze_after_advance_to_end() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 10), &mut registry);

        let mut buf = pool.try_alloc(100).unwrap();
        buf.put_slice(&[0x42; 100]);
        Buf::advance(&mut buf, 100);

        let frozen = buf.freeze();
        assert!(frozen.is_empty());
    }

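    /// Verify a zero-capacity request succeeds and yields an empty pooled buffer.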
    #[test]
    fn test_zero_capacity_allocation() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 10), &mut registry);

        let buf = pool.try_alloc(0).expect("zero capacity should succeed");
        assert_eq!(buf.capacity(), page);
        assert_eq!(buf.len(), 0);
    }

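    /// Verify a request for exactly the maximum buffer size succeeds.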
    #[test]
    fn test_exact_max_size_allocation() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 10), &mut registry);

        let buf = pool.try_alloc(page).expect("exact max size should succeed");
        assert_eq!(buf.capacity(), page);
    }

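    /// Verify freeze captures only the unconsumed readable bytes after a partial advance.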
    #[test]
    fn test_freeze_after_partial_advance_mut() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 10), &mut registry);

        let mut buf = pool.try_alloc(100).unwrap();
        // Write 50 bytes of initialized data
        buf.put_slice(&[0xAA; 50]);
        // Consume 20 bytes via Buf
        Buf::advance(&mut buf, 20);
        // Freeze should only contain 30 bytes
        let frozen = buf.freeze();
        assert_eq!(frozen.len(), 30);
        assert_eq!(frozen.as_ref(), &[0xAA; 30]);
    }

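    /// Verify interleaved advance and put_slice leave the expected readable content.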
    #[test]
    fn test_interleaved_advance_and_write() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 10), &mut registry);

        let mut buf = pool.try_alloc(100).unwrap();
        buf.put_slice(b"hello");
        Buf::advance(&mut buf, 2);
        buf.put_slice(b"world");
        assert_eq!(buf.as_ref(), b"lloworld");
    }

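    /// Verify the buffer is returned to the pool only after the last clone or slice is dropped.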
    #[test]
    fn test_freeze_slice_clone_refcount() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 10), &mut registry);

        let mut buf = pool.try_alloc(100).unwrap();
        buf.put_slice(&[0x42; 100]);
        let iobuf = buf.freeze();
        let slice = iobuf.slice(10..50);
        let clone1 = slice.clone();
        let clone2 = iobuf.clone();

        drop(iobuf);
        drop(slice);
        assert_eq!(get_allocated(&pool, page), 1); // Still held by clones

        drop(clone1);
        assert_eq!(get_allocated(&pool, page), 1); // Still held by clone2

        drop(clone2);
        assert_eq!(get_allocated(&pool, page), 0); // Finally returned
    }

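    /// Verify truncating beyond len is a no-op, matching BytesMut.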
    #[test]
    fn test_truncate_beyond_len_is_noop() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 10), &mut registry);

        // BytesMut behavior
        let mut bytes = BytesMut::with_capacity(100);
        bytes.resize(50, 0xAA);
        bytes.truncate(100); // Should be no-op
        assert_eq!(bytes.len(), 50);

        // PooledBufMut should match
        let mut pooled = pool.try_alloc(100).unwrap();
        pooled.put_slice(&[0xAA; 50]);
        pooled.truncate(100); // Should be no-op
        assert_eq!(pooled.len(), 50);
    }

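    /// Verify freezing an empty (cleared) buffer still returns it to the pool on drop.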
    #[test]
    fn test_freeze_empty_after_clear() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 10), &mut registry);

        let mut buf = pool.try_alloc(100).unwrap();
        buf.put_slice(&[0xAA; 50]);
        buf.clear();

        let frozen = buf.freeze();
        assert!(frozen.is_empty());
        assert_eq!(frozen.len(), 0);

        // Should still return to pool on drop
        drop(frozen);
        assert_eq!(get_available(&pool, page), 1);
    }

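    /// Verify alignment holds for the initial pointer but may be lost after advance.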
    #[test]
    fn test_alignment_after_advance() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(BufferPoolConfig::for_storage(), &mut registry);

        let mut buf = pool.try_alloc(100).unwrap();
        buf.put_slice(&[0; 100]);

        // Initially aligned
        assert_eq!(buf.as_mut_ptr() as usize % page, 0);

        // After advance, alignment may be broken
        Buf::advance(&mut buf, 7);
        // Pointer is now at offset 7, not page-aligned
        assert_ne!(buf.as_mut_ptr() as usize % page, 0);
    }
}