// commonware_runtime/iobuf/pool.rs
//! Buffer pool for efficient I/O operations.
//!
//! Provides pooled, aligned buffers that can be reused to reduce allocation
//! overhead. Buffer alignment is configurable: use page alignment for storage I/O
//! (required for direct I/O and DMA), or cache-line alignment for network I/O
//! (reduces fragmentation).
//!
//! # Thread Safety
//!
//! [`BufferPool`] is `Send + Sync` and can be safely shared across threads.
//! Allocation and deallocation are lock-free operations using atomic counters
//! and a lock-free queue ([`crossbeam_queue::ArrayQueue`]).
//!
//! # Pool Lifecycle
//!
//! The pool uses reference counting internally. Buffers hold a weak reference
//! to the pool, so:
//! - If a buffer is returned after the pool is dropped, it is deallocated
//!   directly instead of being returned to the freelist.
//! - The pool can be dropped while buffers are still in use; those buffers
//!   remain valid and will be deallocated when they are dropped.
//!
//! # Size Classes
//!
//! Buffers are organized into power-of-two size classes from `min_size` to
//! `max_size`. For example, with `min_size = 4096` and `max_size = 32768`:
//! - Class 0: 4096 bytes
//! - Class 1: 8192 bytes
//! - Class 2: 16384 bytes
//! - Class 3: 32768 bytes
//!
//! Allocation requests are rounded up to the next size class. Requests larger
//! than `max_size` return [`PoolError::Oversized`] from [`BufferPool::try_alloc`],
//! or fall back to an untracked aligned heap allocation from [`BufferPool::alloc`].

36use super::{IoBuf, IoBufMut};
37use bytes::{Buf, BufMut, Bytes};
38use commonware_utils::NZUsize;
39use crossbeam_queue::ArrayQueue;
40use prometheus_client::{
41    encoding::EncodeLabelSet,
42    metrics::{counter::Counter, family::Family, gauge::Gauge},
43    registry::Registry,
44};
45use std::{
46    alloc::{alloc, alloc_zeroed, dealloc, handle_alloc_error, Layout},
47    mem::ManuallyDrop,
48    num::NonZeroUsize,
49    ops::{Bound, RangeBounds},
50    ptr::NonNull,
51    sync::{
52        atomic::{AtomicUsize, Ordering},
53        Arc, Weak,
54    },
55};
56
/// Error returned when buffer pool allocation fails.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PoolError {
    /// The requested capacity exceeds the maximum buffer size.
    Oversized,
    /// The pool is exhausted for the required size class.
    Exhausted,
}

impl std::fmt::Display for PoolError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Map each variant to its fixed message, then emit it once.
        let message = match self {
            Self::Oversized => "requested capacity exceeds maximum buffer size",
            Self::Exhausted => "pool exhausted for required size class",
        };
        f.write_str(message)
    }
}

impl std::error::Error for PoolError {}
76
77/// Returns the system page size.
78///
79/// On Unix systems, queries the actual page size via `sysconf`.
80/// On other systems (Windows), defaults to 4KB.
81#[cfg(unix)]
82fn page_size() -> usize {
83    // SAFETY: sysconf is safe to call.
84    let size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) };
85    if size <= 0 {
86        4096 // Safe fallback if sysconf fails
87    } else {
88        size as usize
89    }
90}
91
/// Returns the system page size.
///
/// Non-Unix fallback: assumes the conventional 4KB page size; no runtime
/// query is performed on these targets.
#[cfg(not(unix))]
// NOTE(review): presumably kept non-const so both cfg variants share a
// signature (the unix variant cannot be const) — confirm before changing.
#[allow(clippy::missing_const_for_fn)]
fn page_size() -> usize {
    4096
}
97
/// Returns the cache line size for the current architecture.
///
/// Uses 128 bytes for x86_64 and aarch64 as a conservative estimate that
/// accounts for spatial prefetching. Uses 64 bytes for other architectures.
///
/// See: <https://github.com/crossbeam-rs/crossbeam/blob/983d56b6007ca4c22b56a665a7785f40f55c2a53/crossbeam-utils/src/cache_padded.rs>
const fn cache_line_size() -> usize {
    // cfg! expands to a compile-time boolean, so this stays a const fn and
    // avoids the cfg_if macro dependency for this one decision.
    if cfg!(any(target_arch = "x86_64", target_arch = "aarch64")) {
        128
    } else {
        64
    }
}
113
/// Configuration for a buffer pool.
///
/// Invariants between the fields are checked by `validate` when the pool is
/// created; see that method for the exact panic conditions.
#[derive(Debug, Clone)]
pub struct BufferPoolConfig {
    /// Minimum buffer size. Must be >= alignment and a power of two.
    pub min_size: NonZeroUsize,
    /// Maximum buffer size. Must be a power of two and >= min_size.
    pub max_size: NonZeroUsize,
    /// Maximum number of buffers per size class.
    pub max_per_class: NonZeroUsize,
    /// Whether to pre-allocate all buffers on pool creation.
    pub prefill: bool,
    /// Buffer alignment. Must be a power of two.
    /// Use `page_size()` for storage I/O and `cache_line_size()` for network I/O.
    pub alignment: NonZeroUsize,
}
129
130impl BufferPoolConfig {
131    /// Network I/O preset: cache-line aligned, cache_line_size to 64KB buffers,
132    /// 4096 per class, not prefilled.
133    ///
134    /// Network operations typically need multiple concurrent buffers per connection
135    /// (message, encoding, encryption) so we allow 4096 buffers per size class.
136    /// Cache-line alignment is used because network buffers don't require page
137    /// alignment for DMA, and smaller alignment reduces internal fragmentation.
138    pub const fn for_network() -> Self {
139        let cache_line = NZUsize!(cache_line_size());
140        Self {
141            min_size: cache_line,
142            max_size: NZUsize!(64 * 1024),
143            max_per_class: NZUsize!(4096),
144            prefill: false,
145            alignment: cache_line,
146        }
147    }
148
149    /// Storage I/O preset: page-aligned, page_size to 8MB buffers, 32 per class,
150    /// not prefilled.
151    ///
152    /// Page alignment is required for direct I/O and efficient DMA transfers.
153    pub fn for_storage() -> Self {
154        let page = NZUsize!(page_size());
155        Self {
156            min_size: page,
157            max_size: NZUsize!(8 * 1024 * 1024),
158            max_per_class: NZUsize!(32),
159            prefill: false,
160            alignment: page,
161        }
162    }
163
164    /// Returns a copy of this config with a new minimum buffer size.
165    pub const fn with_min_size(mut self, min_size: NonZeroUsize) -> Self {
166        self.min_size = min_size;
167        self
168    }
169
170    /// Returns a copy of this config with a new maximum buffer size.
171    pub const fn with_max_size(mut self, max_size: NonZeroUsize) -> Self {
172        self.max_size = max_size;
173        self
174    }
175
176    /// Returns a copy of this config with a new maximum number of buffers per size class.
177    pub const fn with_max_per_class(mut self, max_per_class: NonZeroUsize) -> Self {
178        self.max_per_class = max_per_class;
179        self
180    }
181
182    /// Returns a copy of this config with a new prefill setting.
183    pub const fn with_prefill(mut self, prefill: bool) -> Self {
184        self.prefill = prefill;
185        self
186    }
187
188    /// Returns a copy of this config with a new alignment.
189    pub const fn with_alignment(mut self, alignment: NonZeroUsize) -> Self {
190        self.alignment = alignment;
191        self
192    }
193
194    /// Returns a copy of this config sized for an approximate tracked-memory budget.
195    ///
196    /// This computes `max_per_class` as:
197    ///
198    /// `ceil(budget_bytes / sum(size_class_bytes))`
199    ///
200    /// where `size_class_bytes` includes every class from `min_size` to `max_size`.
201    /// This always rounds up to at least one buffer per size class, so the
202    /// resulting estimated capacity may exceed `budget_bytes`.
203    pub fn with_budget_bytes(mut self, budget_bytes: NonZeroUsize) -> Self {
204        let mut class_bytes = 0usize;
205        for i in 0..self.num_classes() {
206            class_bytes = class_bytes.saturating_add(self.class_size(i));
207        }
208        if class_bytes == 0 {
209            return self;
210        }
211        self.max_per_class = NZUsize!(budget_bytes.get().div_ceil(class_bytes));
212        self
213    }
214
215    /// Validates the configuration, panicking on invalid values.
216    ///
217    /// # Panics
218    ///
219    /// - `alignment` is not a power of two
220    /// - `min_size` is not a power of two
221    /// - `max_size` is not a power of two
222    /// - `min_size < alignment`
223    /// - `max_size < min_size`
224    fn validate(&self) {
225        assert!(
226            self.alignment.is_power_of_two(),
227            "alignment must be a power of two"
228        );
229        assert!(
230            self.min_size.is_power_of_two(),
231            "min_size must be a power of two"
232        );
233        assert!(
234            self.max_size.is_power_of_two(),
235            "max_size must be a power of two"
236        );
237        assert!(
238            self.min_size >= self.alignment,
239            "min_size ({}) must be >= alignment ({})",
240            self.min_size,
241            self.alignment
242        );
243        assert!(
244            self.max_size >= self.min_size,
245            "max_size must be >= min_size"
246        );
247    }
248
249    /// Returns the number of size classes.
250    fn num_classes(&self) -> usize {
251        if self.max_size < self.min_size {
252            return 0;
253        }
254        // Classes are: min_size, min_size*2, min_size*4, ..., max_size
255        (self.max_size.get() / self.min_size.get()).trailing_zeros() as usize + 1
256    }
257
258    /// Returns the size class index for a given size.
259    /// Returns None if size > max_size.
260    fn class_index(&self, size: usize) -> Option<usize> {
261        if size > self.max_size.get() {
262            return None;
263        }
264        if size <= self.min_size.get() {
265            return Some(0);
266        }
267        // Find the smallest power-of-two class that fits
268        let size_class = size.next_power_of_two();
269        let index = (size_class / self.min_size.get()).trailing_zeros() as usize;
270        if index < self.num_classes() {
271            Some(index)
272        } else {
273            None
274        }
275    }
276
277    /// Returns the buffer size for a given class index.
278    const fn class_size(&self, index: usize) -> usize {
279        self.min_size.get() << index
280    }
281}
282
/// Label for buffer pool metrics, identifying the size class.
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
struct SizeClassLabel {
    /// Buffer size in bytes of the class (e.g. 4096, 8192).
    size_class: u64,
}
288
/// Metrics for the buffer pool.
///
/// Gauges track current per-class state; counters are monotonically increasing.
struct PoolMetrics {
    /// Number of buffers currently allocated (out of pool).
    allocated: Family<SizeClassLabel, Gauge>,
    /// Number of buffers available in the pool.
    available: Family<SizeClassLabel, Gauge>,
    /// Total number of successful allocations.
    allocations_total: Family<SizeClassLabel, Counter>,
    /// Total number of failed allocations (pool exhausted).
    exhausted_total: Family<SizeClassLabel, Counter>,
    /// Total number of oversized allocation requests. Unlabeled, since an
    /// oversized request maps to no size class.
    oversized_total: Counter,
}
302
303impl PoolMetrics {
304    fn new(registry: &mut Registry) -> Self {
305        let metrics = Self {
306            allocated: Family::default(),
307            available: Family::default(),
308            allocations_total: Family::default(),
309            exhausted_total: Family::default(),
310            oversized_total: Counter::default(),
311        };
312
313        registry.register(
314            "buffer_pool_allocated",
315            "Number of buffers currently allocated from the pool",
316            metrics.allocated.clone(),
317        );
318        registry.register(
319            "buffer_pool_available",
320            "Number of buffers available in the pool",
321            metrics.available.clone(),
322        );
323        registry.register(
324            "buffer_pool_allocations_total",
325            "Total number of successful buffer allocations",
326            metrics.allocations_total.clone(),
327        );
328        registry.register(
329            "buffer_pool_exhausted_total",
330            "Total number of failed allocations due to pool exhaustion",
331            metrics.exhausted_total.clone(),
332        );
333        registry.register(
334            "buffer_pool_oversized_total",
335            "Total number of allocation requests exceeding max buffer size",
336            metrics.oversized_total.clone(),
337        );
338
339        metrics
340    }
341}
342
/// An aligned buffer.
///
/// The buffer is allocated with the specified alignment for efficient I/O operations.
/// Deallocates itself on drop using the stored layout.
pub(crate) struct AlignedBuffer {
    /// Base pointer of the allocation; never null.
    ptr: NonNull<u8>,
    /// Layout used for allocation; reused verbatim for deallocation on drop.
    layout: Layout,
}

// SAFETY: AlignedBuffer owns its memory and can be sent between threads.
unsafe impl Send for AlignedBuffer {}
// SAFETY: AlignedBuffer's memory is not shared (no interior mutability of pointer).
unsafe impl Sync for AlignedBuffer {}

impl AlignedBuffer {
    /// Shared allocation path for [`Self::new`] and [`Self::new_zeroed`],
    /// deduplicating the layout construction and null check.
    ///
    /// # Panics
    ///
    /// Panics if:
    /// - `capacity == 0`
    /// - `alignment` is zero or not a power of two
    /// - `capacity`, rounded up to `alignment`, exceeds `isize::MAX`
    ///
    /// # Aborts
    ///
    /// Aborts the process on allocation failure via `handle_alloc_error`.
    fn allocate(capacity: usize, alignment: usize, zeroed: bool) -> Self {
        assert!(capacity > 0, "capacity must be greater than zero");
        let layout = Layout::from_size_align(capacity, alignment).expect("invalid layout");

        // SAFETY: Layout is valid and has non-zero size.
        let raw = unsafe {
            if zeroed {
                alloc_zeroed(layout)
            } else {
                alloc(layout)
            }
        };
        let ptr = NonNull::new(raw).unwrap_or_else(|| handle_alloc_error(layout));

        Self { ptr, layout }
    }

    /// Allocates a new buffer with the given capacity and alignment.
    ///
    /// The contents are uninitialized.
    ///
    /// # Panics / Aborts
    ///
    /// See [`Self::allocate`].
    fn new(capacity: usize, alignment: usize) -> Self {
        Self::allocate(capacity, alignment, false)
    }

    /// Allocates a new zero-initialized buffer with the given capacity and alignment.
    ///
    /// # Panics / Aborts
    ///
    /// See [`Self::allocate`].
    fn new_zeroed(capacity: usize, alignment: usize) -> Self {
        Self::allocate(capacity, alignment, true)
    }

    /// Returns the capacity of the buffer.
    #[inline]
    const fn capacity(&self) -> usize {
        self.layout.size()
    }

    /// Returns a raw pointer to the buffer.
    #[inline]
    const fn as_ptr(&self) -> *mut u8 {
        self.ptr.as_ptr()
    }
}

impl Drop for AlignedBuffer {
    fn drop(&mut self) {
        // SAFETY: ptr was allocated with this layout and has not been freed.
        unsafe { dealloc(self.ptr.as_ptr(), self.layout) };
    }
}
423
424/// Per-size-class state.
425///
426/// The freelist stores `Option<AlignedBuffer>` where:
427/// - `Some(buf)` = a reusable buffer
428/// - `None` = an available slot for creating a new buffer
429struct SizeClass {
430    /// The buffer size for this class.
431    size: usize,
432    /// Buffer alignment.
433    alignment: usize,
434    /// Free list storing either reusable buffers or empty slots.
435    freelist: ArrayQueue<Option<AlignedBuffer>>,
436    /// Number of buffers currently allocated (out of pool).
437    allocated: AtomicUsize,
438}
439
440impl SizeClass {
441    fn new(size: usize, alignment: usize, max_buffers: usize, prefill: bool) -> Self {
442        let freelist = ArrayQueue::new(max_buffers);
443        for _ in 0..max_buffers {
444            let entry = if prefill {
445                Some(AlignedBuffer::new(size, alignment))
446            } else {
447                None
448            };
449            let _ = freelist.push(entry);
450        }
451        Self {
452            size,
453            alignment,
454            freelist,
455            allocated: AtomicUsize::new(0),
456        }
457    }
458}
459
/// Internal allocation result for pooled allocations.
struct Allocation {
    /// The aligned buffer handed out for this allocation.
    buffer: AlignedBuffer,
    /// True when the buffer was freshly allocated rather than reused from the
    /// freelist. Zeroed allocation paths use this to decide whether re-zeroing
    /// is required.
    is_new: bool,
}
465
/// Internal state of the buffer pool.
///
/// Held behind an `Arc` by [`BufferPool`] handles and weakly referenced by
/// outstanding buffers so they can return themselves on drop.
pub(crate) struct BufferPoolInner {
    /// Validated pool configuration.
    config: BufferPoolConfig,
    /// One entry per power-of-two size class, ordered smallest to largest.
    classes: Vec<SizeClass>,
    /// Metrics describing pool activity.
    metrics: PoolMetrics,
}
472
473impl BufferPoolInner {
474    /// Try to allocate a buffer from the given size class.
475    ///
476    /// If `zero_on_new` is true, newly-created buffers are allocated with
477    /// `alloc_zeroed`. Reused buffers are never re-zeroed here.
478    fn try_alloc(&self, class_index: usize, zero_on_new: bool) -> Option<Allocation> {
479        let class = &self.classes[class_index];
480        let label = SizeClassLabel {
481            size_class: class.size as u64,
482        };
483
484        match class.freelist.pop() {
485            Some(Some(buffer)) => {
486                // Reuse existing buffer
487                class.allocated.fetch_add(1, Ordering::Relaxed);
488                self.metrics.allocations_total.get_or_create(&label).inc();
489                self.metrics.allocated.get_or_create(&label).inc();
490                self.metrics.available.get_or_create(&label).dec();
491                Some(Allocation {
492                    buffer,
493                    is_new: false,
494                })
495            }
496            Some(None) => {
497                // Create new buffer (we have a slot)
498                class.allocated.fetch_add(1, Ordering::Relaxed);
499                self.metrics.allocations_total.get_or_create(&label).inc();
500                self.metrics.allocated.get_or_create(&label).inc();
501                let buffer = if zero_on_new {
502                    AlignedBuffer::new_zeroed(class.size, class.alignment)
503                } else {
504                    AlignedBuffer::new(class.size, class.alignment)
505                };
506                Some(Allocation {
507                    buffer,
508                    is_new: true,
509                })
510            }
511            None => {
512                // Pool exhausted (no slots available)
513                self.metrics.exhausted_total.get_or_create(&label).inc();
514                None
515            }
516        }
517    }
518
519    /// Return a buffer to the pool.
520    fn return_buffer(&self, buffer: AlignedBuffer) {
521        // Find the class for this buffer size
522        if let Some(class_index) = self.config.class_index(buffer.capacity()) {
523            let class = &self.classes[class_index];
524            let label = SizeClassLabel {
525                size_class: class.size as u64,
526            };
527
528            class.allocated.fetch_sub(1, Ordering::Relaxed);
529            self.metrics.allocated.get_or_create(&label).dec();
530
531            // Try to return to freelist
532            match class.freelist.push(Some(buffer)) {
533                Ok(()) => {
534                    self.metrics.available.get_or_create(&label).inc();
535                }
536                Err(_buffer) => {
537                    // Freelist full, buffer is dropped and deallocated
538                }
539            }
540        }
541        // Buffer doesn't match any class (or freelist full) - it's dropped and deallocated
542    }
543}
544
/// A pool of reusable, aligned buffers.
///
/// Buffers are organized into power-of-two size classes. When a buffer is requested,
/// the smallest size class that fits is used. Buffers are automatically returned to
/// the pool when dropped.
///
/// Cloning a `BufferPool` is cheap and yields a handle to the same pool.
///
/// # Alignment
///
/// Buffer alignment is guaranteed only at the base pointer (when `cursor == 0`).
/// After calling [`Buf::advance`], the pointer returned by `as_mut_ptr()` may
/// no longer be aligned. For direct I/O operations that require alignment,
/// do not advance the buffer before use.
#[derive(Clone)]
pub struct BufferPool {
    // Shared state; Arc makes clones of this handle refer to the same pool.
    inner: Arc<BufferPoolInner>,
}
561
562impl std::fmt::Debug for BufferPool {
563    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
564        f.debug_struct("BufferPool")
565            .field("config", &self.inner.config)
566            .field("num_classes", &self.inner.classes.len())
567            .finish()
568    }
569}
570
571impl BufferPool {
572    /// Creates a new buffer pool with the given configuration.
573    ///
574    /// # Panics
575    ///
576    /// Panics if the configuration is invalid.
577    pub(crate) fn new(config: BufferPoolConfig, registry: &mut Registry) -> Self {
578        config.validate();
579
580        let metrics = PoolMetrics::new(registry);
581
582        let mut classes = Vec::with_capacity(config.num_classes());
583        for i in 0..config.num_classes() {
584            let size = config.class_size(i);
585            let class = SizeClass::new(
586                size,
587                config.alignment.get(),
588                config.max_per_class.get(),
589                config.prefill,
590            );
591            classes.push(class);
592        }
593
594        // Update available metrics after prefill
595        if config.prefill {
596            for class in &classes {
597                let label = SizeClassLabel {
598                    size_class: class.size as u64,
599                };
600                let available = class.freelist.len() as i64;
601                metrics.available.get_or_create(&label).set(available);
602            }
603        }
604
605        Self {
606            inner: Arc::new(BufferPoolInner {
607                config,
608                classes,
609                metrics,
610            }),
611        }
612    }
613
614    /// Returns the size class index for `capacity`, recording oversized metrics on failure.
615    fn class_index_or_record_oversized(&self, capacity: usize) -> Option<usize> {
616        let class_index = self.inner.config.class_index(capacity);
617        if class_index.is_none() {
618            self.inner.metrics.oversized_total.inc();
619        }
620        class_index
621    }
622
623    /// Attempts to allocate a pooled buffer.
624    ///
625    /// Unlike [`Self::alloc`], this method does not fall back to untracked
626    /// allocation.
627    ///
628    /// The returned buffer has `len() == 0` and `capacity() >= capacity`.
629    ///
630    /// # Initialization
631    ///
632    /// The returned buffer contains **uninitialized memory**. Do not read from
633    /// it until data has been written.
634    ///
635    /// # Errors
636    ///
637    /// - [`PoolError::Oversized`]: `capacity` exceeds `max_size`
638    /// - [`PoolError::Exhausted`]: Pool exhausted for required size class
639    pub fn try_alloc(&self, capacity: usize) -> Result<IoBufMut, PoolError> {
640        let class_index = self
641            .class_index_or_record_oversized(capacity)
642            .ok_or(PoolError::Oversized)?;
643
644        let buffer = self
645            .inner
646            .try_alloc(class_index, false)
647            .map(|allocation| allocation.buffer)
648            .ok_or(PoolError::Exhausted)?;
649        let pooled = PooledBufMut::new(buffer, Arc::downgrade(&self.inner));
650        Ok(IoBufMut::from_pooled(pooled))
651    }
652
653    /// Allocates a buffer with capacity for at least `capacity` bytes.
654    ///
655    /// The returned buffer has `len() == 0` and `capacity() >= capacity`,
656    /// matching the semantics of [`IoBufMut::with_capacity`] and
657    /// [`bytes::BytesMut::with_capacity`]. Use [`BufMut::put_slice`] or other
658    /// [`BufMut`] methods to write data to the buffer.
659    ///
660    /// If the pool can provide a buffer (capacity within limits and pool not
661    /// exhausted), returns a pooled buffer that will be returned to the pool
662    /// when dropped. Otherwise, falls back to an untracked aligned heap
663    /// allocation that is deallocated when dropped.
664    ///
665    /// Use [`Self::try_alloc`] if you need pooled-only behavior.
666    ///
667    /// # Initialization
668    ///
669    /// The returned buffer contains **uninitialized memory**. Do not read from
670    /// it until data has been written.
671    pub fn alloc(&self, capacity: usize) -> IoBufMut {
672        self.try_alloc(capacity).unwrap_or_else(|_| {
673            let size = capacity.max(self.inner.config.min_size.get());
674            let buffer = AlignedBuffer::new(size, self.inner.config.alignment.get());
675            // Using Weak::new() means the buffer won't be returned to the pool on drop.
676            IoBufMut::from_pooled(PooledBufMut::new(buffer, Weak::new()))
677        })
678    }
679
680    /// Allocates a buffer and sets its readable length to `len` without
681    /// initializing bytes.
682    ///
683    /// Equivalent to [`Self::alloc`] followed by [`IoBufMut::set_len`].
684    ///
685    /// # Safety
686    ///
687    /// Caller must ensure all bytes are initialized before any read operation.
688    pub unsafe fn alloc_len(&self, len: usize) -> IoBufMut {
689        let mut buf = self.alloc(len);
690        // SAFETY: guaranteed by caller.
691        unsafe { buf.set_len(len) };
692        buf
693    }
694
695    /// Attempts to allocate a zero-initialized pooled buffer.
696    ///
697    /// Unlike [`Self::alloc_zeroed`], this method does not fall back to
698    /// untracked allocation.
699    ///
700    /// The returned buffer has `len() == len` and `capacity() >= len`.
701    ///
702    /// # Initialization
703    ///
704    /// Bytes in `0..len` are initialized to zero. Bytes in `len..capacity`
705    /// may be uninitialized.
706    ///
707    /// # Errors
708    ///
709    /// - [`PoolError::Oversized`]: `len` exceeds `max_size`
710    /// - [`PoolError::Exhausted`]: Pool exhausted for required size class
711    pub fn try_alloc_zeroed(&self, len: usize) -> Result<IoBufMut, PoolError> {
712        let class_index = self
713            .class_index_or_record_oversized(len)
714            .ok_or(PoolError::Oversized)?;
715        let allocation = self
716            .inner
717            .try_alloc(class_index, true)
718            .ok_or(PoolError::Exhausted)?;
719
720        let mut buf = IoBufMut::from_pooled(PooledBufMut::new(
721            allocation.buffer,
722            Arc::downgrade(&self.inner),
723        ));
724        if allocation.is_new {
725            // SAFETY: buffer was allocated with alloc_zeroed, so bytes in 0..len are initialized.
726            unsafe { buf.set_len(len) };
727        } else {
728            // Reused buffers may contain old bytes, re-zero requested readable range.
729            buf.put_bytes(0, len);
730        }
731        Ok(buf)
732    }
733
734    /// Allocates a zero-initialized buffer with readable length `len`.
735    ///
736    /// The returned buffer has `len() == len` and `capacity() >= len`.
737    ///
738    /// If the pool can provide a buffer (len within limits and pool not
739    /// exhausted), returns a pooled buffer that will be returned to the pool
740    /// when dropped. Otherwise, falls back to an untracked aligned heap
741    /// allocation that is deallocated when dropped.
742    ///
743    /// Use this for read APIs that require an initialized `&mut [u8]`.
744    /// This avoids `unsafe set_len` at callsites.
745    ///
746    /// Use [`Self::try_alloc_zeroed`] if you need pooled-only behavior.
747    ///
748    /// # Initialization
749    ///
750    /// Bytes in `0..len` are initialized to zero. Bytes in `len..capacity`
751    /// may be uninitialized.
752    pub fn alloc_zeroed(&self, len: usize) -> IoBufMut {
753        self.try_alloc_zeroed(len).unwrap_or_else(|_| {
754            // Pool exhausted or oversized: allocate untracked zeroed memory.
755            let size = len.max(self.inner.config.min_size.get());
756            let buffer = AlignedBuffer::new_zeroed(size, self.inner.config.alignment.get());
757            let mut buf = IoBufMut::from_pooled(PooledBufMut::new(buffer, Weak::new()));
758            // SAFETY: buffer was allocated with alloc_zeroed, so bytes in 0..len are initialized.
759            unsafe { buf.set_len(len) };
760            buf
761        })
762    }
763
764    /// Returns the pool configuration.
765    pub fn config(&self) -> &BufferPoolConfig {
766        &self.inner.config
767    }
768}
769
/// Shared pooled allocation.
///
/// On drop, returns the aligned buffer to the pool if tracked.
struct PooledBufInner {
    /// The owned allocation. Wrapped in ManuallyDrop so Drop can move it out
    /// and return it to the pool instead of deallocating in place.
    buffer: ManuallyDrop<AlignedBuffer>,
    /// Weak handle to the originating pool; `Weak::new()` for untracked
    /// fallback allocations.
    pool: Weak<BufferPoolInner>,
}
777
778impl PooledBufInner {
779    const fn new(buffer: AlignedBuffer, pool: Weak<BufferPoolInner>) -> Self {
780        Self {
781            buffer: ManuallyDrop::new(buffer),
782            pool,
783        }
784    }
785
786    #[inline]
787    fn capacity(&self) -> usize {
788        self.buffer.capacity()
789    }
790}
791
792impl Drop for PooledBufInner {
793    fn drop(&mut self) {
794        // SAFETY: Drop is called at most once for this value.
795        let buffer = unsafe { ManuallyDrop::take(&mut self.buffer) };
796        if let Some(pool) = self.pool.upgrade() {
797            pool.return_buffer(buffer);
798        }
799        // else: buffer is dropped here, which deallocates it
800    }
801}
802
/// Immutable, reference-counted view over a pooled allocation.
///
/// Cloning is cheap and shares the same underlying aligned allocation.
///
/// # View Layout
///
/// ```text
/// [0................offset...........offset+len...........capacity]
///  ^                 ^                   ^                    ^
///  |                 |                   |                    |
///  allocation start  first readable      end of readable      allocation end
///                    byte of this view   region for this view
/// ```
///
/// Regions:
/// - `[0..offset)`: not readable from this view
/// - `[offset..offset+len)`: readable bytes for this view
/// - `[offset+len..capacity)`: not readable from this view
///
/// # Invariants
///
/// - `offset <= capacity`
/// - `offset + len <= capacity`
///
/// This representation allows sliced views to preserve their current readable
/// window while still supporting `try_into_mut` when uniquely owned.
#[derive(Clone)]
pub(crate) struct PooledBuf {
    // Shared allocation backing this view.
    inner: Arc<PooledBufInner>,
    // Start of this view's readable window within the allocation.
    offset: usize,
    // Length of this view's readable window.
    len: usize,
}
835
836impl std::fmt::Debug for PooledBuf {
837    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
838        f.debug_struct("PooledBuf")
839            .field("offset", &self.offset)
840            .field("len", &self.len)
841            .field("capacity", &self.inner.capacity())
842            .finish()
843    }
844}
845
846impl PooledBuf {
847    /// Returns `true` if this buffer is tracked by a pool.
848    ///
849    /// Tracked buffers originate from [`BufferPool`] allocations and are
850    /// returned to their pool when dropped.
851    ///
852    /// Untracked fallback allocations from [`BufferPool::alloc`] return `false`.
853    #[inline]
854    pub fn is_tracked(&self) -> bool {
855        self.inner.pool.strong_count() > 0
856    }
857
    /// Returns a pointer to the first readable byte of this view.
    ///
    /// The pointer is `offset` bytes past the allocation base, so it is only
    /// guaranteed to carry the allocation's alignment when `offset == 0`.
    #[inline]
    pub fn as_ptr(&self) -> *const u8 {
        // SAFETY: offset is always within the underlying allocation.
        unsafe { self.inner.buffer.as_ptr().add(self.offset) }
    }
864
865    /// Returns a slice of this view (zero-copy).
866    ///
867    /// The range is resolved relative to this view's readable window
868    /// (`0..self.len`), not relative to the allocation start.
869    ///
870    /// Returns `None` for empty ranges, allowing callers to detach from the
871    /// underlying pooled allocation.
872    pub fn slice(&self, range: impl RangeBounds<usize>) -> Option<Self> {
873        let start = match range.start_bound() {
874            Bound::Included(&n) => n,
875            Bound::Excluded(&n) => n.checked_add(1).expect("range start overflow"),
876            Bound::Unbounded => 0,
877        };
878        let end = match range.end_bound() {
879            Bound::Included(&n) => n.checked_add(1).expect("range end overflow"),
880            Bound::Excluded(&n) => n,
881            Bound::Unbounded => self.len,
882        };
883        assert!(start <= end, "slice start must be <= end");
884        assert!(end <= self.len, "slice out of bounds");
885
886        if start == end {
887            return None;
888        }
889
890        Some(Self {
891            inner: self.inner.clone(),
892            offset: self.offset + start,
893            len: end - start,
894        })
895    }
896
897    /// Splits the buffer into two at the given index.
898    ///
899    /// Afterwards `self` contains bytes `[at, len)`, and the returned [`PooledBuf`]
900    /// contains bytes `[0, at)`.
901    ///
902    /// This is an `O(1)` zero-copy operation.
903    ///
904    /// # Panics
905    ///
906    /// Panics if `at > len`.
907    #[inline]
908    pub fn split_to(&mut self, at: usize) -> Self {
909        assert!(
910            at <= self.len,
911            "split_to out of bounds: {:?} <= {:?}",
912            at,
913            self.len,
914        );
915
916        let prefix = Self {
917            inner: self.inner.clone(),
918            offset: self.offset,
919            len: at,
920        };
921
922        self.offset += at;
923        self.len -= at;
924        prefix
925    }
926
927    /// Try to recover mutable ownership without copying.
928    ///
929    /// This succeeds only when this is the sole remaining reference to the
930    /// underlying pooled allocation (`Arc` strong count is 1).
931    ///
932    /// On success, the returned mutable buffer preserves the readable bytes and
933    /// mutable capacity from this view's current offset to the end of the
934    /// allocation. This means uniquely-owned sliced views can also be recovered
935    /// as mutable buffers while keeping the same readable window.
936    ///
937    /// On failure, returns `self` unchanged.
938    pub fn try_into_mut(self) -> Result<PooledBufMut, Self> {
939        let Self { inner, offset, len } = self;
940        match Arc::try_unwrap(inner) {
941            // Preserve the existing readable view:
942            // - cursor = view start
943            // - len = view end
944            Ok(inner) => Ok(PooledBufMut {
945                inner: ManuallyDrop::new(inner),
946                cursor: offset,
947                len: offset.checked_add(len).expect("slice end overflow"),
948            }),
949            Err(inner) => Err(Self { inner, offset, len }),
950        }
951    }
952
953    /// Converts this pooled view into [`Bytes`] without copying.
954    ///
955    /// Empty views return detached [`Bytes::new`] so pooled memory is not
956    /// retained by an empty owner.
957    pub fn into_bytes(self) -> Bytes {
958        if self.len == 0 {
959            return Bytes::new();
960        }
961        Bytes::from_owner(self)
962    }
963}
964
965impl AsRef<[u8]> for PooledBuf {
966    #[inline]
967    fn as_ref(&self) -> &[u8] {
968        // SAFETY: offset/len are always bounded within the underlying allocation.
969        unsafe { std::slice::from_raw_parts(self.inner.buffer.as_ptr().add(self.offset), self.len) }
970    }
971}
972
973impl Buf for PooledBuf {
974    #[inline]
975    fn remaining(&self) -> usize {
976        self.len
977    }
978
979    #[inline]
980    fn chunk(&self) -> &[u8] {
981        self.as_ref()
982    }
983
984    #[inline]
985    fn advance(&mut self, cnt: usize) {
986        assert!(cnt <= self.len, "cannot advance past end of buffer");
987        self.offset += cnt;
988        self.len -= cnt;
989    }
990
991    #[inline]
992    fn copy_to_bytes(&mut self, len: usize) -> Bytes {
993        assert!(len <= self.len, "copy_to_bytes out of bounds");
994        if len == 0 {
995            return Bytes::new();
996        }
997        let slice = Self {
998            inner: self.inner.clone(),
999            offset: self.offset,
1000            len,
1001        };
1002        self.advance(len);
1003        slice.into_bytes()
1004    }
1005}
1006
/// A mutable aligned buffer.
///
/// When dropped, the underlying buffer is returned to the pool if tracked,
/// or deallocated directly if untracked (e.g. fallback allocations).
///
/// # Buffer Layout
///
/// ```text
/// [0................cursor..............len.............raw_capacity]
///  ^                 ^                   ^                 ^
///  |                 |                   |                 |
///  allocation start  read position       write position    allocation end
///                    (consumed prefix)   (initialized)
///
/// Regions:
/// - [0..cursor):        consumed (via Buf::advance), no longer accessible
/// - [cursor..len):      readable bytes (as_ref returns this slice)
/// - [len..raw_capacity): uninitialized, writable via BufMut
/// ```
///
/// # Invariants
///
/// - `cursor <= len <= raw_capacity`
/// - Bytes in `0..len` have been initialized (safe to read)
/// - Bytes in `len..raw_capacity` are uninitialized (write-only via [`BufMut`])
///
/// # Computed Values
///
/// - `len()` = readable bytes = `self.len - cursor`
/// - `capacity()` = view capacity = `raw_capacity - cursor` (shrinks after advance)
/// - `remaining_mut()` = writable bytes = `raw_capacity - self.len`
///
/// This matches [`bytes::BytesMut`] semantics.
///
/// # Fixed Capacity
///
/// Unlike [`bytes::BytesMut`], pooled buffers have **fixed capacity** and do
/// NOT grow automatically. Calling [`BufMut::put_slice`] or other [`BufMut`]
/// methods that would exceed capacity will panic (per the [`BufMut`] trait
/// contract).
///
/// Always check `remaining_mut()` before writing variable-length data.
pub(crate) struct PooledBufMut {
    /// Owned allocation state. Wrapped in [`ManuallyDrop`] so conversion paths
    /// (e.g. `freeze`) can move it out without running this type's `Drop`.
    inner: ManuallyDrop<PooledBufInner>,
    /// Read cursor position (for `Buf` trait).
    cursor: usize,
    /// Number of bytes written (initialized).
    len: usize,
}
1056
1057impl std::fmt::Debug for PooledBufMut {
1058    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1059        f.debug_struct("PooledBufMut")
1060            .field("cursor", &self.cursor)
1061            .field("len", &self.len)
1062            .field("capacity", &self.capacity())
1063            .finish()
1064    }
1065}
1066
impl PooledBufMut {
    /// Creates an empty mutable buffer (`cursor == len == 0`) over `buffer`.
    ///
    /// The buffer is considered tracked while the `pool` weak reference can
    /// still be upgraded; a dangling `Weak` yields an untracked buffer.
    const fn new(buffer: AlignedBuffer, pool: Weak<BufferPoolInner>) -> Self {
        Self {
            inner: ManuallyDrop::new(PooledBufInner::new(buffer, pool)),
            cursor: 0,
            len: 0,
        }
    }

    /// Returns `true` if this buffer is tracked by a pool.
    ///
    /// Tracked buffers originate from [`BufferPool`] allocations and are
    /// returned to their pool when dropped.
    ///
    /// Untracked fallback allocations from [`BufferPool::alloc`] return `false`.
    #[inline]
    pub fn is_tracked(&self) -> bool {
        self.inner.pool.strong_count() > 0
    }

    /// Returns the number of readable bytes remaining in the buffer.
    ///
    /// This is `len - cursor`, matching [`bytes::BytesMut`] semantics.
    #[inline]
    pub const fn len(&self) -> usize {
        self.len - self.cursor
    }

    /// Returns true if no readable bytes remain.
    #[inline]
    pub const fn is_empty(&self) -> bool {
        self.cursor == self.len
    }

    /// Returns the number of bytes the buffer can hold without reallocating.
    #[inline]
    pub fn capacity(&self) -> usize {
        // View-relative: the consumed prefix no longer counts toward capacity.
        self.inner.capacity() - self.cursor
    }

    /// Returns the raw allocation capacity (internal use only).
    #[inline]
    fn raw_capacity(&self) -> usize {
        self.inner.capacity()
    }

    /// Returns an unsafe mutable pointer to the buffer's data.
    #[inline]
    pub fn as_mut_ptr(&mut self) -> *mut u8 {
        // SAFETY: cursor is always <= raw capacity
        unsafe { self.inner.buffer.as_ptr().add(self.cursor) }
    }

    /// Sets the length of the buffer (view-relative).
    ///
    /// This will explicitly set the size of the buffer without actually
    /// modifying the data, so it is up to the caller to ensure that the data
    /// has been initialized.
    ///
    /// The `len` parameter is relative to the current view (after any `advance`
    /// calls), matching [`bytes::BytesMut::set_len`] semantics.
    ///
    /// # Safety
    ///
    /// Caller must ensure:
    /// - All bytes in the range `[cursor, cursor + len)` are initialized
    /// - `len <= capacity()` (where capacity is view-relative)
    #[inline]
    pub const unsafe fn set_len(&mut self, len: usize) {
        self.len = self.cursor + len;
    }

    /// Clears the buffer, removing all data. Existing capacity is preserved.
    #[inline]
    pub const fn clear(&mut self) {
        // The readable window collapses to `[cursor, cursor)`.
        self.len = self.cursor;
    }

    /// Truncates the buffer to at most `len` readable bytes.
    ///
    /// If `len` is greater than the current readable length, this has no effect.
    /// This operates on readable bytes (after cursor), matching
    /// [`bytes::BytesMut::truncate`] semantics for buffers that have been advanced.
    #[inline]
    pub const fn truncate(&mut self, len: usize) {
        if len < self.len() {
            self.len = self.cursor + len;
        }
    }

    /// Convert into an immutable pooled view over the current readable window.
    fn into_pooled(self) -> PooledBuf {
        // Wrap self in ManuallyDrop first to prevent Drop from running
        // if any subsequent code panics.
        let mut me = ManuallyDrop::new(self);
        // SAFETY: me is wrapped in ManuallyDrop so its Drop impl won't run.
        // ManuallyDrop::take moves the inner value out, leaving the wrapper empty.
        let inner = unsafe { ManuallyDrop::take(&mut me.inner) };
        PooledBuf {
            inner: Arc::new(inner),
            offset: me.cursor,
            len: me.len - me.cursor,
        }
    }

    /// Freezes the buffer into an immutable [`IoBuf`].
    ///
    /// Only the readable portion (`cursor..len`) is included in the result.
    /// The underlying buffer will be returned to the pool when all references
    /// to the [`IoBuf`] (including slices) are dropped.
    pub fn freeze(self) -> IoBuf {
        IoBuf::from_pooled(self.into_pooled())
    }

    /// Converts the current readable window into [`Bytes`] without copying.
    ///
    /// Empty buffers return detached [`Bytes::new`] so pooled memory is not
    /// retained by an empty owner.
    pub fn into_bytes(self) -> Bytes {
        if self.is_empty() {
            return Bytes::new();
        }
        Bytes::from_owner(self.into_pooled())
    }
}
1192
1193impl AsRef<[u8]> for PooledBufMut {
1194    #[inline]
1195    fn as_ref(&self) -> &[u8] {
1196        // SAFETY: bytes from cursor..len have been initialized.
1197        unsafe {
1198            std::slice::from_raw_parts(self.inner.buffer.as_ptr().add(self.cursor), self.len())
1199        }
1200    }
1201}
1202
1203impl AsMut<[u8]> for PooledBufMut {
1204    #[inline]
1205    fn as_mut(&mut self) -> &mut [u8] {
1206        let len = self.len();
1207        // SAFETY: bytes from cursor..len have been initialized.
1208        unsafe { std::slice::from_raw_parts_mut(self.inner.buffer.as_ptr().add(self.cursor), len) }
1209    }
1210}
1211
impl Drop for PooledBufMut {
    fn drop(&mut self) {
        // SAFETY: Drop is only called once. freeze() wraps self in ManuallyDrop
        // to prevent this Drop impl from running after ownership is transferred.
        // Dropping the inner value releases the allocation (per the type-level
        // docs: returned to its pool when tracked, freed otherwise).
        unsafe { ManuallyDrop::drop(&mut self.inner) };
    }
}
1219
1220impl Buf for PooledBufMut {
1221    #[inline]
1222    fn remaining(&self) -> usize {
1223        self.len - self.cursor
1224    }
1225
1226    #[inline]
1227    fn chunk(&self) -> &[u8] {
1228        // SAFETY: bytes from cursor..len have been initialized.
1229        unsafe {
1230            std::slice::from_raw_parts(
1231                self.inner.buffer.as_ptr().add(self.cursor),
1232                self.len - self.cursor,
1233            )
1234        }
1235    }
1236
1237    #[inline]
1238    fn advance(&mut self, cnt: usize) {
1239        let remaining = self.len - self.cursor;
1240        assert!(cnt <= remaining, "cannot advance past end of buffer");
1241        self.cursor += cnt;
1242    }
1243}
1244
1245// SAFETY: BufMut implementation for PooledBufMut.
1246// - `remaining_mut()` reports bytes available for writing (raw_capacity - len)
1247// - `chunk_mut()` returns uninitialized memory from len to raw_capacity
1248// - `advance_mut()` advances len within bounds
1249unsafe impl BufMut for PooledBufMut {
1250    #[inline]
1251    fn remaining_mut(&self) -> usize {
1252        self.raw_capacity() - self.len
1253    }
1254
1255    #[inline]
1256    unsafe fn advance_mut(&mut self, cnt: usize) {
1257        assert!(
1258            cnt <= self.remaining_mut(),
1259            "cannot advance past end of buffer"
1260        );
1261        self.len += cnt;
1262    }
1263
1264    #[inline]
1265    fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
1266        let raw_cap = self.raw_capacity();
1267        let len = self.len;
1268        // SAFETY: We have exclusive access and the slice is within raw capacity.
1269        unsafe {
1270            let ptr = self.inner.buffer.as_ptr().add(len);
1271            bytes::buf::UninitSlice::from_raw_parts_mut(ptr, raw_cap - len)
1272        }
1273    }
1274}
1275
1276#[cfg(test)]
1277mod tests {
1278    use super::*;
1279    use bytes::BytesMut;
1280    use std::{sync::mpsc, thread};
1281
    /// Returns a fresh registry so each test observes isolated metrics.
    fn test_registry() -> Registry {
        Registry::default()
    }

    /// Creates a test config with page alignment.
    fn test_config(min_size: usize, max_size: usize, max_per_class: usize) -> BufferPoolConfig {
        BufferPoolConfig {
            min_size: NZUsize!(min_size),
            max_size: NZUsize!(max_size),
            max_per_class: NZUsize!(max_per_class),
            prefill: false,
            alignment: NZUsize!(page_size()),
        }
    }

    #[test]
    fn test_page_size() {
        let size = page_size();
        // The pool's size-class math relies on a power-of-two page size of at
        // least 4 KiB.
        assert!(size >= 4096);
        assert!(size.is_power_of_two());
    }
1303
    #[test]
    fn test_aligned_buffer() {
        let page = page_size();
        let buf = AlignedBuffer::new(4096, page);
        assert_eq!(buf.capacity(), 4096);
        assert!((buf.as_ptr() as usize).is_multiple_of(page));

        // Test with cache-line alignment
        let cache_line = cache_line_size();
        let buf2 = AlignedBuffer::new(4096, cache_line);
        assert_eq!(buf2.capacity(), 4096);
        assert!((buf2.as_ptr() as usize).is_multiple_of(cache_line));
    }

    /// Zero-byte allocations must be rejected up front.
    #[test]
    #[should_panic(expected = "capacity must be greater than zero")]
    fn test_aligned_buffer_zero_capacity_panics() {
        let _ = AlignedBuffer::new(0, page_size());
    }

    /// The zeroed variant applies the same zero-capacity check.
    #[test]
    #[should_panic(expected = "capacity must be greater than zero")]
    fn test_aligned_buffer_zeroed_zero_capacity_panics() {
        let _ = AlignedBuffer::new_zeroed(0, page_size());
    }

    #[test]
    fn test_config_validation() {
        let page = page_size();
        let config = test_config(page, page * 4, 10);
        config.validate();
    }

    #[test]
    #[should_panic(expected = "min_size must be a power of two")]
    fn test_config_invalid_min_size() {
        let config = BufferPoolConfig {
            min_size: NZUsize!(3000),
            max_size: NZUsize!(8192),
            max_per_class: NZUsize!(10),
            prefill: false,
            alignment: NZUsize!(page_size()),
        };
        config.validate();
    }

    #[test]
    fn test_config_class_index() {
        let page = page_size();
        let config = test_config(page, page * 8, 10);

        // Classes: page, page*2, page*4, page*8
        assert_eq!(config.num_classes(), 4);

        // Requests round up to the next class; anything above max_size is None.
        assert_eq!(config.class_index(1), Some(0));
        assert_eq!(config.class_index(page), Some(0));
        assert_eq!(config.class_index(page + 1), Some(1));
        assert_eq!(config.class_index(page * 2), Some(1));
        assert_eq!(config.class_index(page * 8), Some(3));
        assert_eq!(config.class_index(page * 8 + 1), None);
    }
1365
    #[test]
    fn test_pool_alloc_and_return() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page * 4, 2), &mut registry);

        // Allocate a buffer - returns buffer with len=0, capacity >= requested
        let buf = pool.try_alloc(100).unwrap();
        assert!(buf.capacity() >= page);
        assert_eq!(buf.len(), 0);

        // Drop returns to pool
        drop(buf);

        // Can allocate again
        let buf2 = pool.try_alloc(100).unwrap();
        assert!(buf2.capacity() >= page);
        assert_eq!(buf2.len(), 0);
    }

    #[test]
    fn test_alloc_len_sets_len() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page * 4, 2), &mut registry);

        // SAFETY: we immediately initialize all bytes before reading.
        let mut buf = unsafe { pool.alloc_len(100) };
        assert_eq!(buf.len(), 100);
        buf.as_mut().fill(0xAB);
        let frozen = buf.freeze();
        assert_eq!(frozen.as_ref(), &[0xAB; 100]);
    }

    #[test]
    fn test_alloc_zeroed_sets_len_and_zeros() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page * 4, 2), &mut registry);

        let buf = pool.alloc_zeroed(100);
        assert_eq!(buf.len(), 100);
        assert!(buf.as_ref().iter().all(|&b| b == 0));
    }

    #[test]
    fn test_try_alloc_zeroed_sets_len_and_zeros() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page * 4, 2), &mut registry);

        let buf = pool.try_alloc_zeroed(100).unwrap();
        assert!(buf.is_pooled());
        assert_eq!(buf.len(), 100);
        assert!(buf.as_ref().iter().all(|&b| b == 0));
    }

    #[test]
    fn test_alloc_zeroed_fallback_uses_untracked_zeroed_buffer() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 1), &mut registry);

        // Exhaust pooled capacity for this class.
        let _pooled = pool.try_alloc(100).unwrap();

        // The fallback allocation is untracked but still zeroed.
        let buf = pool.alloc_zeroed(100);
        assert!(!buf.is_pooled());
        assert_eq!(buf.len(), 100);
        assert!(buf.as_ref().iter().all(|&b| b == 0));
    }

    #[test]
    fn test_alloc_zeroed_reuses_dirty_pooled_buffer() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 1), &mut registry);

        let mut first = pool.alloc_zeroed(100);
        assert!(first.is_pooled());
        assert!(first.as_ref().iter().all(|&b| b == 0));

        // Dirty the buffer before returning it to the pool.
        first.as_mut().fill(0xAB);
        drop(first);

        // A reused buffer must be re-zeroed, not handed back dirty.
        let second = pool.alloc_zeroed(100);
        assert!(second.is_pooled());
        assert_eq!(second.len(), 100);
        assert!(second.as_ref().iter().all(|&b| b == 0));
    }
1457
    #[test]
    fn test_pool_exhaustion() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);

        // Allocate max buffers
        let _buf1 = pool.try_alloc(100).expect("first alloc should succeed");
        let _buf2 = pool.try_alloc(100).expect("second alloc should succeed");

        // Third allocation should fail
        assert!(pool.try_alloc(100).is_err());
    }

    #[test]
    fn test_pool_oversized() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page * 2, 10), &mut registry);

        // Request larger than max_size
        assert!(pool.try_alloc(page * 4).is_err());
    }

    #[test]
    fn test_pool_size_classes() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page * 4, 10), &mut registry);

        // Small request gets smallest class
        let buf1 = pool.try_alloc(100).unwrap();
        assert_eq!(buf1.capacity(), page);

        // Larger request gets appropriate class
        let buf2 = pool.try_alloc(page + 1).unwrap();
        assert_eq!(buf2.capacity(), page * 2);

        let buf3 = pool.try_alloc(page * 3).unwrap();
        assert_eq!(buf3.capacity(), page * 4);
    }

    #[test]
    fn test_pooled_buf_mut_freeze() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);

        // Allocate and initialize a buffer
        let mut buf = pool.try_alloc(11).unwrap();
        buf.put_slice(&[0u8; 11]);
        assert_eq!(buf.len(), 11);

        // Write some data
        buf.as_mut()[..5].copy_from_slice(&[1, 2, 3, 4, 5]);

        // Freeze preserves the content
        let iobuf = buf.freeze();
        assert_eq!(iobuf.len(), 11);
        assert_eq!(&iobuf.as_ref()[..5], &[1, 2, 3, 4, 5]);

        // IoBuf can be sliced
        let slice = iobuf.slice(0..5);
        assert_eq!(slice.len(), 5);
    }

    #[test]
    fn test_prefill() {
        let page = NZUsize!(page_size());
        let mut registry = test_registry();
        let pool = BufferPool::new(
            BufferPoolConfig {
                min_size: page,
                max_size: page,
                max_per_class: NZUsize!(5),
                prefill: true,
                alignment: page,
            },
            &mut registry,
        );

        // Should be able to allocate max_per_class buffers immediately
        let mut bufs = Vec::new();
        for _ in 0..5 {
            bufs.push(pool.try_alloc(100).expect("alloc should succeed"));
        }

        // Next allocation should fail
        assert!(pool.try_alloc(100).is_err());
    }
1548
    #[test]
    fn test_config_for_network() {
        let config = BufferPoolConfig::for_network();
        config.validate();
        // Network profile: cache-line alignment, no prefill.
        assert_eq!(config.min_size.get(), cache_line_size());
        assert_eq!(config.max_size.get(), 64 * 1024);
        assert_eq!(config.max_per_class.get(), 4096);
        assert!(!config.prefill);
        assert_eq!(config.alignment.get(), cache_line_size());
    }

    #[test]
    fn test_config_for_storage() {
        let config = BufferPoolConfig::for_storage();
        config.validate();
        // Storage profile: page alignment, no prefill.
        assert_eq!(config.min_size.get(), page_size());
        assert_eq!(config.max_size.get(), 8 * 1024 * 1024);
        assert_eq!(config.max_per_class.get(), 32);
        assert!(!config.prefill);
        assert_eq!(config.alignment.get(), page_size());
    }

    #[test]
    fn test_storage_config_supports_default_allocations() {
        let mut registry = test_registry();
        let pool = BufferPool::new(BufferPoolConfig::for_storage(), &mut registry);

        let buf = pool.try_alloc(8 * 1024 * 1024).unwrap();
        assert_eq!(buf.capacity(), 8 * 1024 * 1024);
    }

    #[test]
    fn test_config_builders() {
        let page = NZUsize!(page_size());
        let config = BufferPoolConfig::for_storage()
            .with_max_per_class(NZUsize!(64))
            .with_prefill(true)
            .with_min_size(page)
            .with_max_size(NZUsize!(128 * 1024));

        config.validate();
        assert_eq!(config.min_size, page);
        assert_eq!(config.max_size.get(), 128 * 1024);
        assert_eq!(config.max_per_class.get(), 64);
        assert!(config.prefill);

        // Storage profile alignment stays page-sized unless explicitly changed.
        assert_eq!(config.alignment.get(), page_size());

        // Alignment can be tuned explicitly as long as min_size is also adjusted.
        let aligned = BufferPoolConfig::for_network()
            .with_alignment(NZUsize!(256))
            .with_min_size(NZUsize!(256));
        aligned.validate();
        assert_eq!(aligned.alignment.get(), 256);
        assert_eq!(aligned.min_size.get(), 256);
    }

    #[test]
    fn test_config_with_budget_bytes() {
        // Classes: 4, 8, 16 (sum = 28). Budget 280 => max_per_class = 10.
        let config = BufferPoolConfig {
            min_size: NZUsize!(4),
            max_size: NZUsize!(16),
            max_per_class: NZUsize!(1),
            prefill: false,
            alignment: NZUsize!(4),
        }
        .with_budget_bytes(NZUsize!(280));
        assert_eq!(config.max_per_class.get(), 10);

        // Budget 10 rounds up to one buffer per class.
        let small_budget = BufferPoolConfig {
            min_size: NZUsize!(4),
            max_size: NZUsize!(16),
            max_per_class: NZUsize!(1),
            prefill: false,
            alignment: NZUsize!(4),
        }
        .with_budget_bytes(NZUsize!(10));
        assert_eq!(small_budget.max_per_class.get(), 1);
    }

    #[test]
    fn test_pool_error_display() {
        assert_eq!(
            PoolError::Oversized.to_string(),
            "requested capacity exceeds maximum buffer size"
        );
        assert_eq!(
            PoolError::Exhausted.to_string(),
            "pool exhausted for required size class"
        );
    }

    #[test]
    fn test_config_invalid_range_edge_paths() {
        // min_size > max_size yields zero classes and leaves budget untouched.
        let invalid_order = BufferPoolConfig {
            min_size: NZUsize!(8),
            max_size: NZUsize!(4),
            max_per_class: NZUsize!(1),
            prefill: false,
            alignment: NZUsize!(4),
        };
        assert_eq!(invalid_order.num_classes(), 0);
        let unchanged = invalid_order.clone().with_budget_bytes(NZUsize!(128));
        assert_eq!(unchanged.max_per_class, invalid_order.max_per_class);

        let non_power_two_max = BufferPoolConfig {
            min_size: NZUsize!(8),
            max_size: NZUsize!(12),
            max_per_class: NZUsize!(1),
            prefill: false,
            alignment: NZUsize!(4),
        };
        assert_eq!(non_power_two_max.class_index(12), None);
    }
1666
    #[test]
    fn test_pool_debug_and_config_accessor() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);

        let debug = format!("{pool:?}");
        assert!(debug.contains("BufferPool"));
        assert!(debug.contains("num_classes"));
        assert_eq!(pool.config().min_size.get(), page);
    }

    #[test]
    fn test_return_buffer_freelist_full_drops_extra() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 1), &mut registry);

        // Fill freelist with a returned tracked buffer.
        let tracked = pool.try_alloc(page).expect("tracked allocation");
        drop(tracked);

        // Simulate one outstanding allocation, then return an extra same-class
        // buffer while freelist is already full to hit the Err(push) branch.
        let class_index = pool
            .inner
            .config
            .class_index(page)
            .expect("class exists for page-sized buffer");
        pool.inner.classes[class_index]
            .allocated
            .store(1, Ordering::Relaxed);
        pool.inner
            .return_buffer(AlignedBuffer::new(page, page_size()));
        // The extra buffer is dropped but the allocated counter still decrements.
        assert_eq!(
            pool.inner.classes[class_index]
                .allocated
                .load(Ordering::Relaxed),
            0
        );
    }

    #[test]
    fn test_return_buffer_ignores_unmatched_class() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 1), &mut registry);

        // Size does not map to any configured class (`max_size == page`).
        pool.inner
            .return_buffer(AlignedBuffer::new(page * 2, page_size()));
        assert_eq!(get_allocated(&pool, page), 0);
    }

    #[test]
    fn test_pooled_debug_and_empty_into_bytes_paths() {
        let page = page_size();

        // A dangling `Weak` produces an untracked (pool-less) buffer.
        let pooled_mut_debug = {
            let pooled_mut = PooledBufMut::new(AlignedBuffer::new(page, page), Weak::new());
            format!("{pooled_mut:?}")
        };
        assert!(pooled_mut_debug.contains("PooledBufMut"));
        assert!(pooled_mut_debug.contains("cursor"));

        let empty_from_mut = PooledBufMut::new(AlignedBuffer::new(page, page), Weak::new());
        assert!(empty_from_mut.into_bytes().is_empty());

        let pooled = PooledBufMut::new(AlignedBuffer::new(page, page), Weak::new()).into_pooled();
        let pooled_debug = format!("{pooled:?}");
        assert!(pooled_debug.contains("PooledBuf"));
        assert!(pooled_debug.contains("capacity"));
        assert!(pooled.into_bytes().is_empty());
    }

    /// `Bound::Excluded(usize::MAX)` as a start bound cannot be incremented.
    #[test]
    #[should_panic(expected = "range start overflow")]
    fn test_pooled_slice_excluded_start_overflow() {
        let page = page_size();
        let pooled = PooledBufMut::new(AlignedBuffer::new(page, page), Weak::new()).into_pooled();
        let _ = pooled.slice((Bound::Excluded(usize::MAX), Bound::<usize>::Unbounded));
    }

    /// Helper to get the number of allocated buffers for a size class.
    fn get_allocated(pool: &BufferPool, size: usize) -> usize {
        let class_index = pool.inner.config.class_index(size).unwrap();
        pool.inner.classes[class_index]
            .allocated
            .load(Ordering::Relaxed)
    }

    /// Helper to get the number of available buffers in freelist for a size class.
    fn get_available(pool: &BufferPool, size: usize) -> i64 {
        let class_index = pool.inner.config.class_index(size).unwrap();
        let label = SizeClassLabel {
            size_class: pool.inner.classes[class_index].size as u64,
        };
        pool.inner.metrics.available.get_or_create(&label).get()
    }
1766
1767    #[test]
1768    fn test_freeze_returns_buffer_to_pool() {
1769        let page = page_size();
1770        let mut registry = test_registry();
1771        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
1772
1773        // Initially: 0 allocated, 0 available
1774        assert_eq!(get_allocated(&pool, page), 0);
1775        assert_eq!(get_available(&pool, page), 0);
1776
1777        // Allocate and freeze
1778        let buf = pool.try_alloc(100).unwrap();
1779        assert_eq!(get_allocated(&pool, page), 1);
1780        assert_eq!(get_available(&pool, page), 0);
1781
1782        let iobuf = buf.freeze();
1783        // Still allocated (held by IoBuf)
1784        assert_eq!(get_allocated(&pool, page), 1);
1785
1786        // Drop the IoBuf - buffer should return to pool
1787        drop(iobuf);
1788        assert_eq!(get_allocated(&pool, page), 0);
1789        assert_eq!(get_available(&pool, page), 1);
1790    }
1791
    /// Exercises reference counting across `clone`/`slice` and the ownership
    /// transfer semantics of `copy_to_bytes` for both `IoBuf` and `IoBufMut`,
    /// verifying the pool's allocated counter at each drop.
    #[test]
    fn test_refcount_and_copy_to_bytes_paths() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);

        // Refcount behavior:
        // - clone/slice keep the pooled allocation alive
        // - empty slice does not keep ownership
        {
            let mut buf = pool.try_alloc(100).unwrap();
            buf.put_slice(&[0xAA; 100]);
            let iobuf = buf.freeze();
            let clone = iobuf.clone();
            let slice = iobuf.slice(10..40);
            let empty = iobuf.slice(10..10);
            assert!(empty.is_empty());
            // Buffer stays allocated while any non-empty view is alive.
            drop(iobuf);
            assert_eq!(get_allocated(&pool, page), 1);
            drop(slice);
            assert_eq!(get_allocated(&pool, page), 1);
            // Last real owner dropped: counter returns to zero even though
            // `empty` (the zero-length slice) is still in scope.
            drop(clone);
            assert_eq!(get_allocated(&pool, page), 0);
        }

        // IoBuf::copy_to_bytes behavior:
        // - zero-length copy is empty and non-advancing
        // - partial copy advances while keeping ownership alive
        // - full drain transfers ownership out of source
        // - zero-length copy on already-empty source stays detached
        {
            let mut buf = pool.try_alloc(100).unwrap();
            buf.put_slice(&[0x42; 100]);
            let mut iobuf = buf.freeze();

            let zero = iobuf.copy_to_bytes(0);
            assert!(zero.is_empty());
            assert_eq!(iobuf.remaining(), 100);

            let partial = iobuf.copy_to_bytes(30);
            assert_eq!(&partial[..], &[0x42; 30]);
            assert_eq!(iobuf.remaining(), 70);

            let rest = iobuf.copy_to_bytes(70);
            assert_eq!(&rest[..], &[0x42; 70]);
            assert_eq!(iobuf.remaining(), 0);

            // Zero-length copy on empty should not transfer ownership.
            let empty = iobuf.copy_to_bytes(0);
            assert!(empty.is_empty());

            // `rest` holds the full-drain result: it alone keeps the pooled
            // allocation alive after the source and other copies are gone.
            drop(iobuf);
            assert_eq!(get_allocated(&pool, page), 1);
            drop(zero);
            drop(partial);
            assert_eq!(get_allocated(&pool, page), 1);
            drop(rest);
            assert_eq!(get_allocated(&pool, page), 0);
        }

        // IoBufMut::copy_to_bytes mirrors the immutable ownership semantics.
        {
            let buf = pool.try_alloc(100).unwrap();
            let mut iobufmut = buf;
            iobufmut.put_slice(&[0x7E; 100]);

            let zero = iobufmut.copy_to_bytes(0);
            assert!(zero.is_empty());
            assert_eq!(iobufmut.remaining(), 100);

            let partial = iobufmut.copy_to_bytes(30);
            assert_eq!(&partial[..], &[0x7E; 30]);
            assert_eq!(iobufmut.remaining(), 70);

            let rest = iobufmut.copy_to_bytes(70);
            assert_eq!(&rest[..], &[0x7E; 70]);
            assert_eq!(iobufmut.remaining(), 0);

            // As above: the full-drain copy is the surviving owner.
            drop(iobufmut);
            assert_eq!(get_allocated(&pool, page), 1);
            drop(zero);
            drop(partial);
            assert_eq!(get_allocated(&pool, page), 1);
            drop(rest);
            assert_eq!(get_allocated(&pool, page), 0);
        }
    }
1879
1880    #[test]
1881    fn test_iobuf_to_iobufmut_conversion_reuses_pool_for_non_full_unique_view() {
1882        // IoBuf -> IoBufMut should recover pooled ownership for unique non-full views.
1883        let page = page_size();
1884        let mut registry = test_registry();
1885        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
1886
1887        let buf = pool.try_alloc(100).unwrap();
1888        assert_eq!(get_allocated(&pool, page), 1);
1889
1890        let iobuf = buf.freeze();
1891        assert_eq!(get_allocated(&pool, page), 1);
1892
1893        let iobufmut: IoBufMut = iobuf.into();
1894
1895        // Conversion reused pooled storage instead of copying.
1896        assert_eq!(
1897            get_allocated(&pool, page),
1898            1,
1899            "pooled buffer should remain allocated after zero-copy IoBuf->IoBufMut conversion"
1900        );
1901        assert_eq!(get_available(&pool, page), 0);
1902
1903        // Dropping returns the pooled buffer.
1904        drop(iobufmut);
1905        assert_eq!(get_allocated(&pool, page), 0);
1906        assert_eq!(get_available(&pool, page), 1);
1907    }
1908
1909    #[test]
1910    fn test_iobuf_to_iobufmut_conversion_preserves_full_unique_view() {
1911        let page = page_size();
1912        let mut registry = test_registry();
1913        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
1914
1915        let mut buf = pool.try_alloc(page).unwrap();
1916        buf.put_slice(&vec![0xEE; page]);
1917        let iobuf = buf.freeze();
1918
1919        let iobufmut: IoBufMut = iobuf.into();
1920        assert_eq!(iobufmut.len(), page);
1921        assert!(iobufmut.as_ref().iter().all(|&b| b == 0xEE));
1922        assert_eq!(get_allocated(&pool, page), 1);
1923        assert_eq!(get_available(&pool, page), 0);
1924
1925        drop(iobufmut);
1926        assert_eq!(get_allocated(&pool, page), 0);
1927        assert_eq!(get_available(&pool, page), 1);
1928    }
1929
1930    #[test]
1931    fn test_iobuf_try_into_mut_recycles_full_unique_view() {
1932        let page = page_size();
1933        let mut registry = test_registry();
1934        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
1935
1936        let mut buf = pool.try_alloc(page).unwrap();
1937        buf.put_slice(&vec![0xAB; page]);
1938        let iobuf = buf.freeze();
1939        assert_eq!(get_allocated(&pool, page), 1);
1940
1941        let recycled = iobuf
1942            .try_into_mut()
1943            .expect("unique full-view pooled buffer should recycle");
1944        assert_eq!(recycled.len(), page);
1945        assert!(recycled.as_ref().iter().all(|&b| b == 0xAB));
1946        assert_eq!(recycled.capacity(), page);
1947        assert_eq!(get_allocated(&pool, page), 1);
1948
1949        drop(recycled);
1950        assert_eq!(get_allocated(&pool, page), 0);
1951        assert_eq!(get_available(&pool, page), 1);
1952    }
1953
    /// `try_into_mut` must succeed for a uniquely-owned sliced view (zero
    /// copy) and fail for a shared view, returning the original buffer.
    #[test]
    fn test_iobuf_try_into_mut_succeeds_for_unique_slice_and_fails_for_shared() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);

        // Unique sliced views can recover mutable ownership without copying.
        let mut buf = pool.try_alloc(page).unwrap();
        buf.put_slice(&vec![0xCD; page]);
        let iobuf = buf.freeze();
        let sliced = iobuf.slice(1..page);
        // Dropping the parent leaves `sliced` as the only owner.
        drop(iobuf);
        let recycled = sliced
            .try_into_mut()
            .expect("unique sliced pooled buffer should recycle");
        // The recovered view is one byte shorter (sliced from index 1).
        assert_eq!(recycled.len(), page - 1);
        assert!(recycled.as_ref().iter().all(|&b| b == 0xCD));
        assert_eq!(recycled.capacity(), page - 1);
        assert_eq!(get_allocated(&pool, page), 1);
        drop(recycled);
        assert_eq!(get_allocated(&pool, page), 0);
        assert_eq!(get_available(&pool, page), 1);

        // Shared views still cannot recover mutable ownership.
        let mut buf = pool.try_alloc(page).unwrap();
        buf.put_slice(&vec![0xEF; page]);
        let iobuf = buf.freeze();
        let cloned = iobuf.clone();
        // `expect_err` yields the Err payload — the original IoBuf — so the
        // view can still be dropped normally below.
        let iobuf = iobuf
            .try_into_mut()
            .expect_err("shared pooled buffer must not convert to mutable");

        drop(cloned);
        drop(iobuf);
        assert_eq!(get_allocated(&pool, page), 0);
        assert!(get_available(&pool, page) >= 1);
    }
1991
    /// Hammers the pool from ten threads that repeatedly allocate, freeze,
    /// clone, and drop, then verifies the allocated counter returns to zero
    /// (i.e. every buffer was returned despite concurrent churn).
    #[test]
    fn test_multithreaded_alloc_freeze_return() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = Arc::new(BufferPool::new(test_config(page, page, 100), &mut registry));

        let mut handles = vec![];

        // Reduce iterations under miri (atomics are slow)
        cfg_if::cfg_if! {
            if #[cfg(miri)] {
                let iterations = 100;
            } else {
                let iterations = 1000;
            }
        }

        // Spawn multiple threads that allocate, freeze, clone, and drop
        for _ in 0..10 {
            let pool = pool.clone();
            let handle = thread::spawn(move || {
                for _ in 0..iterations {
                    let buf = pool.try_alloc(100).unwrap();
                    let iobuf = buf.freeze();

                    // Clone a few times
                    let clones: Vec<_> = (0..5).map(|_| iobuf.clone()).collect();
                    drop(iobuf);

                    // Drop clones
                    for clone in clones {
                        drop(clone);
                    }
                }
            });
            handles.push(handle);
        }

        // Wait for all threads
        for handle in handles {
            handle.join().unwrap();
        }

        // All buffers should be returned
        let class_index = pool.inner.config.class_index(page).unwrap();
        let allocated = pool.inner.classes[class_index]
            .allocated
            .load(Ordering::Relaxed);
        assert_eq!(
            allocated, 0,
            "all buffers should be returned after multithreaded test"
        );
    }
2045
2046    #[test]
2047    fn test_cross_thread_buffer_return() {
2048        // Allocate on one thread, freeze, send to another thread, drop there
2049        let page = page_size();
2050        let mut registry = test_registry();
2051        let pool = BufferPool::new(test_config(page, page, 100), &mut registry);
2052
2053        let (tx, rx) = mpsc::channel();
2054
2055        // Allocate and freeze on main thread
2056        for _ in 0..50 {
2057            let buf = pool.try_alloc(100).unwrap();
2058            let iobuf = buf.freeze();
2059            tx.send(iobuf).unwrap();
2060        }
2061        drop(tx);
2062
2063        // Receive and drop on another thread
2064        let handle = thread::spawn(move || {
2065            while let Ok(iobuf) = rx.recv() {
2066                drop(iobuf);
2067            }
2068        });
2069
2070        handle.join().unwrap();
2071
2072        // All buffers should be returned
2073        assert_eq!(get_allocated(&pool, page), 0);
2074    }
2075
2076    #[test]
2077    fn test_pool_dropped_before_buffer() {
2078        // What happens if the pool is dropped while buffers are still in use?
2079        // The Weak reference should fail to upgrade, and the buffer should just be deallocated.
2080
2081        let page = page_size();
2082        let mut registry = test_registry();
2083        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
2084
2085        let mut buf = pool.try_alloc(100).unwrap();
2086        buf.put_slice(&[0u8; 100]);
2087        let iobuf = buf.freeze();
2088
2089        // Drop the pool while buffer is still alive
2090        drop(pool);
2091
2092        // Buffer should still be usable
2093        assert_eq!(iobuf.len(), 100);
2094
2095        // Dropping the buffer should not panic (Weak upgrade fails, buffer is deallocated)
2096        drop(iobuf);
2097        // No assertion here - we just want to make sure it doesn't panic
2098    }
2099
    /// Verify pooled IoBuf matches Bytes semantics for Buf trait methods:
    /// `remaining`, `chunk`, `advance`, and `copy_to_bytes` are run in
    /// lockstep on a `Bytes` and a pooled `IoBuf` holding identical data,
    /// asserting agreement after every step.
    #[test]
    fn test_bytes_parity_iobuf_buf_trait() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 10), &mut registry);

        let data: Vec<u8> = (0..100u8).collect();

        let mut pooled_mut = pool.try_alloc(data.len()).unwrap();
        pooled_mut.put_slice(&data);
        let mut pooled = pooled_mut.freeze();
        let mut bytes = Bytes::from(data);

        // remaining() + chunk()
        assert_eq!(Buf::remaining(&bytes), Buf::remaining(&pooled));
        assert_eq!(Buf::chunk(&bytes), Buf::chunk(&pooled));

        // advance()
        Buf::advance(&mut bytes, 13);
        Buf::advance(&mut pooled, 13);
        assert_eq!(Buf::remaining(&bytes), Buf::remaining(&pooled));
        assert_eq!(Buf::chunk(&bytes), Buf::chunk(&pooled));

        // copy_to_bytes(0) — must be empty and must not advance either source.
        let bytes_zero = Buf::copy_to_bytes(&mut bytes, 0);
        let pooled_zero = Buf::copy_to_bytes(&mut pooled, 0);
        assert_eq!(bytes_zero, pooled_zero);
        assert_eq!(Buf::remaining(&bytes), Buf::remaining(&pooled));
        assert_eq!(Buf::chunk(&bytes), Buf::chunk(&pooled));

        // copy_to_bytes(n) — partial copy advances both equally.
        let bytes_mid = Buf::copy_to_bytes(&mut bytes, 17);
        let pooled_mid = Buf::copy_to_bytes(&mut pooled, 17);
        assert_eq!(bytes_mid, pooled_mid);
        assert_eq!(Buf::remaining(&bytes), Buf::remaining(&pooled));
        assert_eq!(Buf::chunk(&bytes), Buf::chunk(&pooled));

        // copy_to_bytes(remaining) — full drain empties both sources.
        let remaining = Buf::remaining(&bytes);
        let bytes_rest = Buf::copy_to_bytes(&mut bytes, remaining);
        let pooled_rest = Buf::copy_to_bytes(&mut pooled, remaining);
        assert_eq!(bytes_rest, pooled_rest);
        assert_eq!(Buf::remaining(&bytes), 0);
        assert_eq!(Buf::remaining(&pooled), 0);
        assert!(!Buf::has_remaining(&bytes));
        assert!(!Buf::has_remaining(&pooled));
    }
2148
2149    /// Verify pooled IoBuf slice behavior matches Bytes for content semantics.
2150    #[test]
2151    fn test_bytes_parity_iobuf_slice() {
2152        let page = page_size();
2153        let mut registry = test_registry();
2154        let pool = BufferPool::new(test_config(page, page, 10), &mut registry);
2155
2156        let data: Vec<u8> = (0..32u8).collect();
2157        let mut pooled_mut = pool.try_alloc(data.len()).unwrap();
2158        pooled_mut.put_slice(&data);
2159        let pooled = pooled_mut.freeze();
2160        let bytes = Bytes::from(data);
2161
2162        assert_eq!(pooled.slice(..5).as_ref(), bytes.slice(..5).as_ref());
2163        assert_eq!(pooled.slice(6..).as_ref(), bytes.slice(6..).as_ref());
2164        assert_eq!(pooled.slice(3..8).as_ref(), bytes.slice(3..8).as_ref());
2165        assert_eq!(pooled.slice(..=7).as_ref(), bytes.slice(..=7).as_ref());
2166        assert_eq!(pooled.slice(10..10).as_ref(), bytes.slice(10..10).as_ref());
2167    }
2168
2169    #[test]
2170    fn test_bytes_parity_iobuf_split_to() {
2171        let page = page_size();
2172        let mut pooled_mut = PooledBufMut::new(AlignedBuffer::new(page, page), Weak::new());
2173        pooled_mut.put_slice(b"abcdefgh");
2174        let mut pooled = pooled_mut.into_pooled();
2175        let mut bytes = Bytes::from_static(b"abcdefgh");
2176
2177        // split_to(0)
2178        assert_eq!(pooled.split_to(0).as_ref(), bytes.split_to(0).as_ref());
2179        assert_eq!(pooled.as_ref(), bytes.as_ref());
2180
2181        // split_to(n)
2182        assert_eq!(pooled.split_to(3).as_ref(), bytes.split_to(3).as_ref());
2183        assert_eq!(pooled.as_ref(), bytes.as_ref());
2184
2185        // split_to(remaining)
2186        let remaining = bytes.remaining();
2187        assert_eq!(
2188            pooled.split_to(remaining).as_ref(),
2189            bytes.split_to(remaining).as_ref()
2190        );
2191        assert_eq!(pooled.as_ref(), bytes.as_ref());
2192    }
2193
2194    #[test]
2195    #[should_panic(expected = "split_to out of bounds")]
2196    fn test_iobuf_split_to_out_of_bounds() {
2197        let page = page_size();
2198        let mut pooled_mut = PooledBufMut::new(AlignedBuffer::new(page, page), Weak::new());
2199        pooled_mut.put_slice(b"abc");
2200        let mut pooled = pooled_mut.into_pooled();
2201        let _ = pooled.split_to(4);
2202    }
2203
    /// Verify PooledBufMut matches BytesMut semantics for Buf trait:
    /// `remaining`, `chunk`, and `advance` are run in lockstep on a
    /// `BytesMut` and a pooled buffer holding identical data.
    #[test]
    fn test_bytesmut_parity_buf_trait() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 10), &mut registry);

        let mut bytes = BytesMut::with_capacity(100);
        bytes.put_slice(&[0xAAu8; 50]);

        let mut pooled = pool.try_alloc(100).unwrap();
        pooled.put_slice(&[0xAAu8; 50]);

        // remaining()
        assert_eq!(Buf::remaining(&bytes), Buf::remaining(&pooled));

        // chunk()
        assert_eq!(Buf::chunk(&bytes), Buf::chunk(&pooled));

        // advance()
        Buf::advance(&mut bytes, 10);
        Buf::advance(&mut pooled, 10);
        assert_eq!(Buf::remaining(&bytes), Buf::remaining(&pooled));
        assert_eq!(Buf::chunk(&bytes), Buf::chunk(&pooled));

        // advance to end — both report fully consumed afterwards.
        let remaining = Buf::remaining(&bytes);
        Buf::advance(&mut bytes, remaining);
        Buf::advance(&mut pooled, remaining);
        assert_eq!(Buf::remaining(&bytes), 0);
        assert_eq!(Buf::remaining(&pooled), 0);
        assert!(!Buf::has_remaining(&bytes));
        assert!(!Buf::has_remaining(&pooled));
    }
2238
2239    /// Verify PooledBufMut matches BytesMut semantics for BufMut trait.
2240    #[test]
2241    fn test_bytesmut_parity_bufmut_trait() {
2242        let page = page_size();
2243        let mut registry = test_registry();
2244        let pool = BufferPool::new(test_config(page, page, 10), &mut registry);
2245
2246        let mut bytes = BytesMut::with_capacity(100);
2247        let mut pooled = pool.try_alloc(100).unwrap();
2248
2249        // remaining_mut()
2250        assert!(BufMut::remaining_mut(&bytes) >= 100);
2251        assert!(BufMut::remaining_mut(&pooled) >= 100);
2252
2253        // put_slice()
2254        BufMut::put_slice(&mut bytes, b"hello");
2255        BufMut::put_slice(&mut pooled, b"hello");
2256        assert_eq!(bytes.as_ref(), pooled.as_ref());
2257
2258        // put_u8()
2259        BufMut::put_u8(&mut bytes, 0x42);
2260        BufMut::put_u8(&mut pooled, 0x42);
2261        assert_eq!(bytes.as_ref(), pooled.as_ref());
2262
2263        // chunk_mut() - verify we can write to it
2264        let bytes_chunk = BufMut::chunk_mut(&mut bytes);
2265        let pooled_chunk = BufMut::chunk_mut(&mut pooled);
2266        assert!(bytes_chunk.len() > 0);
2267        assert!(pooled_chunk.len() > 0);
2268    }
2269
    /// Verify PooledBufMut matches BytesMut after `advance`: truncate, clear,
    /// capacity/set_len, and interleaved writes all behave identically.
    #[test]
    fn test_bytesmut_parity_after_advance_paths() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page * 4, 10), &mut registry);

        // truncate after advance
        {
            let mut bytes = BytesMut::with_capacity(100);
            bytes.put_slice(&[0xAAu8; 50]);
            Buf::advance(&mut bytes, 10);
            let mut pooled = pool.try_alloc(100).unwrap();
            pooled.put_slice(&[0xAAu8; 50]);
            Buf::advance(&mut pooled, 10);
            bytes.truncate(20);
            pooled.truncate(20);
            assert_eq!(bytes.as_ref(), pooled.as_ref());
        }

        // clear after advance
        {
            let mut bytes = BytesMut::with_capacity(100);
            bytes.put_slice(&[0xAAu8; 50]);
            Buf::advance(&mut bytes, 10);
            let mut pooled = pool.try_alloc(100).unwrap();
            pooled.put_slice(&[0xAAu8; 50]);
            Buf::advance(&mut pooled, 10);
            bytes.clear();
            pooled.clear();
            assert_eq!(bytes.len(), 0);
            assert_eq!(pooled.len(), 0);
        }

        // capacity/set_len/clear semantics after advance
        {
            let mut bytes = BytesMut::with_capacity(page);
            bytes.resize(50, 0xBB);
            Buf::advance(&mut bytes, 20);
            let mut pooled = pool.try_alloc(page).unwrap();
            pooled.put_slice(&[0xBB; 50]);
            Buf::advance(&mut pooled, 20);
            assert_eq!(bytes.capacity(), pooled.capacity());
            // SAFETY: shrink readable window to initialized region.
            unsafe {
                bytes.set_len(25);
                pooled.set_len(25);
            }
            assert_eq!(bytes.as_ref(), pooled.as_ref());
            // clear() must not change reported capacity for either type.
            let bytes_cap = bytes.capacity();
            let pooled_cap = pooled.capacity();
            bytes.clear();
            pooled.clear();
            assert_eq!(bytes.capacity(), bytes_cap);
            assert_eq!(pooled.capacity(), pooled_cap);
        }

        // put after advance + truncate-beyond-len no-op
        {
            let mut bytes = BytesMut::with_capacity(100);
            bytes.resize(30, 0xAA);
            Buf::advance(&mut bytes, 10);
            bytes.put_slice(&[0xBB; 10]);
            bytes.truncate(100);

            let mut pooled = pool.try_alloc(100).unwrap();
            pooled.put_slice(&[0xAA; 30]);
            Buf::advance(&mut pooled, 10);
            pooled.put_slice(&[0xBB; 10]);
            pooled.truncate(100);
            assert_eq!(bytes.as_ref(), pooled.as_ref());
        }
    }
2342
2343    /// Test pool exhaustion and recovery.
2344    #[test]
2345    fn test_pool_exhaustion_and_recovery() {
2346        let page = page_size();
2347        let mut registry = test_registry();
2348        let pool = BufferPool::new(test_config(page, page, 3), &mut registry);
2349
2350        // Exhaust the pool
2351        let buf1 = pool.try_alloc(100).expect("first alloc");
2352        let buf2 = pool.try_alloc(100).expect("second alloc");
2353        let buf3 = pool.try_alloc(100).expect("third alloc");
2354        assert!(pool.try_alloc(100).is_err(), "pool should be exhausted");
2355
2356        // Return one buffer
2357        drop(buf1);
2358
2359        // Should be able to allocate again
2360        let buf4 = pool.try_alloc(100).expect("alloc after return");
2361        assert!(pool.try_alloc(100).is_err(), "pool exhausted again");
2362
2363        // Return all and verify freelist reuse
2364        drop(buf2);
2365        drop(buf3);
2366        drop(buf4);
2367
2368        assert_eq!(get_allocated(&pool, page), 0);
2369        assert_eq!(get_available(&pool, page), 3);
2370
2371        // Allocate again - should reuse from freelist
2372        let _buf5 = pool.try_alloc(100).expect("reuse from freelist");
2373        assert_eq!(get_available(&pool, page), 2);
2374    }
2375
2376    /// Test try_alloc error variants.
2377    #[test]
2378    fn test_try_alloc_errors() {
2379        let page = page_size();
2380        let mut registry = test_registry();
2381        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
2382
2383        // Oversized request
2384        let result = pool.try_alloc(page * 10);
2385        assert_eq!(result.unwrap_err(), PoolError::Oversized);
2386
2387        // Exhaust pool
2388        let _buf1 = pool.try_alloc(100).unwrap();
2389        let _buf2 = pool.try_alloc(100).unwrap();
2390        let result = pool.try_alloc(100);
2391        assert_eq!(result.unwrap_err(), PoolError::Exhausted);
2392    }
2393
2394    #[test]
2395    fn test_try_alloc_zeroed_errors() {
2396        let page = page_size();
2397        let mut registry = test_registry();
2398        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
2399
2400        // Oversized request
2401        let result = pool.try_alloc_zeroed(page * 10);
2402        assert_eq!(result.unwrap_err(), PoolError::Oversized);
2403
2404        // Exhaust pool
2405        let _buf1 = pool.try_alloc_zeroed(100).unwrap();
2406        let _buf2 = pool.try_alloc_zeroed(100).unwrap();
2407        let result = pool.try_alloc_zeroed(100);
2408        assert_eq!(result.unwrap_err(), PoolError::Exhausted);
2409    }
2410
    /// Test fallback allocation when pool is exhausted or oversized: `alloc`
    /// still returns an aligned buffer, but it is untracked — pool counters
    /// must be unaffected by its lifetime.
    #[test]
    fn test_fallback_allocation() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);

        // Exhaust the pool
        let buf1 = pool.try_alloc(100).unwrap();
        let buf2 = pool.try_alloc(100).unwrap();
        assert!(buf1.is_pooled());
        assert!(buf2.is_pooled());

        // Fallback via alloc() when exhausted - still aligned, but untracked
        let mut fallback_exhausted = pool.alloc(100);
        assert!(!fallback_exhausted.is_pooled());
        assert!((fallback_exhausted.as_mut_ptr() as usize).is_multiple_of(page));

        // Fallback via alloc() when oversized - still aligned, but untracked
        let mut fallback_oversized = pool.alloc(page * 10);
        assert!(!fallback_oversized.is_pooled());
        assert!((fallback_oversized.as_mut_ptr() as usize).is_multiple_of(page));

        // Verify pool counters unchanged by fallback allocations
        assert_eq!(get_allocated(&pool, page), 2);

        // Drop fallback buffers - should not affect pool counters
        drop(fallback_exhausted);
        drop(fallback_oversized);
        assert_eq!(get_allocated(&pool, page), 2);

        // Drop tracked buffers - counters should decrease
        drop(buf1);
        drop(buf2);
        assert_eq!(get_allocated(&pool, page), 0);
    }
2447
2448    /// Test is_pooled method.
2449    #[test]
2450    fn test_is_pooled() {
2451        let page = page_size();
2452        let mut registry = test_registry();
2453        let pool = BufferPool::new(test_config(page, page, 10), &mut registry);
2454
2455        let pooled = pool.try_alloc(100).unwrap();
2456        assert!(pooled.is_pooled());
2457
2458        let owned = IoBufMut::with_capacity(100);
2459        assert!(!owned.is_pooled());
2460    }
2461
2462    #[test]
2463    fn test_iobuf_is_pooled() {
2464        let page = page_size();
2465        let mut registry = test_registry();
2466        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
2467
2468        let pooled = pool.try_alloc(100).unwrap().freeze();
2469        assert!(pooled.is_pooled());
2470
2471        // Oversized alloc uses untracked fallback allocation.
2472        let fallback = pool.alloc(page * 10).freeze();
2473        assert!(!fallback.is_pooled());
2474
2475        let bytes = IoBuf::copy_from_slice(b"hello");
2476        assert!(!bytes.is_pooled());
2477    }
2478
    /// Buffers from the storage preset must be page-aligned; buffers from
    /// the network preset must be cache-line aligned.
    #[test]
    fn test_buffer_alignment() {
        let page = page_size();
        let cache_line = cache_line_size();
        let mut registry = test_registry();

        // Reduce max_per_class under miri (atomics are slow)
        cfg_if::cfg_if! {
            if #[cfg(miri)] {
                let storage_config = BufferPoolConfig {
                    max_per_class: NZUsize!(32),
                    ..BufferPoolConfig::for_storage()
                };
                let network_config = BufferPoolConfig {
                    max_per_class: NZUsize!(32),
                    ..BufferPoolConfig::for_network()
                };
            } else {
                let storage_config = BufferPoolConfig::for_storage();
                let network_config = BufferPoolConfig::for_network();
            }
        }

        // Storage preset - page aligned
        let storage_buffer_pool = BufferPool::new(storage_config, &mut registry);
        let mut buf = storage_buffer_pool.try_alloc(100).unwrap();
        assert_eq!(
            buf.as_mut_ptr() as usize % page,
            0,
            "storage buffer not page-aligned"
        );

        // Network preset - cache-line aligned
        let network_buffer_pool = BufferPool::new(network_config, &mut registry);
        let mut buf = network_buffer_pool.try_alloc(100).unwrap();
        assert_eq!(
            buf.as_mut_ptr() as usize % cache_line,
            0,
            "network buffer not cache-line aligned"
        );
    }
2520
    /// Covers allocation edge cases (zero-size and exact-max requests round
    /// up to one page) and the three freeze paths: fully advanced, partially
    /// advanced (suffix view), and cleared.
    #[test]
    fn test_alloc_and_freeze_view_paths() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 10), &mut registry);

        // Allocation edges
        let buf = pool.try_alloc(0).expect("zero capacity should succeed");
        assert_eq!(buf.capacity(), page);
        assert_eq!(buf.len(), 0);
        let buf = pool.try_alloc(page).expect("exact max size should succeed");
        assert_eq!(buf.capacity(), page);

        // Freeze after full advance -> empty.
        let mut buf = pool.try_alloc(100).unwrap();
        buf.put_slice(&[0x42; 100]);
        Buf::advance(&mut buf, 100);
        assert!(buf.freeze().is_empty());

        // Freeze after partial advance -> suffix view.
        let mut buf = pool.try_alloc(100).unwrap();
        buf.put_slice(&[0xAA; 50]);
        Buf::advance(&mut buf, 20);
        let frozen = buf.freeze();
        // 50 written minus 20 advanced leaves a 30-byte view.
        assert_eq!(frozen.len(), 30);
        assert_eq!(frozen.as_ref(), &[0xAA; 30]);

        // Clear then freeze -> empty.
        let mut buf = pool.try_alloc(100).unwrap();
        buf.put_slice(&[0xAA; 50]);
        buf.clear();
        let frozen = buf.freeze();
        assert!(frozen.is_empty());
    }
2555
2556    #[test]
2557    fn test_interleaved_advance_and_write() {
2558        let page = page_size();
2559        let mut registry = test_registry();
2560        let pool = BufferPool::new(test_config(page, page, 10), &mut registry);
2561
2562        let mut buf = pool.try_alloc(100).unwrap();
2563        buf.put_slice(b"hello");
2564        Buf::advance(&mut buf, 2);
2565        buf.put_slice(b"world");
2566        assert_eq!(buf.as_ref(), b"lloworld");
2567    }
2568
2569    #[test]
2570    fn test_alignment_after_advance() {
2571        let page = page_size();
2572        let mut registry = test_registry();
2573        let pool = BufferPool::new(BufferPoolConfig::for_storage(), &mut registry);
2574
2575        let mut buf = pool.try_alloc(100).unwrap();
2576        buf.put_slice(&[0; 100]);
2577
2578        // Initially aligned
2579        assert_eq!(buf.as_mut_ptr() as usize % page, 0);
2580
2581        // After advance, alignment may be broken
2582        Buf::advance(&mut buf, 7);
2583        // Pointer is now at offset 7, not page-aligned
2584        assert_ne!(buf.as_mut_ptr() as usize % page, 0);
2585    }
2586}