// commonware_runtime/iobuf/pool.rs

1//! Buffer pool for efficient I/O operations.
2//!
3//! Provides pooled, aligned buffers that can be reused to reduce allocation
4//! overhead. Buffer alignment is configurable: use page alignment for storage I/O
5//! (required for direct I/O and DMA), or cache-line alignment for network I/O
6//! (reduces fragmentation).
7//!
8//! # Thread Safety
9//!
10//! [`BufferPool`] is `Send + Sync` and can be safely shared across threads.
11//! Allocation and deallocation are lock-free operations using atomic counters
12//! and a lock-free queue ([`crossbeam_queue::ArrayQueue`]).
13//!
14//! # Pool Lifecycle
15//!
16//! Each tracked buffer keeps a strong reference to the originating size class.
17//! Buffers can outlive the public [`BufferPool`] handle and still return to
18//! their original size class.
19//! - Untracked fallback allocations store no class reference and deallocate
20//!   directly when dropped.
21//! - Requests smaller than [`BufferPoolConfig::pool_min_size`] bypass pooling
22//!   entirely and return untracked aligned allocations from both
23//!   [`BufferPool::try_alloc`] and [`BufferPool::alloc`].
//! - Dropping [`BufferPool`] drains only the shared global freelists;
//!   checked-out buffers and buffers cached in a live thread's local cache can
//!   keep their size class alive until they are dropped or the thread exits.
27//!
28//! # Size Classes
29//!
30//! Buffers are organized into power-of-two size classes from `min_size` to
31//! `max_size`. For example, with `min_size = 4096` and `max_size = 32768`:
32//! - Class 0: 4096 bytes
33//! - Class 1: 8192 bytes
34//! - Class 2: 16384 bytes
35//! - Class 3: 32768 bytes
36//!
37//! Allocation requests are rounded up to the next size class. Requests larger
38//! than `max_size` return [`PoolError::Oversized`] from [`BufferPool::try_alloc`],
39//! or fall back to an untracked aligned heap allocation from [`BufferPool::alloc`].
40//!
41//! # Cache Structure
42//!
43//! Each size class uses a two-level allocator:
44//! - a small per-thread local cache for steady-state same-thread reuse
45//! - a shared global freelist for refill and spill between threads
46//!
47//! When a local cache misses, the pool refills a small batch from the global
48//! freelist before attempting to create a new tracked buffer. Returned buffers
49//! first try to re-enter the dropping thread's local cache, spilling a bounded
50//! batch back to the global freelist if needed.
51
52use super::IoBufMut;
53use crate::iobuf::aligned::{AlignedBuffer, PooledBufMut};
54use commonware_utils::NZUsize;
55use crossbeam_queue::ArrayQueue;
56use prometheus_client::{
57    encoding::EncodeLabelSet,
58    metrics::{counter::Counter, family::Family, gauge::Gauge},
59    registry::Registry,
60};
61use std::{
62    cell::UnsafeCell,
63    num::NonZeroUsize,
64    sync::{
65        atomic::{AtomicUsize, Ordering},
66        Arc,
67    },
68};
69
70/// Minimum thread-cache size required before refill/spill starts batching.
71///
72/// Below this threshold TLS still provides same-thread locality, but batching
73/// would degrade to single-buffer moves and add policy complexity without
74/// amortizing shared-queue traffic.
75const MIN_THREAD_CACHE_BATCHING_CAPACITY: usize = 4;
76
/// Error returned when buffer pool allocation fails.
///
/// Both variants are recoverable; callers may retry, fall back to an
/// untracked allocation, or propagate the error.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PoolError {
    /// The requested capacity exceeds the maximum buffer size (`max_size`).
    Oversized,
    /// The pool is exhausted for the required size class: all tracked
    /// buffers are checked out and the class is at its `max_per_class` cap.
    Exhausted,
}
85
86impl std::fmt::Display for PoolError {
87    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88        match self {
89            Self::Oversized => write!(f, "requested capacity exceeds maximum buffer size"),
90            Self::Exhausted => write!(f, "pool exhausted for required size class"),
91        }
92    }
93}
94
95impl std::error::Error for PoolError {}
96
97/// Returns the system page size.
98///
99/// On Unix systems, queries the actual page size via `sysconf`.
100/// On other systems (Windows), defaults to 4KB.
101#[cfg(unix)]
102fn page_size() -> usize {
103    // SAFETY: sysconf is safe to call.
104    let size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) };
105    if size <= 0 {
106        4096 // Safe fallback if sysconf fails
107    } else {
108        size as usize
109    }
110}
111
/// Returns the assumed page size on non-Unix targets (4KB).
// NOTE(review): kept non-const, presumably so both cfg variants share a
// signature — confirm before making this `const fn`.
#[cfg(not(unix))]
#[allow(clippy::missing_const_for_fn)]
fn page_size() -> usize {
    4096
}
117
/// Returns the cache line size for the current architecture.
///
/// Uses 128 bytes for x86_64 and aarch64 as a conservative estimate that
/// accounts for spatial prefetching. Uses 64 bytes for other architectures.
///
/// See: <https://github.com/crossbeam-rs/crossbeam/blob/983d56b6007ca4c22b56a665a7785f40f55c2a53/crossbeam-utils/src/cache_padded.rs>
const fn cache_line_size() -> usize {
    // `cfg!` expands to a boolean literal at compile time, so this stays a
    // `const fn` while avoiding the external `cfg_if!` macro.
    if cfg!(any(target_arch = "x86_64", target_arch = "aarch64")) {
        128
    } else {
        64
    }
}
133
/// Policy for sizing each thread's cache within a buffer pool size class.
///
/// Resolved to a concrete per-thread capacity by
/// `BufferPoolConfig::resolve_thread_cache_capacity` when the pool is built.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum BufferPoolThreadCacheConfig {
    /// Disable thread-local caching and route all reuse through the shared global freelist.
    Disabled,
    /// Use an exact per-thread cache size for every size class.
    Fixed(NonZeroUsize),
    /// Derive a per-thread cache size from an expected level of parallelism.
    ForParallelism(NonZeroUsize),
}
144
/// Configuration for a buffer pool.
///
/// Validated by `validate()` at pool construction; see that method's
/// `# Panics` list for the invariants the fields must satisfy.
#[derive(Debug, Clone)]
pub struct BufferPoolConfig {
    /// Minimum request size that should use pooled allocation.
    ///
    /// Requests smaller than this bypass the pool and use direct aligned
    /// allocation instead. A value of `0` means all eligible requests use the
    /// pool.
    pub pool_min_size: usize,
    /// Minimum buffer size. Must be >= alignment and a power of two.
    pub min_size: NonZeroUsize,
    /// Maximum buffer size. Must be a power of two and >= min_size.
    pub max_size: NonZeroUsize,
    /// Maximum number of buffers per size class.
    pub max_per_class: NonZeroUsize,
    /// Whether to pre-allocate all buffers on pool creation.
    pub prefill: bool,
    /// Buffer alignment. Must be a power of two.
    /// Use `page_size()` for storage I/O and `cache_line_size()` for network I/O.
    pub alignment: NonZeroUsize,
    /// Policy for sizing the per-thread local cache in each size class.
    ///
    /// [`Self::with_thread_cache_disabled`] bypasses thread-local caches.
    /// [`Self::with_thread_cache_capacity`] uses an exact per-thread cache size.
    /// [`Self::with_thread_cache_for_parallelism`] derives a size from the
    /// expected level of parallelism.
    pub(crate) thread_cache_config: BufferPoolThreadCacheConfig,
}
173
impl BufferPoolConfig {
    /// Network I/O preset: cache-line aligned, 1KB to 64KB buffers,
    /// 4096 per class, not prefilled.
    ///
    /// Network operations typically need multiple concurrent buffers per connection
    /// (message, encoding, encryption) so we allow 4096 buffers per size class.
    /// Cache-line alignment is used because network buffers don't require page
    /// alignment for DMA, and smaller alignment reduces internal fragmentation.
    pub const fn for_network() -> Self {
        let cache_line = NZUsize!(cache_line_size());
        Self {
            pool_min_size: 1024,
            min_size: NZUsize!(1024),
            max_size: NZUsize!(64 * 1024),
            max_per_class: NZUsize!(4096),
            prefill: false,
            alignment: cache_line,
            thread_cache_config: BufferPoolThreadCacheConfig::Disabled,
        }
    }

    /// Storage I/O preset: page-aligned, page_size to 8MB buffers, 64 per class,
    /// not prefilled.
    ///
    /// Page alignment is required for direct I/O and efficient DMA transfers.
    pub fn for_storage() -> Self {
        let page = NZUsize!(page_size());
        Self {
            pool_min_size: 1024,
            min_size: page,
            max_size: NZUsize!(8 * 1024 * 1024),
            max_per_class: NZUsize!(64),
            prefill: false,
            alignment: page,
            thread_cache_config: BufferPoolThreadCacheConfig::Disabled,
        }
    }

    /// Returns a copy of this config with a new minimum request size that uses pooling.
    pub const fn with_pool_min_size(mut self, pool_min_size: usize) -> Self {
        self.pool_min_size = pool_min_size;
        self
    }

    /// Returns a copy of this config with a new minimum buffer size.
    pub const fn with_min_size(mut self, min_size: NonZeroUsize) -> Self {
        self.min_size = min_size;
        self
    }

    /// Returns a copy of this config with a new maximum buffer size.
    pub const fn with_max_size(mut self, max_size: NonZeroUsize) -> Self {
        self.max_size = max_size;
        self
    }

    /// Returns a copy of this config with a new maximum number of buffers per size class.
    pub const fn with_max_per_class(mut self, max_per_class: NonZeroUsize) -> Self {
        self.max_per_class = max_per_class;
        self
    }

    /// Returns a copy of this config with an explicit per-thread cache size.
    pub const fn with_thread_cache_capacity(mut self, thread_cache_capacity: NonZeroUsize) -> Self {
        self.thread_cache_config = BufferPoolThreadCacheConfig::Fixed(thread_cache_capacity);
        self
    }

    /// Returns a copy of this config with thread-cache capacity derived from a parallelism hint.
    ///
    /// The final per-thread cache size is resolved when the pool is created, using the final
    /// `max_per_class` value. The derived size reserves half the class budget for the shared
    /// freelist and clamps the local cache to `[1, 8]`.
    pub const fn with_thread_cache_for_parallelism(mut self, parallelism: NonZeroUsize) -> Self {
        self.thread_cache_config = BufferPoolThreadCacheConfig::ForParallelism(parallelism);
        self
    }

    /// Returns a copy of this config with thread-local caching disabled.
    pub const fn with_thread_cache_disabled(mut self) -> Self {
        self.thread_cache_config = BufferPoolThreadCacheConfig::Disabled;
        self
    }

    /// Returns a copy of this config with a new prefill setting.
    pub const fn with_prefill(mut self, prefill: bool) -> Self {
        self.prefill = prefill;
        self
    }

    /// Returns a copy of this config with a new alignment.
    pub const fn with_alignment(mut self, alignment: NonZeroUsize) -> Self {
        self.alignment = alignment;
        self
    }

    /// Returns a copy of this config sized for an approximate tracked-memory budget.
    ///
    /// This computes `max_per_class` as:
    ///
    /// `ceil(budget_bytes / sum(size_class_bytes))`
    ///
    /// where `size_class_bytes` includes every class from `min_size` to `max_size`.
    /// This always rounds up to at least one buffer per size class, so the
    /// resulting estimated capacity may exceed `budget_bytes`.
    pub fn with_budget_bytes(mut self, budget_bytes: NonZeroUsize) -> Self {
        let mut class_bytes = 0usize;
        for i in 0..self.num_classes() {
            class_bytes = class_bytes.saturating_add(self.class_size(i));
        }
        // No size classes (max_size < min_size): leave the config unchanged
        // rather than divide by zero.
        if class_bytes == 0 {
            return self;
        }
        // div_ceil of a non-zero budget is >= 1, so NZUsize! cannot fail here.
        self.max_per_class = NZUsize!(budget_bytes.get().div_ceil(class_bytes));
        self
    }

    /// Validates the configuration, panicking on invalid values.
    ///
    /// # Panics
    ///
    /// - `alignment` is not a power of two
    /// - `min_size` is not a power of two
    /// - `max_size` is not a power of two
    /// - `min_size < alignment`
    /// - `max_size < min_size`
    /// - `pool_min_size > min_size`
    /// - explicit `thread_cache_capacity > max_per_class`
    fn validate(&self) {
        assert!(
            self.alignment.is_power_of_two(),
            "alignment must be a power of two"
        );
        assert!(
            self.min_size.is_power_of_two(),
            "min_size must be a power of two"
        );
        assert!(
            self.max_size.is_power_of_two(),
            "max_size must be a power of two"
        );
        assert!(
            self.min_size >= self.alignment,
            "min_size ({}) must be >= alignment ({})",
            self.min_size,
            self.alignment
        );
        assert!(
            self.max_size >= self.min_size,
            "max_size must be >= min_size"
        );
        assert!(
            self.pool_min_size <= self.min_size.get(),
            "pool_min_size ({}) must be <= min_size ({})",
            self.pool_min_size,
            self.min_size
        );
        if let BufferPoolThreadCacheConfig::Fixed(thread_cache_capacity) = self.thread_cache_config
        {
            assert!(
                thread_cache_capacity <= self.max_per_class,
                "thread_cache_capacity ({}) must be <= max_per_class ({})",
                thread_cache_capacity,
                self.max_per_class
            );
        }
    }

    /// Returns the number of size classes.
    #[inline]
    fn num_classes(&self) -> usize {
        if self.max_size < self.min_size {
            return 0;
        }
        // Classes are: min_size, min_size*2, min_size*4, ..., max_size
        (self.max_size.get() / self.min_size.get()).trailing_zeros() as usize + 1
    }

    /// Returns the size class index for a given size.
    /// Returns None if size > max_size.
    #[inline]
    fn class_index(&self, size: usize) -> Option<usize> {
        if size > self.max_size.get() {
            return None;
        }
        if size <= self.min_size.get() {
            return Some(0);
        }
        // Find the smallest power-of-two class that fits
        let size_class = size.next_power_of_two();
        let index = (size_class / self.min_size.get()).trailing_zeros() as usize;
        if index < self.num_classes() {
            Some(index)
        } else {
            None
        }
    }

    /// Returns the buffer size for a given class index.
    ///
    /// Expects `index < self.num_classes()`; larger indices would shift past
    /// `max_size` (and could overflow for extreme values).
    const fn class_size(&self, index: usize) -> usize {
        self.min_size.get() << index
    }

    /// Resolves the effective per-thread cache size for each size class.
    ///
    /// Derived capacities reserve half of the class budget for the shared freelist so
    /// cross-thread reuse remains effective, and are clamped to `[1, 8]` to cap
    /// per-thread retention.
    fn resolve_thread_cache_capacity(&self) -> usize {
        match self.thread_cache_config {
            BufferPoolThreadCacheConfig::Disabled => 0,
            BufferPoolThreadCacheConfig::Fixed(thread_cache_capacity) => {
                thread_cache_capacity.get()
            }
            BufferPoolThreadCacheConfig::ForParallelism(parallelism) => {
                let max_per_class = self.max_per_class.get();
                // Never assume more threads than there are buffers in the class.
                let effective_threads = parallelism.get().min(max_per_class);
                (max_per_class / (2 * effective_threads)).clamp(1, 8)
            }
        }
    }
}
396
/// Label for buffer pool metrics, identifying the size class.
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
struct SizeClassLabel {
    /// The class's buffer size in bytes (see `BufferPoolInner::try_alloc`,
    /// which fills this from `SizeClass::size`).
    size_class: u64,
}
402
/// Metrics for the buffer pool.
///
/// Per-size-class metrics are keyed by [`SizeClassLabel`]; `oversized_total`
/// is pool-wide because oversized requests never map to a class.
struct PoolMetrics {
    /// Number of tracked buffers currently created for the size class.
    created: Family<SizeClassLabel, Gauge>,
    /// Total number of failed allocations (pool exhausted).
    exhausted_total: Family<SizeClassLabel, Counter>,
    /// Total number of oversized allocation requests.
    oversized_total: Counter,
}
412
413impl PoolMetrics {
414    fn new(registry: &mut Registry) -> Self {
415        let metrics = Self {
416            created: Family::default(),
417            exhausted_total: Family::default(),
418            oversized_total: Counter::default(),
419        };
420
421        registry.register(
422            "buffer_pool_created",
423            "Number of tracked buffers currently created for the pool",
424            metrics.created.clone(),
425        );
426        registry.register(
427            "buffer_pool_exhausted_total",
428            "Total number of failed allocations due to pool exhaustion",
429            metrics.exhausted_total.clone(),
430        );
431        registry.register(
432            "buffer_pool_oversized_total",
433            "Total number of allocation requests exceeding max buffer size",
434            metrics.oversized_total.clone(),
435        );
436
437        metrics
438    }
439}
440
/// Per-size-class state.
///
/// Each class is a small two-level allocator:
/// - a shared global freelist for tracked buffers visible to all threads
/// - a per-thread local cache for same-thread reuse
/// - a `created` counter that caps the total number of tracked buffers
///
/// Allocation prefers the local cache, then refills from the global freelist,
/// and only creates a new tracked buffer when no free buffer is available and
/// the class still has remaining capacity.
pub(super) struct SizeClass {
    /// Dense global identifier for the TLS cache registry.
    /// Issued from `NEXT_SIZE_CLASS_ID` and never reused.
    class_id: usize,
    /// The buffer size for this class.
    size: usize,
    /// Buffer alignment.
    alignment: usize,
    /// Maximum number of tracked buffers for this class.
    max: usize,
    /// Global free list of tracked buffers available for reuse.
    /// Sized to `max`, so every tracked buffer always fits.
    global: ArrayQueue<AlignedBuffer>,
    /// Number of tracked buffers currently in existence for this class.
    created: AtomicUsize,
    /// Maximum number of buffers retained in the current thread's local bin.
    thread_cache_capacity: usize,
}
467
// SAFETY: shared state in `SizeClass` is synchronized through atomics and the
// global queue. Per-thread bins are stored in thread-local registries and only
// accessed by the current thread.
// NOTE(review): these impls also assert that `AlignedBuffer` values are safe
// to move across threads — confirm against `AlignedBuffer`'s definition.
unsafe impl Send for SizeClass {}
// SAFETY: see above.
unsafe impl Sync for SizeClass {}
474
475impl SizeClass {
476    /// Creates a new size class with the given parameters.
477    ///
478    /// If `prefill` is true, allocates `max` buffers upfront and pushes them
479    /// into the global freelist.
480    fn new(
481        class_id: usize,
482        size: usize,
483        alignment: usize,
484        max: usize,
485        thread_cache_capacity: usize,
486        prefill: bool,
487    ) -> Self {
488        let freelist = ArrayQueue::new(max);
489        let mut created = 0;
490        if prefill {
491            for _ in 0..max {
492                let _ = freelist.push(AlignedBuffer::new(size, alignment));
493            }
494            created = max;
495        }
496        Self {
497            class_id,
498            size,
499            alignment,
500            max,
501            global: freelist,
502            created: AtomicUsize::new(created),
503            thread_cache_capacity,
504        }
505    }
506
507    /// Returns a tracked buffer to the global freelist.
508    #[inline]
509    fn push_global(&self, buffer: AlignedBuffer) {
510        self.global.push(buffer).unwrap_or_else(|_| {
511            unreachable!("tracked buffer should always fit in the global pool")
512        });
513    }
514
515    /// Atomically reserves capacity to create one new tracked buffer.
516    ///
517    /// Returns `true` if the reservation succeeded (i.e. `created < max`),
518    /// `false` if the class is at capacity.
519    fn try_reserve(&self) -> bool {
520        self.created
521            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |created| {
522                (created < self.max).then_some(created + 1)
523            })
524            .is_ok()
525    }
526}
527
/// Free tracked buffer cached in the current thread's TLS registry.
///
/// This is allocator cache state, not a checked-out buffer. The cached
/// `Arc<SizeClass>` is moved back into a checked-out pooled buffer on a local
/// hit, or used to flush the buffer into the shared global freelist when the
/// thread cache spills or is dropped on thread exit.
struct TlsSizeClassCacheEntry {
    /// The free buffer itself.
    buffer: AlignedBuffer,
    /// Strong reference keeping the owning size class alive while cached.
    class: Arc<SizeClass>,
}
538
/// Per-class thread-local cache for tracked buffers.
///
/// The hot steady-state path allocates from and returns to this cache. When
/// the cache is full, small bins route overflow directly to the class-global
/// freelist while larger bins spill a batch back to it. When the thread exits
/// its remaining entries are flushed to that same global freelist.
struct TlsSizeClassCache {
    /// Cached free buffers; treated as a LIFO stack (push/pop at the end).
    entries: Vec<TlsSizeClassCacheEntry>,
    /// Maximum number of entries retained locally before spilling.
    capacity: usize,
}
549
550impl TlsSizeClassCache {
551    /// Creates a new empty cache with the given maximum thread-cache size.
552    fn new(capacity: usize) -> Self {
553        Self {
554            entries: Vec::with_capacity(capacity),
555            capacity,
556        }
557    }
558
559    /// Returns the number of buffers currently in this local cache.
560    #[inline]
561    const fn len(&self) -> usize {
562        self.entries.len()
563    }
564
565    /// Removes and returns the most recently cached buffer, if any.
566    #[inline]
567    fn pop(&mut self) -> Option<TlsSizeClassCacheEntry> {
568        self.entries.pop()
569    }
570
571    /// Pushes an entry into the local cache, spilling to global if full.
572    ///
573    /// Small local caches prioritize same-thread locality and route overflow
574    /// directly to the global freelist. Once the local cache is large enough
575    /// to batch effectively, half the entries are drained to amortize global
576    /// queue traffic across future returns.
577    fn push(&mut self, entry: TlsSizeClassCacheEntry) {
578        if self.entries.len() < self.capacity {
579            self.entries.push(entry);
580            return;
581        }
582
583        if self.capacity < MIN_THREAD_CACHE_BATCHING_CAPACITY {
584            entry.class.push_global(entry.buffer);
585            return;
586        }
587
588        // Spill half the cache to global to make room.
589        let spill = self.entries.len().min(self.capacity / 2).max(1);
590        for _ in 0..spill {
591            let spilled = self
592                .entries
593                .pop()
594                .expect("spill count must not exceed cached entries");
595            spilled.class.push_global(spilled.buffer);
596        }
597
598        self.entries.push(entry);
599    }
600}
601
impl Drop for TlsSizeClassCache {
    /// Flushes every remaining cached buffer back to its class's global
    /// freelist; runs when the owning thread exits or the cache is replaced.
    fn drop(&mut self) {
        for entry in self.entries.drain(..) {
            entry.class.push_global(entry.buffer);
        }
    }
}
609
// Each thread owns a sparse registry of per-size-class caches, indexed by the
// global `SizeClass::class_id`.
//
// We intentionally use `Vec<Option<...>>` here:
// - `class_id` values are dense enough for vector indexing to be cheap
// - each thread typically touches only a subset of all size classes
// - `None` represents "this thread has never initialized a cache for this id"
//
// This keeps the hot TLS-hit path to "index and branch" without a hash map or
// any synchronization. The cost is that vectors can accumulate holes over time
// because ids are not recycled.
//
// `UnsafeCell` (rather than `RefCell`) avoids borrow-flag bookkeeping; all
// access goes through `BufferPoolThreadCache::with_cache`, which is the single
// place that forms the `&mut` reference.
thread_local! {
    static TLS_SIZE_CLASS_CACHES: UnsafeCell<Vec<Option<TlsSizeClassCache>>> =
        const { UnsafeCell::new(Vec::new()) };
}
625
// Global allocator for `SizeClass::class_id`.
//
// Ids are monotonic and never reused. This is deliberate: a reused id would
// require generation tracking or equivalent validation on every TLS cache
// access to distinguish a live size class from stale per-thread cache state.
// Keeping ids monotonic makes the TLS fast path cheaper and simpler at the
// cost of leaving holes in `TLS_SIZE_CLASS_CACHES` over process lifetime.
static NEXT_SIZE_CLASS_ID: AtomicUsize = AtomicUsize::new(0);
634
/// Utilities for managing the calling thread's local [`BufferPool`] caches.
///
/// Internally, each thread owns a sparse `Vec<Option<TlsSizeClassCache>>`
/// keyed by `SizeClass::class_id`, with one per-size-class cache allocated
/// lazily on first use. Thread exit naturally flushes cached buffers back to
/// the shared global freelist because `TlsSizeClassCache` drains itself in
/// `Drop`.
///
/// This type exists to keep the unsafe TLS access localized. All steady-state
/// cache operations (`pop`, `push`, and `refill`) go through this facade rather
/// than free functions over the `thread_local!` static.
// Zero-sized: this type carries no state, only associated functions.
pub struct BufferPoolThreadCache;
647
impl BufferPoolThreadCache {
    /// Flushes all local caches for the current thread into the global freelists.
    pub fn flush() {
        TLS_SIZE_CLASS_CACHES.with(|bins| {
            // SAFETY: this TLS value is only ever accessed by the current thread.
            let bins = unsafe { &mut *bins.get() };
            for cache in bins.iter_mut() {
                // Taking the cache drops it; `TlsSizeClassCache::drop` drains
                // its entries back to the class's global freelist.
                let _ = cache.take();
            }
        });
    }

    /// Pops a cached buffer from the current thread's local cache for the
    /// given size class. Returns `None` if the local cache is empty.
    #[inline]
    fn pop(class: &Arc<SizeClass>) -> Option<TlsSizeClassCacheEntry> {
        Self::with_cache(class.class_id, class.thread_cache_capacity, |cache| {
            cache.pop()
        })
    }

    /// Returns a buffer to the current thread's local cache for the given
    /// size class, spilling to the global freelist if the cache is full.
    #[inline]
    pub(super) fn push(class: Arc<SizeClass>, buffer: AlignedBuffer) {
        // Copy the id/capacity out first: `class` is moved into the entry below.
        let class_id = class.class_id;
        let thread_cache_capacity = class.thread_cache_capacity;
        Self::with_cache(class_id, thread_cache_capacity, |cache| {
            cache.push(TlsSizeClassCacheEntry { buffer, class });
        });
    }

    /// Batch-refills the local cache from the global freelist.
    ///
    /// Pulls up to `target - 1` buffers from global into the local cache. For
    /// small local bins, batching is disabled and this becomes a no-op. Called
    /// after a global pop succeeds, so the caller already holds one buffer and
    /// we warm the cache for subsequent local hits when batching is enabled.
    #[inline]
    fn refill(class: &Arc<SizeClass>, target: usize) {
        Self::with_cache(class.class_id, class.thread_cache_capacity, |cache| {
            // `+ 1` accounts for the buffer the caller already popped from global.
            while cache.len() + 1 < target {
                let Some(buffer) = class.global.pop() else {
                    break;
                };
                cache.push(TlsSizeClassCacheEntry {
                    buffer,
                    class: class.clone(),
                });
            }
        });
    }

    /// Accesses the current thread's local cache for `class_id`, creating it
    /// lazily on first use, and invokes `f` on it.
    ///
    /// The `&mut` borrow of the TLS vector lives for the duration of `f`; the
    /// closures passed by `pop`/`push`/`refill` only touch the class's shared
    /// queue and never re-enter this TLS cell.
    #[inline]
    fn with_cache<R>(
        class_id: usize,
        capacity: usize,
        f: impl FnOnce(&mut TlsSizeClassCache) -> R,
    ) -> R {
        TLS_SIZE_CLASS_CACHES.with(|bins| {
            // SAFETY: this TLS value is only ever accessed by the current thread.
            let bins = unsafe { &mut *bins.get() };
            // Grow the sparse registry on first sight of a new class id.
            if class_id >= bins.len() {
                bins.resize_with(class_id + 1, || None);
            }
            let cache = bins[class_id].get_or_insert_with(|| TlsSizeClassCache::new(capacity));
            f(cache)
        })
    }
}
720
/// Internal allocation result for pooled allocations.
struct Allocation {
    /// The allocated (or reused) aligned buffer.
    buffer: AlignedBuffer,
    /// `true` when the buffer was freshly created rather than reused from a
    /// cache or freelist (see `BufferPoolInner::try_alloc`).
    is_new: bool,
    /// The size class the buffer belongs to and must return to on drop.
    class: Arc<SizeClass>,
}
727
/// Internal state of the buffer pool.
///
/// Shared behind an `Arc` by every [`BufferPool`] handle clone.
pub(crate) struct BufferPoolInner {
    /// Validated pool configuration.
    config: BufferPoolConfig,
    /// One entry per power-of-two size class, index 0 = `min_size`.
    classes: Vec<Arc<SizeClass>>,
    /// Prometheus metrics for allocations and failures.
    metrics: PoolMetrics,
}
734
735impl Drop for BufferPoolInner {
736    fn drop(&mut self) {
737        for class in &self.classes {
738            while let Some(buffer) = class.global.pop() {
739                class.created.fetch_sub(1, Ordering::Relaxed);
740                drop(buffer);
741            }
742        }
743    }
744}
745
746impl BufferPoolInner {
747    /// Try to allocate a buffer from the given size class.
748    ///
749    /// Uses a three-tier strategy:
750    /// 1. **Thread-local cache** (fast path): no atomics, no contention.
751    /// 2. **Global freelist**: atomic pop, then batch-refill the local cache
752    ///    when the local bin is large enough to amortize shared-queue traffic.
753    /// 3. **New allocation**: reserve capacity via CAS, allocate from heap.
754    ///
755    /// If `zero_on_new` is true, newly-created buffers are allocated with
756    /// `alloc_zeroed`. Reused buffers are never re-zeroed here.
757    fn try_alloc(&self, class_index: usize, zero_on_new: bool) -> Option<Allocation> {
758        let class = &self.classes[class_index];
759
760        // Fast path: reuse from thread-local cache (no atomics, no metrics).
761        if let Some(entry) = BufferPoolThreadCache::pop(class) {
762            return Some(Allocation {
763                buffer: entry.buffer,
764                is_new: false,
765                class: entry.class,
766            });
767        }
768
769        // Medium path: refill from global freelist.
770        let target = (class.thread_cache_capacity / 2).max(1);
771        if let Some(buffer) = class.global.pop() {
772            BufferPoolThreadCache::refill(class, target);
773            return Some(Allocation {
774                buffer,
775                is_new: false,
776                class: class.clone(),
777            });
778        }
779
780        // Slow path: create a new tracked buffer (metrics only here).
781        let label = SizeClassLabel {
782            size_class: class.size as u64,
783        };
784        if !class.try_reserve() {
785            self.metrics.exhausted_total.get_or_create(&label).inc();
786            return None;
787        }
788
789        self.metrics.created.get_or_create(&label).inc();
790        let buffer = if zero_on_new {
791            AlignedBuffer::new_zeroed(class.size, class.alignment)
792        } else {
793            AlignedBuffer::new(class.size, class.alignment)
794        };
795        Some(Allocation {
796            buffer,
797            is_new: true,
798            class: class.clone(),
799        })
800    }
801}
802
/// A pool of reusable, aligned buffers.
///
/// Buffers are organized into power-of-two size classes. When a buffer is requested,
/// the smallest size class that fits is used. Buffers are automatically returned to
/// the pool when dropped.
///
/// # Alignment
///
/// Buffer alignment is guaranteed only at the base pointer (when `cursor == 0`).
/// After calling [`bytes::Buf::advance`], the pointer returned by `as_mut_ptr()` may
/// no longer be aligned. For direct I/O operations that require alignment,
/// do not advance the buffer before use.
// `Clone` is a cheap `Arc` bump; all clones share the same pool state.
#[derive(Clone)]
pub struct BufferPool {
    inner: Arc<BufferPoolInner>,
}
819
820impl std::fmt::Debug for BufferPool {
821    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
822        f.debug_struct("BufferPool")
823            .field("config", &self.inner.config)
824            .field("num_classes", &self.inner.classes.len())
825            .finish()
826    }
827}
828
impl BufferPool {
    /// Creates a new buffer pool with the given configuration.
    ///
    /// # Panics
    ///
    /// Panics if the configuration is invalid.
    pub(crate) fn new(config: BufferPoolConfig, registry: &mut Registry) -> Self {
        config.validate();
        let metrics = PoolMetrics::new(registry);
        let mut classes = Vec::with_capacity(config.num_classes());
        let thread_cache_capacity = config.resolve_thread_cache_capacity();
        for i in 0..config.num_classes() {
            let size = config.class_size(i);
            // Process-wide unique id for this size class; thread-local caches
            // are looked up by this id.
            let class_id = NEXT_SIZE_CLASS_ID.fetch_add(1, Ordering::Relaxed);
            let class = Arc::new(SizeClass::new(
                class_id,
                size,
                config.alignment.get(),
                config.max_per_class.get(),
                thread_cache_capacity,
                config.prefill,
            ));
            classes.push(class);
        }

        // Update created metrics after prefill
        if config.prefill {
            for class in &classes {
                let label = SizeClassLabel {
                    size_class: class.size as u64,
                };
                // Prefilled buffers sit in the class's global freelist, so its
                // length is the number of buffers created so far.
                let created = class.global.len() as i64;
                metrics.created.get_or_create(&label).set(created);
            }
        }

        Self {
            inner: Arc::new(BufferPoolInner {
                config,
                classes,
                metrics,
            }),
        }
    }

    /// Returns the size class index for `capacity`, recording oversized metrics on failure.
    #[inline]
    fn class_index_or_record_oversized(&self, capacity: usize) -> Option<usize> {
        let class_index = self.inner.config.class_index(capacity);
        if class_index.is_none() {
            self.inner.metrics.oversized_total.inc();
        }
        class_index
    }

    /// Attempts to allocate a pooled buffer.
    ///
    /// Unlike [`Self::alloc`], this method does not fall back to untracked
    /// allocation on exhaustion or oversized requests. Requests smaller than
    /// [`BufferPoolConfig::pool_min_size`] intentionally bypass pooling and
    /// return an untracked aligned allocation instead.
    ///
    /// The returned buffer has `len() == 0` and `capacity() >= capacity`.
    ///
    /// # Initialization
    ///
    /// The returned buffer contains **uninitialized memory**. Do not read from
    /// it until data has been written.
    ///
    /// # Errors
    ///
    /// - [`PoolError::Oversized`]: `capacity` exceeds `max_size`
    /// - [`PoolError::Exhausted`]: Pool exhausted for required size class
    pub fn try_alloc(&self, capacity: usize) -> Result<IoBufMut, PoolError> {
        if capacity < self.inner.config.pool_min_size {
            // Small-request bypass: untracked aligned allocation. `max(1)`
            // avoids a zero-sized allocation when `capacity == 0`.
            let size = capacity.max(1);
            return Ok(IoBufMut::with_alignment(size, self.inner.config.alignment));
        }

        let class_index = self
            .class_index_or_record_oversized(capacity)
            .ok_or(PoolError::Oversized)?;

        let buffer = self
            .inner
            .try_alloc(class_index, false)
            .map(|allocation| PooledBufMut::new(allocation.buffer, allocation.class))
            .ok_or(PoolError::Exhausted)?;
        Ok(IoBufMut::from_pooled(buffer))
    }

    /// Allocates a buffer with capacity for at least `capacity` bytes.
    ///
    /// The returned buffer has `len() == 0` and `capacity() >= capacity`,
    /// matching the semantics of [`IoBufMut::with_capacity`] and
    /// [`bytes::BytesMut::with_capacity`]. Use [`bytes::BufMut::put_slice`] or
    /// other [`bytes::BufMut`] methods to write data to the buffer.
    ///
    /// If the pool can provide a buffer (capacity within limits and pool not
    /// exhausted), returns a pooled buffer that will be returned to the pool
    /// when dropped. Requests smaller than [`BufferPoolConfig::pool_min_size`]
    /// bypass pooling and return an untracked aligned allocation. Otherwise, oversized or
    /// exhausted requests fall back to an untracked aligned heap allocation
    /// that is deallocated when dropped.
    ///
    /// Use [`Self::try_alloc`] if you need pooled-only behavior.
    ///
    /// # Initialization
    ///
    /// The returned buffer contains **uninitialized memory**. Do not read from
    /// it until data has been written.
    pub fn alloc(&self, capacity: usize) -> IoBufMut {
        self.try_alloc(capacity).unwrap_or_else(|_| {
            // Oversized or exhausted: untracked aligned allocation, padded to
            // at least `min_size`.
            let size = capacity.max(self.inner.config.min_size.get());
            IoBufMut::with_alignment(size, self.inner.config.alignment)
        })
    }

    /// Allocates a buffer and sets its readable length to `len` without
    /// initializing bytes.
    ///
    /// Equivalent to [`Self::alloc`] followed by [`IoBufMut::set_len`].
    ///
    /// # Safety
    ///
    /// Caller must ensure all bytes are initialized before any read operation.
    pub unsafe fn alloc_len(&self, len: usize) -> IoBufMut {
        let mut buf = self.alloc(len);
        // SAFETY: guaranteed by caller.
        unsafe { buf.set_len(len) };
        buf
    }

    /// Attempts to allocate a zero-initialized pooled buffer.
    ///
    /// Unlike [`Self::alloc_zeroed`], this method does not fall back to
    /// untracked allocation on exhaustion or oversized requests. Requests
    /// smaller than [`BufferPoolConfig::pool_min_size`] intentionally bypass
    /// pooling and return an untracked aligned allocation instead.
    ///
    /// The returned buffer has `len() == len` and `capacity() >= len`.
    ///
    /// # Initialization
    ///
    /// Bytes in `0..len` are initialized to zero. Bytes in `len..capacity`
    /// may be uninitialized.
    ///
    /// # Errors
    ///
    /// - [`PoolError::Oversized`]: `len` exceeds `max_size`
    /// - [`PoolError::Exhausted`]: Pool exhausted for required size class
    pub fn try_alloc_zeroed(&self, len: usize) -> Result<IoBufMut, PoolError> {
        if len < self.inner.config.pool_min_size {
            // Small-request bypass, mirroring `try_alloc`, but zero-initialized
            // and trimmed back to exactly `len` readable bytes.
            let size = len.max(1);
            let mut buf = IoBufMut::zeroed_with_alignment(size, self.inner.config.alignment);
            buf.truncate(len);
            return Ok(buf);
        }

        let class_index = self
            .class_index_or_record_oversized(len)
            .ok_or(PoolError::Oversized)?;
        let allocation = self
            .inner
            .try_alloc(class_index, true)
            .ok_or(PoolError::Exhausted)?;

        let mut buf = IoBufMut::from_pooled(PooledBufMut::new(allocation.buffer, allocation.class));
        if allocation.is_new {
            // SAFETY: buffer was allocated with alloc_zeroed, so bytes in 0..len are initialized.
            unsafe { buf.set_len(len) };
        } else {
            // Reused buffers may contain old bytes, re-zero requested readable range.
            // SAFETY: `as_mut_ptr()` is valid for writes up to `capacity() >= len` bytes.
            unsafe {
                std::ptr::write_bytes(buf.as_mut_ptr(), 0, len);
                buf.set_len(len);
            }
        }
        Ok(buf)
    }

    /// Allocates a zero-initialized buffer with readable length `len`.
    ///
    /// The returned buffer has `len() == len` and `capacity() >= len`.
    ///
    /// If the pool can provide a buffer (len within limits and pool not
    /// exhausted), returns a pooled buffer that will be returned to the pool
    /// when dropped. Requests smaller than [`BufferPoolConfig::pool_min_size`]
    /// bypass pooling and return an untracked aligned allocation. Otherwise, oversized or
    /// exhausted requests fall back to an untracked aligned heap allocation
    /// that is deallocated when dropped.
    ///
    /// Use this for read APIs that require an initialized `&mut [u8]`.
    /// This avoids `unsafe set_len` at callsites.
    ///
    /// Use [`Self::try_alloc_zeroed`] if you need pooled-only behavior.
    ///
    /// # Initialization
    ///
    /// Bytes in `0..len` are initialized to zero. Bytes in `len..capacity`
    /// may be uninitialized.
    pub fn alloc_zeroed(&self, len: usize) -> IoBufMut {
        self.try_alloc_zeroed(len).unwrap_or_else(|_| {
            // Pool exhausted or oversized: allocate untracked zeroed memory.
            let size = len.max(self.inner.config.min_size.get());
            let mut buf = IoBufMut::zeroed_with_alignment(size, self.inner.config.alignment);
            buf.truncate(len);
            buf
        })
    }

    /// Returns the pool configuration.
    pub fn config(&self) -> &BufferPoolConfig {
        &self.inner.config
    }
}
1046
1047#[cfg(test)]
1048mod tests {
1049    use super::*;
1050    use crate::iobuf::IoBuf;
1051    use bytes::{Buf, BufMut};
1052    use std::{
1053        sync::{mpsc, Arc},
1054        thread,
1055    };
1056
1057    fn test_size_class(size: usize, alignment: usize) -> Arc<SizeClass> {
1058        Arc::new(SizeClass::new(
1059            NEXT_SIZE_CLASS_ID.fetch_add(1, Ordering::Relaxed),
1060            size,
1061            alignment,
1062            8,
1063            4,
1064            false,
1065        ))
1066    }
1067
1068    fn test_registry() -> Registry {
1069        Registry::default()
1070    }
1071
1072    /// Creates a test config with page alignment.
1073    fn test_config(min_size: usize, max_size: usize, max_per_class: usize) -> BufferPoolConfig {
1074        BufferPoolConfig {
1075            pool_min_size: 0,
1076            min_size: NZUsize!(min_size),
1077            max_size: NZUsize!(max_size),
1078            max_per_class: NZUsize!(max_per_class),
1079            thread_cache_config: BufferPoolThreadCacheConfig::ForParallelism(NZUsize!(1)),
1080            prefill: false,
1081            alignment: NZUsize!(page_size()),
1082        }
1083    }
1084
1085    /// Helper to get the number of checked-out tracked buffers for a size class.
1086    ///
1087    /// With TLS enabled, tracked buffers can be free in either the shared
1088    /// freelist or the current thread's local cache.
1089    fn get_allocated(pool: &BufferPool, size: usize) -> usize {
1090        let class_index = pool.inner.config.class_index(size).unwrap();
1091        let class = &pool.inner.classes[class_index];
1092        class.created.load(Ordering::Relaxed) - class.global.len() - get_local_len(class)
1093    }
1094
1095    /// Helper to get the number of free buffers visible to the current thread.
1096    fn get_available(pool: &BufferPool, size: usize) -> i64 {
1097        let class_index = pool.inner.config.class_index(size).unwrap();
1098        let class = &pool.inner.classes[class_index];
1099        (class.global.len() + get_local_len(class)) as i64
1100    }
1101
    /// Helper to get the number of free buffers parked in the current thread's
    /// local cache for a size class.
    ///
    /// Returns 0 when this thread has no cache entry for `class`.
    fn get_local_len(class: &SizeClass) -> usize {
        TLS_SIZE_CLASS_CACHES.with(|bins| {
            // SAFETY: this TLS value is only ever accessed by the current thread.
            let bins = unsafe { &*bins.get() };
            // The registry is indexed by `class_id`; a missing or empty slot
            // means nothing is cached locally for this class.
            bins.get(class.class_id)
                .and_then(Option::as_ref)
                .map_or(0, TlsSizeClassCache::len)
        })
    }
1113
1114    #[test]
1115    fn test_page_size() {
1116        let size = page_size();
1117        assert!(size >= 4096);
1118        assert!(size.is_power_of_two());
1119    }
1120
1121    #[test]
1122    fn test_config_validation() {
1123        let page = page_size();
1124        let config = test_config(page, page * 4, 10);
1125        config.validate();
1126    }
1127
1128    #[test]
1129    #[should_panic(expected = "thread_cache_capacity (11) must be <= max_per_class (10)")]
1130    fn test_config_invalid_thread_cache_capacity() {
1131        let page = page_size();
1132        let config = test_config(page, page * 4, 10).with_thread_cache_capacity(NZUsize!(11));
1133        config.validate();
1134    }
1135
1136    #[test]
1137    #[should_panic(expected = "min_size must be a power of two")]
1138    fn test_config_invalid_min_size() {
1139        let config = BufferPoolConfig {
1140            pool_min_size: 0,
1141            min_size: NZUsize!(3000),
1142            max_size: NZUsize!(8192),
1143            max_per_class: NZUsize!(10),
1144            thread_cache_config: BufferPoolThreadCacheConfig::ForParallelism(NZUsize!(1)),
1145            prefill: false,
1146            alignment: NZUsize!(page_size()),
1147        };
1148        config.validate();
1149    }
1150
1151    #[test]
1152    fn test_config_class_index() {
1153        let page = page_size();
1154        let config = test_config(page, page * 8, 10);
1155
1156        // Classes: page, page*2, page*4, page*8
1157        assert_eq!(config.num_classes(), 4);
1158
1159        assert_eq!(config.class_index(1), Some(0));
1160        assert_eq!(config.class_index(page), Some(0));
1161        assert_eq!(config.class_index(page + 1), Some(1));
1162        assert_eq!(config.class_index(page * 2), Some(1));
1163        assert_eq!(config.class_index(page * 8), Some(3));
1164        assert_eq!(config.class_index(page * 8 + 1), None);
1165    }
1166
1167    #[test]
1168    fn test_pool_alloc_and_return() {
1169        let page = page_size();
1170        let mut registry = test_registry();
1171        let pool = BufferPool::new(test_config(page, page * 4, 2), &mut registry);
1172
1173        // Allocate a buffer - returns buffer with len=0, capacity >= requested
1174        let buf = pool.try_alloc(page).unwrap();
1175        assert!(buf.capacity() >= page);
1176        assert_eq!(buf.len(), 0);
1177
1178        // Drop returns to pool
1179        drop(buf);
1180
1181        // Can allocate again
1182        let buf2 = pool.try_alloc(page).unwrap();
1183        assert!(buf2.capacity() >= page);
1184        assert_eq!(buf2.len(), 0);
1185    }
1186
1187    #[test]
1188    fn test_alloc_len_sets_len() {
1189        let page = page_size();
1190        let mut registry = test_registry();
1191        let pool = BufferPool::new(test_config(page, page * 4, 2), &mut registry);
1192
1193        // SAFETY: we immediately initialize all bytes before reading.
1194        let mut buf = unsafe { pool.alloc_len(100) };
1195        assert_eq!(buf.len(), 100);
1196        buf.as_mut().fill(0xAB);
1197        let frozen = buf.freeze();
1198        assert_eq!(frozen.as_ref(), &[0xAB; 100]);
1199    }
1200
1201    #[test]
1202    fn test_alloc_zeroed_sets_len_and_zeros() {
1203        let page = page_size();
1204        let mut registry = test_registry();
1205        let pool = BufferPool::new(test_config(page, page * 4, 2), &mut registry);
1206
1207        let buf = pool.alloc_zeroed(100);
1208        assert_eq!(buf.len(), 100);
1209        assert!(buf.as_ref().iter().all(|&b| b == 0));
1210    }
1211
1212    #[test]
1213    fn test_try_alloc_zeroed_sets_len_and_zeros() {
1214        let page = page_size();
1215        let mut registry = test_registry();
1216        let pool = BufferPool::new(test_config(page, page * 4, 2), &mut registry);
1217
1218        let buf = pool.try_alloc_zeroed(page).unwrap();
1219        assert!(buf.is_pooled());
1220        assert_eq!(buf.len(), page);
1221        assert!(buf.as_ref().iter().all(|&b| b == 0));
1222    }
1223
1224    #[test]
1225    fn test_alloc_zeroed_fallback_uses_untracked_zeroed_buffer() {
1226        let page = page_size();
1227        let mut registry = test_registry();
1228        let pool = BufferPool::new(test_config(page, page, 1), &mut registry);
1229
1230        // Exhaust pooled capacity for this class.
1231        let _pooled = pool.try_alloc(page).unwrap();
1232
1233        let buf = pool.alloc_zeroed(100);
1234        assert!(!buf.is_pooled());
1235        assert_eq!(buf.len(), 100);
1236        assert!(buf.as_ref().iter().all(|&b| b == 0));
1237    }
1238
1239    #[test]
1240    fn test_alloc_zeroed_reuses_dirty_pooled_buffer() {
1241        let page = page_size();
1242        let mut registry = test_registry();
1243        let pool = BufferPool::new(test_config(page, page, 1), &mut registry);
1244
1245        let mut first = pool.alloc_zeroed(page);
1246        assert!(first.is_pooled());
1247        assert!(first.as_ref().iter().all(|&b| b == 0));
1248
1249        // Dirty the buffer before returning it to the pool.
1250        first.as_mut().fill(0xAB);
1251        drop(first);
1252
1253        let second = pool.alloc_zeroed(page);
1254        assert!(second.is_pooled());
1255        assert_eq!(second.len(), page);
1256        assert!(second.as_ref().iter().all(|&b| b == 0));
1257    }
1258
1259    #[test]
1260    fn test_requests_smaller_than_pool_min_size_bypass_pool() {
1261        let mut registry = test_registry();
1262        let pool = BufferPool::new(
1263            BufferPoolConfig {
1264                pool_min_size: 512,
1265                min_size: NZUsize!(512),
1266                max_size: NZUsize!(1024),
1267                max_per_class: NZUsize!(2),
1268                thread_cache_config: BufferPoolThreadCacheConfig::ForParallelism(NZUsize!(1)),
1269                prefill: false,
1270                alignment: NZUsize!(128),
1271            },
1272            &mut registry,
1273        );
1274
1275        let buf = pool.try_alloc(200).unwrap();
1276        assert!(!buf.is_pooled());
1277        assert_eq!(buf.capacity(), 200);
1278
1279        let zeroed = pool.try_alloc_zeroed(200).unwrap();
1280        assert!(!zeroed.is_pooled());
1281        assert_eq!(zeroed.len(), 200);
1282        assert!(zeroed.as_ref().iter().all(|&b| b == 0));
1283
1284        let pooled = pool.try_alloc(512).unwrap();
1285        assert!(pooled.is_pooled());
1286        assert_eq!(pooled.capacity(), 512);
1287    }
1288
1289    #[test]
1290    fn test_pool_size_classes() {
1291        let page = page_size();
1292        let mut registry = test_registry();
1293        let pool = BufferPool::new(test_config(page, page * 4, 10), &mut registry);
1294
1295        // Small request gets smallest class
1296        let buf1 = pool.try_alloc(page).unwrap();
1297        assert_eq!(buf1.capacity(), page);
1298
1299        // Larger request gets appropriate class
1300        let buf2 = pool.try_alloc(page + 1).unwrap();
1301        assert_eq!(buf2.capacity(), page * 2);
1302
1303        let buf3 = pool.try_alloc(page * 3).unwrap();
1304        assert_eq!(buf3.capacity(), page * 4);
1305    }
1306
1307    #[test]
1308    fn test_prefill() {
1309        let page = NZUsize!(page_size());
1310        let mut registry = test_registry();
1311        let pool = BufferPool::new(
1312            BufferPoolConfig {
1313                pool_min_size: 0,
1314                min_size: page,
1315                max_size: page,
1316                max_per_class: NZUsize!(5),
1317                thread_cache_config: BufferPoolThreadCacheConfig::ForParallelism(NZUsize!(1)),
1318                prefill: true,
1319                alignment: page,
1320            },
1321            &mut registry,
1322        );
1323
1324        // Should be able to allocate max_per_class buffers immediately
1325        let mut bufs = Vec::new();
1326        for _ in 0..5 {
1327            bufs.push(pool.try_alloc(page.get()).expect("alloc should succeed"));
1328        }
1329
1330        // Next allocation should fail
1331        assert!(pool.try_alloc(page.get()).is_err());
1332    }
1333
1334    #[test]
1335    fn test_config_for_network() {
1336        let config = BufferPoolConfig::for_network();
1337        config.validate();
1338        assert_eq!(config.pool_min_size, 1024);
1339        assert_eq!(config.min_size.get(), 1024);
1340        assert_eq!(config.max_size.get(), 64 * 1024);
1341        assert_eq!(config.max_per_class.get(), 4096);
1342        assert_eq!(
1343            config.thread_cache_config,
1344            BufferPoolThreadCacheConfig::Disabled
1345        );
1346        assert!(!config.prefill);
1347        assert_eq!(config.alignment.get(), cache_line_size());
1348    }
1349
1350    #[test]
1351    fn test_config_for_storage() {
1352        let config = BufferPoolConfig::for_storage();
1353        config.validate();
1354        assert_eq!(config.pool_min_size, 1024);
1355        assert_eq!(config.min_size.get(), page_size());
1356        assert_eq!(config.max_size.get(), 8 * 1024 * 1024);
1357        assert_eq!(config.max_per_class.get(), 64);
1358        assert_eq!(
1359            config.thread_cache_config,
1360            BufferPoolThreadCacheConfig::Disabled
1361        );
1362        assert!(!config.prefill);
1363        assert_eq!(config.alignment.get(), page_size());
1364    }
1365
1366    #[test]
1367    fn test_storage_config_supports_default_allocations() {
1368        // The storage preset's max_size (8 MB) should be allocatable out of the box.
1369        let mut registry = test_registry();
1370        let pool = BufferPool::new(BufferPoolConfig::for_storage(), &mut registry);
1371
1372        let buf = pool.try_alloc(8 * 1024 * 1024).unwrap();
1373        assert_eq!(buf.capacity(), 8 * 1024 * 1024);
1374    }
1375
1376    #[test]
1377    fn test_config_builders() {
1378        let page = NZUsize!(page_size());
1379        let config = BufferPoolConfig::for_storage()
1380            .with_pool_min_size(1024)
1381            .with_max_per_class(NZUsize!(64))
1382            .with_thread_cache_capacity(NZUsize!(8))
1383            .with_prefill(true)
1384            .with_min_size(page)
1385            .with_max_size(NZUsize!(128 * 1024));
1386
1387        config.validate();
1388        assert_eq!(config.pool_min_size, 1024);
1389        assert_eq!(config.min_size, page);
1390        assert_eq!(config.max_size.get(), 128 * 1024);
1391        assert_eq!(config.max_per_class.get(), 64);
1392        assert_eq!(
1393            config.thread_cache_config,
1394            BufferPoolThreadCacheConfig::Fixed(NZUsize!(8))
1395        );
1396        assert!(config.prefill);
1397        // Storage profile alignment stays page-sized unless explicitly changed.
1398        assert_eq!(config.alignment.get(), page_size());
1399
1400        // Alignment can be tuned explicitly as long as min_size is also adjusted.
1401        let aligned = BufferPoolConfig::for_network()
1402            .with_pool_min_size(256)
1403            .with_thread_cache_for_parallelism(NZUsize!(4))
1404            .with_alignment(NZUsize!(256))
1405            .with_min_size(NZUsize!(256));
1406        aligned.validate();
1407        assert_eq!(
1408            aligned.thread_cache_config,
1409            BufferPoolThreadCacheConfig::ForParallelism(NZUsize!(4))
1410        );
1411        assert_eq!(aligned.alignment.get(), 256);
1412        assert_eq!(aligned.min_size.get(), 256);
1413    }
1414
1415    #[test]
1416    fn test_parallelism_policy_resolves_thread_cache_capacity() {
1417        let page = page_size();
1418        let mut registry = test_registry();
1419        let pool = BufferPool::new(
1420            test_config(page, page, 64).with_thread_cache_for_parallelism(NZUsize!(8)),
1421            &mut registry,
1422        );
1423        let class_index = pool.inner.config.class_index(page).unwrap();
1424        assert_eq!(pool.inner.classes[class_index].thread_cache_capacity, 4);
1425    }
1426
1427    #[test]
1428    fn test_fixed_thread_cache_capacity_overrides_runtime_parallelism() {
1429        let page = page_size();
1430        let mut registry = test_registry();
1431        let pool = BufferPool::new(
1432            test_config(page, page, 64).with_thread_cache_capacity(NZUsize!(7)),
1433            &mut registry,
1434        );
1435        let class_index = pool.inner.config.class_index(page).unwrap();
1436
1437        // Fixed capacity should bypass the derived parallelism heuristic.
1438        assert_eq!(pool.inner.classes[class_index].thread_cache_capacity, 7);
1439    }
1440
1441    #[test]
1442    fn test_disabled_thread_cache_does_not_retain_buffers_locally() {
1443        let page = page_size();
1444        let mut registry = test_registry();
1445        let pool = BufferPool::new(
1446            test_config(page, page, 2).with_thread_cache_disabled(),
1447            &mut registry,
1448        );
1449        let class_index = pool.inner.config.class_index(page).unwrap();
1450        let class = &pool.inner.classes[class_index];
1451
1452        let tracked = pool.try_alloc(page).expect("tracked allocation");
1453        drop(tracked);
1454
1455        // Disabled thread caching still routes returns through the global
1456        // freelist, but should never retain buffers in the current thread.
1457        assert_eq!(class.thread_cache_capacity, 0);
1458        assert_eq!(get_local_len(class), 0);
1459        assert_eq!(class.global.len(), 1);
1460    }
1461
    /// Dropped buffers park in the thread-local caches; a flush must migrate
    /// every parked entry back to its class-global freelist.
    #[test]
    fn test_thread_cache_flush_moves_local_entries_to_global() {
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(
            test_config(page, page * 2, 8).with_thread_cache_capacity(NZUsize!(4)),
            &mut registry,
        );

        // Use two distinct size classes so the test exercises the whole TLS
        // registry, not just a single per-class cache entry.
        let small_index = pool.inner.config.class_index(page).unwrap();
        let large_index = pool.inner.config.class_index(page + 1).unwrap();
        let small_class = &pool.inner.classes[small_index];
        let large_class = &pool.inner.classes[large_index];

        // Return one buffer from each class to the current thread. With local
        // caching enabled, both drops should stay in the thread-local bins.
        let small = pool.try_alloc(page).expect("tracked allocation");
        let large = pool.try_alloc(page + 1).expect("tracked allocation");
        drop(small);
        drop(large);

        // Before flushing, both buffers are only visible via the current
        // thread's local caches, nothing has been pushed to the global queues.
        assert_eq!(get_local_len(small_class), 1);
        assert_eq!(get_local_len(large_class), 1);
        assert_eq!(small_class.global.len(), 0);
        assert_eq!(large_class.global.len(), 0);

        // Flushing should walk the entire TLS registry, drop every local cache,
        // and let each cache's drop implementation return its buffers to the
        // shared global freelists.
        BufferPoolThreadCache::flush();

        // After flush, the current thread retains nothing locally and both
        // buffers are once again visible through their class-global queues.
        assert_eq!(get_local_len(small_class), 0);
        assert_eq!(get_local_len(large_class), 0);
        assert_eq!(small_class.global.len(), 1);
        assert_eq!(large_class.global.len(), 1);
    }
1504
1505    #[test]
1506    fn test_config_with_budget_bytes() {
1507        // Classes: 4, 8, 16 (sum = 28). Budget 280 => max_per_class = 10.
1508        let config = BufferPoolConfig {
1509            pool_min_size: 0,
1510            min_size: NZUsize!(4),
1511            max_size: NZUsize!(16),
1512            max_per_class: NZUsize!(1),
1513            thread_cache_config: BufferPoolThreadCacheConfig::ForParallelism(NZUsize!(1)),
1514            prefill: false,
1515            alignment: NZUsize!(4),
1516        }
1517        .with_budget_bytes(NZUsize!(280));
1518        assert_eq!(config.max_per_class.get(), 10);
1519
1520        // Budget 10 rounds up to one buffer per class.
1521        let small_budget = BufferPoolConfig {
1522            pool_min_size: 0,
1523            min_size: NZUsize!(4),
1524            max_size: NZUsize!(16),
1525            max_per_class: NZUsize!(1),
1526            thread_cache_config: BufferPoolThreadCacheConfig::ForParallelism(NZUsize!(1)),
1527            prefill: false,
1528            alignment: NZUsize!(4),
1529        }
1530        .with_budget_bytes(NZUsize!(10));
1531        assert_eq!(small_budget.max_per_class.get(), 1);
1532    }
1533
1534    #[test]
1535    fn test_pool_error_display() {
1536        assert_eq!(
1537            PoolError::Oversized.to_string(),
1538            "requested capacity exceeds maximum buffer size"
1539        );
1540        assert_eq!(
1541            PoolError::Exhausted.to_string(),
1542            "pool exhausted for required size class"
1543        );
1544    }
1545
1546    #[test]
1547    fn test_config_invalid_range_edge_paths() {
1548        // max_size < min_size should yield zero size classes, and budget_bytes
1549        // should leave max_per_class unchanged (no division by zero).
1550        let invalid_order = BufferPoolConfig {
1551            pool_min_size: 0,
1552            min_size: NZUsize!(8),
1553            max_size: NZUsize!(4),
1554            max_per_class: NZUsize!(1),
1555            thread_cache_config: BufferPoolThreadCacheConfig::ForParallelism(NZUsize!(1)),
1556            prefill: false,
1557            alignment: NZUsize!(4),
1558        };
1559        assert_eq!(invalid_order.num_classes(), 0);
1560        let unchanged = invalid_order.clone().with_budget_bytes(NZUsize!(128));
1561        assert_eq!(unchanged.max_per_class, invalid_order.max_per_class);
1562
1563        // Non-power-of-two max_size should make the size unreachable via class_index.
1564        let non_power_two_max = BufferPoolConfig {
1565            pool_min_size: 0,
1566            min_size: NZUsize!(8),
1567            max_size: NZUsize!(12),
1568            max_per_class: NZUsize!(1),
1569            thread_cache_config: BufferPoolThreadCacheConfig::ForParallelism(NZUsize!(1)),
1570            prefill: false,
1571            alignment: NZUsize!(4),
1572        };
1573        assert_eq!(non_power_two_max.class_index(12), None);
1574    }
1575
1576    #[test]
1577    fn test_pool_debug_and_config_accessor() {
1578        // Debug formatting and config accessor should be consistent.
1579        let page = page_size();
1580        let mut registry = test_registry();
1581        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
1582
1583        let debug = format!("{pool:?}");
1584        assert!(debug.contains("BufferPool"));
1585        assert!(debug.contains("num_classes"));
1586        assert_eq!(pool.config().min_size.get(), page);
1587    }
1588
    #[test]
    fn test_return_buffer_local_overflow_spills_to_global() {
        // Returned buffers land in the dropping thread's local cache first;
        // once that cache is full, further returns overflow to the shared
        // global freelist.
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
        let class_index = pool
            .inner
            .config
            .class_index(page)
            .expect("class exists for page-sized buffer");

        let tracked1 = pool.try_alloc(page).expect("first tracked allocation");
        let tracked2 = pool.try_alloc(page).expect("second tracked allocation");

        // The first return should stay entirely in the current thread's local cache.
        drop(tracked1);
        assert_eq!(pool.inner.classes[class_index].global.len(), 0);
        assert_eq!(get_local_len(&pool.inner.classes[class_index]), 1);

        // Returning another tracked buffer should route overflow to the global
        // freelist and retain one in the current thread's local bin.
        drop(tracked2);
        assert_eq!(pool.inner.classes[class_index].global.len(), 1);
        assert_eq!(get_local_len(&pool.inner.classes[class_index]), 1);
        // Availability counts both the local cache and the global freelist
        // (1 local + 1 global = 2).
        assert_eq!(get_available(&pool, page), 2);
    }
1615
    #[test]
    fn test_small_local_cache_overflow_preserves_locality() {
        // For tiny local caches, an overflowing return must go straight to the
        // global freelist rather than displacing the already-cached hot entry.
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);

        // With `thread_cache_capacity == 1`, the first return stays local and the
        // second overflows directly to global instead of spilling the hot
        // local entry through the shared queue.
        // (assumes test_config yields a capacity-1 local cache here — TODO confirm)
        let mut tracked1 = pool.try_alloc(page).expect("first tracked allocation");
        let ptr1 = tracked1.as_mut_ptr();
        let mut tracked2 = pool.try_alloc(page).expect("second tracked allocation");
        let ptr2 = tracked2.as_mut_ptr();

        drop(tracked1);
        drop(tracked2);

        // Pointer identity proves routing: the first-dropped buffer is served
        // from the local cache, the second-dropped from the global freelist.
        let mut reused_local = pool.try_alloc(page).expect("reuse from local cache");
        assert_eq!(reused_local.as_mut_ptr(), ptr1);

        let mut reused_global = pool.try_alloc(page).expect("reuse from global freelist");
        assert_eq!(reused_global.as_mut_ptr(), ptr2);
    }
1639
    #[test]
    fn test_large_local_cache_batches_overflow_and_refill() {
        // Exercises the batched spill path (half the local cache moves to
        // global on overflow) and the batched refill path (one global hit
        // pulls a batch back into the local cache).
        let page = page_size();
        let mut registry = test_registry();
        let threads = std::thread::available_parallelism().map_or(1, NonZeroUsize::get);
        let max_per_class = threads * 8;
        let pool = BufferPool::new(test_config(page, page, max_per_class), &mut registry);
        let class_index = pool
            .inner
            .config
            .class_index(page)
            .expect("class exists for page-sized buffer");
        let class = &pool.inner.classes[class_index];

        // Batching semantics only apply once the local cache is large enough.
        assert!(class.thread_cache_capacity >= MIN_THREAD_CACHE_BATCHING_CAPACITY);

        // Drop enough distinct checked-out buffers to force an overflow from a
        // full local cache. Large bins should spill half the entries to global
        // and keep the remainder local for fast same-thread reuse.
        let mut bufs = Vec::new();
        for _ in 0..class.thread_cache_capacity + 1 {
            bufs.push(pool.try_alloc(page).expect("tracked allocation"));
        }
        for buf in bufs {
            drop(buf);
        }

        // capacity/2 + 1 local (the spilled half left room for the final return).
        assert_eq!(get_local_len(class), class.thread_cache_capacity / 2 + 1);
        assert_eq!(class.global.len(), class.thread_cache_capacity / 2);

        // Drain the local half, then hit global once. That global pop should
        // batch-refill the local cache back up to the configured target.
        let mut reused = Vec::new();
        for _ in 0..class.thread_cache_capacity / 2 + 1 {
            reused.push(pool.try_alloc(page).expect("local reuse"));
        }
        assert_eq!(get_local_len(class), 0);
        assert_eq!(class.global.len(), class.thread_cache_capacity / 2);

        let _global = pool.try_alloc(page).expect("global reuse with refill");
        assert_eq!(get_local_len(class), class.thread_cache_capacity / 2 - 1);
        assert_eq!(class.global.len(), 0);
    }
1683
1684    #[test]
1685    fn test_tls_refill_stops_when_global_runs_empty() {
1686        let class = test_size_class(64, 64);
1687
1688        // A short global freelist should refill only what exists, then stop.
1689        class.push_global(AlignedBuffer::new(class.size, class.alignment));
1690        BufferPoolThreadCache::refill(&class, MIN_THREAD_CACHE_BATCHING_CAPACITY);
1691
1692        assert_eq!(get_local_len(&class), 1);
1693        assert_eq!(class.global.len(), 0);
1694    }
1695
1696    #[test]
1697    fn test_tls_size_class_cache_push_tolerates_empty_spill() {
1698        let class = test_size_class(64, 64);
1699        let mut cache = TlsSizeClassCache {
1700            entries: Vec::new(),
1701            capacity: 0,
1702        };
1703
1704        // Small local capacities should bypass batching and push straight to global.
1705        cache.push(TlsSizeClassCacheEntry {
1706            buffer: AlignedBuffer::new(class.size, class.alignment),
1707            class,
1708        });
1709        drop(cache);
1710    }
1711
1712    #[test]
1713    #[should_panic(expected = "tracked buffer should always fit in the global pool")]
1714    fn test_push_global_panics_when_global_queue_is_inconsistently_full() {
1715        let class = Arc::new(SizeClass::new(
1716            NEXT_SIZE_CLASS_ID.fetch_add(1, Ordering::Relaxed),
1717            64,
1718            64,
1719            1,
1720            1,
1721            false,
1722        ));
1723
1724        // Overfilling the fixed-size global queue should trip the invariant.
1725        class.push_global(AlignedBuffer::new(64, 64));
1726        class.push_global(AlignedBuffer::new(64, 64));
1727    }
1728
    #[test]
    fn test_pooled_debug_and_empty_into_bytes_paths() {
        // Debug formatting for pooled mutable/immutable wrappers, and empty
        // into_bytes should detach without retaining the pool allocation.
        let page = page_size();
        let class = test_size_class(page, page);

        // Mutable pooled debug should include cursor position.
        // The inner scope drops the wrapper as soon as it has been formatted.
        let pooled_mut_debug = {
            let pooled_mut = PooledBufMut::new(AlignedBuffer::new(page, page), Arc::clone(&class));
            format!("{pooled_mut:?}")
        };
        assert!(pooled_mut_debug.contains("PooledBufMut"));
        assert!(pooled_mut_debug.contains("cursor"));

        // Empty mutable buffer converts to empty Bytes without retaining pool memory.
        let empty_from_mut = PooledBufMut::new(AlignedBuffer::new(page, page), Arc::clone(&class));
        assert!(empty_from_mut.into_bytes().is_empty());

        // Immutable pooled debug should include capacity.
        // This final wrapper consumes the `class` Arc by value.
        let pooled = PooledBufMut::new(AlignedBuffer::new(page, page), class).into_pooled();
        let pooled_debug = format!("{pooled:?}");
        assert!(pooled_debug.contains("PooledBuf"));
        assert!(pooled_debug.contains("capacity"));
        assert!(pooled.into_bytes().is_empty());
    }
1755
1756    #[test]
1757    fn test_freeze_returns_buffer_to_pool() {
1758        let page = page_size();
1759        let mut registry = test_registry();
1760        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
1761
1762        // Initially: 0 allocated, 0 available
1763        assert_eq!(get_allocated(&pool, page), 0);
1764        assert_eq!(get_available(&pool, page), 0);
1765
1766        // Allocate and freeze
1767        let buf = pool.try_alloc(page).unwrap();
1768        assert_eq!(get_allocated(&pool, page), 1);
1769        assert_eq!(get_available(&pool, page), 0);
1770
1771        let iobuf = buf.freeze();
1772        // Still allocated (held by IoBuf)
1773        assert_eq!(get_allocated(&pool, page), 1);
1774
1775        // Drop the IoBuf - buffer should return to pool
1776        drop(iobuf);
1777        assert_eq!(get_allocated(&pool, page), 0);
1778        assert_eq!(get_available(&pool, page), 1);
1779    }
1780
1781    #[test]
1782    fn test_refcount_and_copy_to_bytes_paths() {
1783        let page = page_size();
1784        let mut registry = test_registry();
1785        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
1786
1787        // Refcount behavior:
1788        // - clone/slice keep the pooled allocation alive
1789        // - empty slice does not keep ownership
1790        {
1791            let mut buf = pool.try_alloc(page).unwrap();
1792            buf.put_slice(&[0xAA; 100]);
1793            let iobuf = buf.freeze();
1794            let clone = iobuf.clone();
1795            let slice = iobuf.slice(10..40);
1796            let empty = iobuf.slice(10..10);
1797            assert!(empty.is_empty());
1798            drop(iobuf);
1799            assert_eq!(get_allocated(&pool, page), 1);
1800            drop(slice);
1801            assert_eq!(get_allocated(&pool, page), 1);
1802            drop(clone);
1803            assert_eq!(get_allocated(&pool, page), 0);
1804        }
1805
1806        // IoBuf::copy_to_bytes behavior:
1807        // - zero-length copy is empty and non-advancing
1808        // - partial copy advances while keeping ownership alive
1809        // - full drain transfers ownership out of source
1810        // - zero-length copy on already-empty source stays detached
1811        {
1812            let mut buf = pool.try_alloc(page).unwrap();
1813            buf.put_slice(&[0x42; 100]);
1814            let mut iobuf = buf.freeze();
1815
1816            let zero = iobuf.copy_to_bytes(0);
1817            assert!(zero.is_empty());
1818            assert_eq!(iobuf.remaining(), 100);
1819
1820            let partial = iobuf.copy_to_bytes(30);
1821            assert_eq!(&partial[..], &[0x42; 30]);
1822            assert_eq!(iobuf.remaining(), 70);
1823
1824            let rest = iobuf.copy_to_bytes(70);
1825            assert_eq!(&rest[..], &[0x42; 70]);
1826            assert_eq!(iobuf.remaining(), 0);
1827
1828            // Zero-length copy on empty should not transfer ownership.
1829            let empty = iobuf.copy_to_bytes(0);
1830            assert!(empty.is_empty());
1831
1832            drop(iobuf);
1833            assert_eq!(get_allocated(&pool, page), 1);
1834            drop(zero);
1835            drop(partial);
1836            assert_eq!(get_allocated(&pool, page), 1);
1837            drop(rest);
1838            assert_eq!(get_allocated(&pool, page), 0);
1839        }
1840
1841        // IoBufMut::copy_to_bytes mirrors the immutable ownership semantics.
1842        {
1843            let buf = pool.try_alloc(page).unwrap();
1844            let mut iobufmut = buf;
1845            iobufmut.put_slice(&[0x7E; 100]);
1846
1847            let zero = iobufmut.copy_to_bytes(0);
1848            assert!(zero.is_empty());
1849            assert_eq!(iobufmut.remaining(), 100);
1850
1851            let partial = iobufmut.copy_to_bytes(30);
1852            assert_eq!(&partial[..], &[0x7E; 30]);
1853            assert_eq!(iobufmut.remaining(), 70);
1854
1855            let rest = iobufmut.copy_to_bytes(70);
1856            assert_eq!(&rest[..], &[0x7E; 70]);
1857            assert_eq!(iobufmut.remaining(), 0);
1858
1859            drop(iobufmut);
1860            assert_eq!(get_allocated(&pool, page), 1);
1861            drop(zero);
1862            drop(partial);
1863            assert_eq!(get_allocated(&pool, page), 1);
1864            drop(rest);
1865            assert_eq!(get_allocated(&pool, page), 0);
1866        }
1867    }
1868
1869    #[test]
1870    fn test_iobuf_to_iobufmut_conversion_reuses_pool_for_non_full_unique_view() {
1871        // IoBuf -> IoBufMut should recover pooled ownership for unique non-full views.
1872        let page = page_size();
1873        let mut registry = test_registry();
1874        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
1875
1876        let buf = pool.try_alloc(page).unwrap();
1877        assert_eq!(get_allocated(&pool, page), 1);
1878
1879        let iobuf = buf.freeze();
1880        assert_eq!(get_allocated(&pool, page), 1);
1881
1882        let iobufmut: IoBufMut = iobuf.into();
1883
1884        // Conversion reused pooled storage instead of copying.
1885        assert_eq!(
1886            get_allocated(&pool, page),
1887            1,
1888            "pooled buffer should remain allocated after zero-copy IoBuf->IoBufMut conversion"
1889        );
1890        assert_eq!(get_available(&pool, page), 0);
1891
1892        // Dropping returns the pooled buffer.
1893        drop(iobufmut);
1894        assert_eq!(get_allocated(&pool, page), 0);
1895        assert_eq!(get_available(&pool, page), 1);
1896    }
1897
1898    #[test]
1899    fn test_iobuf_to_iobufmut_conversion_preserves_full_unique_view() {
1900        // IoBuf -> IoBufMut via From should preserve data and keep pooled
1901        // ownership for a fully-written unique view.
1902        let page = page_size();
1903        let mut registry = test_registry();
1904        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
1905
1906        // Fill a pooled buffer completely and freeze.
1907        let mut buf = pool.try_alloc(page).unwrap();
1908        buf.put_slice(&vec![0xEE; page]);
1909        let iobuf = buf.freeze();
1910
1911        // Convert back to mutable; should reuse pooled storage.
1912        let iobufmut: IoBufMut = iobuf.into();
1913        assert_eq!(iobufmut.len(), page);
1914        assert!(iobufmut.as_ref().iter().all(|&b| b == 0xEE));
1915        assert_eq!(get_allocated(&pool, page), 1);
1916        assert_eq!(get_available(&pool, page), 0);
1917
1918        // Dropping returns the buffer to the pool.
1919        drop(iobufmut);
1920        assert_eq!(get_allocated(&pool, page), 0);
1921        assert_eq!(get_available(&pool, page), 1);
1922    }
1923
1924    #[test]
1925    fn test_iobuf_try_into_mut_recycles_full_unique_view() {
1926        // try_into_mut on a uniquely-owned full-view pooled IoBuf should recover
1927        // mutable ownership without copying, preserving data and pool tracking.
1928        let page = page_size();
1929        let mut registry = test_registry();
1930        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
1931
1932        let mut buf = pool.try_alloc(page).unwrap();
1933        buf.put_slice(&vec![0xAB; page]);
1934        let iobuf = buf.freeze();
1935        assert_eq!(get_allocated(&pool, page), 1);
1936
1937        // Unique full view should recycle.
1938        let recycled = iobuf
1939            .try_into_mut()
1940            .expect("unique full-view pooled buffer should recycle");
1941        assert_eq!(recycled.len(), page);
1942        assert!(recycled.as_ref().iter().all(|&b| b == 0xAB));
1943        assert_eq!(recycled.capacity(), page);
1944        assert_eq!(get_allocated(&pool, page), 1);
1945
1946        drop(recycled);
1947        assert_eq!(get_allocated(&pool, page), 0);
1948        assert_eq!(get_available(&pool, page), 1);
1949    }
1950
    #[test]
    fn test_iobuf_try_into_mut_succeeds_for_unique_slice_and_fails_for_shared() {
        // try_into_mut: unique views (even sliced ones) recover mutable
        // ownership zero-copy; shared views must be refused.
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);

        // Unique sliced views can recover mutable ownership without copying.
        let mut buf = pool.try_alloc(page).unwrap();
        buf.put_slice(&vec![0xCD; page]);
        let iobuf = buf.freeze();
        let sliced = iobuf.slice(1..page);
        // Dropping the parent view makes the slice the sole owner.
        drop(iobuf);
        let recycled = sliced
            .try_into_mut()
            .expect("unique sliced pooled buffer should recycle");
        // The recovered view is narrowed to the slice, not the full page.
        assert_eq!(recycled.len(), page - 1);
        assert!(recycled.as_ref().iter().all(|&b| b == 0xCD));
        assert_eq!(recycled.capacity(), page - 1);
        assert_eq!(get_allocated(&pool, page), 1);
        drop(recycled);
        assert_eq!(get_allocated(&pool, page), 0);
        assert_eq!(get_available(&pool, page), 1);

        // Shared views still cannot recover mutable ownership.
        let mut buf = pool.try_alloc(page).unwrap();
        buf.put_slice(&vec![0xEF; page]);
        let iobuf = buf.freeze();
        let cloned = iobuf.clone();
        // The Err case hands the original IoBuf back, so it can still be dropped.
        let iobuf = iobuf
            .try_into_mut()
            .expect_err("shared pooled buffer must not convert to mutable");

        drop(cloned);
        drop(iobuf);
        assert_eq!(get_allocated(&pool, page), 0);
        assert!(get_available(&pool, page) >= 1);
    }
1988
    #[test]
    fn test_multithreaded_alloc_freeze_return() {
        // Stress the concurrent alloc/return paths: ten threads repeatedly
        // allocate, freeze, clone, and drop pooled buffers.
        let page = page_size();
        let mut registry = test_registry();
        let pool = Arc::new(BufferPool::new(test_config(page, page, 100), &mut registry));

        let mut handles = vec![];

        // Reduce iterations under miri (atomics are slow)
        cfg_if::cfg_if! {
            if #[cfg(miri)] {
                let iterations = 100;
            } else {
                let iterations = 1000;
            }
        }

        // Spawn multiple threads that allocate, freeze, clone, and drop
        for _ in 0..10 {
            let pool = pool.clone();
            let handle = thread::spawn(move || {
                for _ in 0..iterations {
                    let buf = pool.try_alloc(page).unwrap();
                    let iobuf = buf.freeze();

                    // Clone a few times
                    let clones: Vec<_> = (0..5).map(|_| iobuf.clone()).collect();
                    drop(iobuf);

                    // Drop clones
                    for clone in clones {
                        drop(clone);
                    }
                }
            });
            handles.push(handle);
        }

        // Wait for all threads
        for handle in handles {
            handle.join().unwrap();
        }

        // Worker threads may retain free buffers in their own local caches, so
        // the main thread cannot assert that all of them are visible here.
        // It should still be able to allocate successfully once the workers finish.
        let _buf = pool
            .try_alloc(page)
            .expect("pool should remain usable after multithreaded test");
    }
2039
    #[test]
    fn test_cross_thread_buffer_return() {
        // Allocate on one thread, freeze, send to another thread, drop there
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 100), &mut registry);

        let (tx, rx) = mpsc::channel();

        // Allocate and freeze on main thread
        for _ in 0..50 {
            let buf = pool.try_alloc(page).unwrap();
            let iobuf = buf.freeze();
            tx.send(iobuf).unwrap();
        }
        drop(tx);

        // Receive and drop on another thread. Those returns should populate the
        // dropping thread's local cache, so allocations on that same thread
        // should be able to reuse them immediately.
        // (The pool handle is moved into the worker; only tracked-buffer
        // references keep the size classes alive on the main thread.)
        let handle = thread::spawn(move || {
            while let Ok(iobuf) = rx.recv() {
                drop(iobuf);
            }

            let class_index = pool
                .inner
                .config
                .class_index(page)
                .expect("class exists for page-sized buffer");
            // get_local_len here reads the worker thread's own local cache.
            assert!(
                get_local_len(&pool.inner.classes[class_index]) >= 1,
                "dropping thread should retain returned buffers in its local cache"
            );

            // All 50 allocations should be servable from returned buffers.
            for _ in 0..50 {
                let _buf = pool
                    .try_alloc(page)
                    .expect("dropping thread should be able to reuse returned buffers");
            }
        });

        handle.join().unwrap();
    }
2084
    #[test]
    fn test_thread_exit_flushes_local_bin() {
        // When a thread exits, its TLS cache Drop flushes buffers back to the
        // global freelist, making them available to other threads.
        let page = page_size();
        let mut registry = test_registry();
        let pool = Arc::new(BufferPool::new(test_config(page, page, 1), &mut registry));

        // Allocate and return a buffer on a worker thread, then let it exit.
        let worker_pool = pool.clone();
        thread::spawn(move || {
            let buf = worker_pool
                .try_alloc(page)
                .expect("worker should allocate tracked buffer");
            drop(buf);
        })
        .join()
        .expect("worker thread should exit cleanly");

        // After thread exit, the buffer should be in the global freelist (not
        // stuck in a dead thread's local cache).
        let class_index = pool
            .inner
            .config
            .class_index(page)
            .expect("class exists for page-sized buffer");
        assert_eq!(pool.inner.classes[class_index].global.len(), 1);
        // The main thread's own local cache never received anything.
        assert_eq!(get_local_len(&pool.inner.classes[class_index]), 0);

        // The flushed buffer should be reusable from the main thread.
        let _buf = pool
            .try_alloc(page)
            .expect("thread-exited local buffer should be reusable");
    }
2119
    #[test]
    fn test_pool_drop_drains_global_freelist() {
        // Dropping the pool should immediately reclaim globally-visible free
        // tracked buffers, while leaving TLS-cached buffers alone.
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
        let class_index = pool
            .inner
            .config
            .class_index(page)
            .expect("class exists for page-sized buffer");
        // Hold our own Arc to the size class so it survives the pool drop.
        let class = Arc::clone(&pool.inner.classes[class_index]);

        // Return one buffer to the current thread's local cache and overflow
        // the other into the shared global freelist.
        let buf1 = pool.try_alloc(page).unwrap();
        let buf2 = pool.try_alloc(page).unwrap();
        drop(buf1);
        drop(buf2);

        assert_eq!(class.global.len(), 1);
        assert_eq!(get_local_len(&class), 1);

        // Pool drop should drain only the global freelist. The thread-local
        // cache remains untouched until thread exit.
        drop(pool);

        assert_eq!(class.global.len(), 0);
        assert_eq!(get_local_len(&class), 1);
        // Only the TLS-cached buffer remains accounted for after the drain.
        assert_eq!(class.created.load(Ordering::Relaxed), 1);
    }
2152
2153    #[test]
2154    fn test_pool_dropped_before_buffer() {
2155        // What happens if the pool is dropped while buffers are still in use?
2156        // The size class remains alive until the last tracked buffer is dropped.
2157
2158        let page = page_size();
2159        let mut registry = test_registry();
2160        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
2161
2162        let mut buf = pool.try_alloc(page).unwrap();
2163        buf.put_slice(&[0u8; 100]);
2164        let iobuf = buf.freeze();
2165
2166        // Drop the pool while buffer is still alive
2167        drop(pool);
2168
2169        // Buffer should still be usable
2170        assert_eq!(iobuf.len(), 100);
2171
2172        // Dropping the buffer should not panic and should return to the retained size class.
2173        drop(iobuf);
2174        // No assertion here - we just want to make sure it doesn't panic
2175    }
2176
2177    #[test]
2178    fn test_pool_exhaustion_and_recovery() {
2179        // Test pool exhaustion and recovery.
2180        let page = page_size();
2181        let mut registry = test_registry();
2182        let pool = BufferPool::new(test_config(page, page, 3), &mut registry);
2183
2184        // Exhaust the pool
2185        let buf1 = pool.try_alloc(page).expect("first alloc");
2186        let buf2 = pool.try_alloc(page).expect("second alloc");
2187        let buf3 = pool.try_alloc(page).expect("third alloc");
2188        assert!(pool.try_alloc(page).is_err(), "pool should be exhausted");
2189
2190        // Return one buffer
2191        drop(buf1);
2192
2193        // Should be able to allocate again
2194        let buf4 = pool.try_alloc(page).expect("alloc after return");
2195        assert!(pool.try_alloc(page).is_err(), "pool exhausted again");
2196
2197        // Return all and verify freelist reuse
2198        drop(buf2);
2199        drop(buf3);
2200        drop(buf4);
2201
2202        assert_eq!(get_allocated(&pool, page), 0);
2203        assert_eq!(get_available(&pool, page), 3);
2204
2205        // Allocate again - should reuse from freelist
2206        let _buf5 = pool.try_alloc(page).expect("reuse from freelist");
2207        assert_eq!(get_available(&pool, page), 2);
2208    }
2209
2210    #[test]
2211    fn test_try_alloc_errors() {
2212        // Test try_alloc error variants.
2213        let page = page_size();
2214        let mut registry = test_registry();
2215        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
2216
2217        // Oversized request
2218        let result = pool.try_alloc(page * 10);
2219        assert_eq!(result.unwrap_err(), PoolError::Oversized);
2220
2221        // Exhaust pool
2222        let _buf1 = pool.try_alloc(page).unwrap();
2223        let _buf2 = pool.try_alloc(page).unwrap();
2224        let result = pool.try_alloc(page);
2225        assert_eq!(result.unwrap_err(), PoolError::Exhausted);
2226    }
2227
2228    #[test]
2229    fn test_try_alloc_zeroed_errors() {
2230        // try_alloc_zeroed should return the same error variants as try_alloc.
2231        let page = page_size();
2232        let mut registry = test_registry();
2233        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
2234
2235        // Oversized request.
2236        let result = pool.try_alloc_zeroed(page * 10);
2237        assert_eq!(result.unwrap_err(), PoolError::Oversized);
2238
2239        // Exhaust pool, then verify Exhausted error.
2240        let _buf1 = pool.try_alloc_zeroed(page).unwrap();
2241        let _buf2 = pool.try_alloc_zeroed(page).unwrap();
2242        let result = pool.try_alloc_zeroed(page);
2243        assert_eq!(result.unwrap_err(), PoolError::Exhausted);
2244    }
2245
    #[test]
    fn test_fallback_allocation() {
        // Test fallback allocation when pool is exhausted or oversized.
        // Fallback buffers from alloc() are aligned but untracked: they never
        // touch the pool's counters, on allocation or on drop.
        let page = page_size();
        let mut registry = test_registry();
        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);

        // Exhaust the pool
        let buf1 = pool.try_alloc(page).unwrap();
        let buf2 = pool.try_alloc(page).unwrap();
        assert!(buf1.is_pooled());
        assert!(buf2.is_pooled());

        // Fallback via alloc() when exhausted - still aligned, but untracked
        let mut fallback_exhausted = pool.alloc(page);
        assert!(!fallback_exhausted.is_pooled());
        assert!((fallback_exhausted.as_mut_ptr() as usize).is_multiple_of(page));

        // Fallback via alloc() when oversized - still aligned, but untracked
        let mut fallback_oversized = pool.alloc(page * 10);
        assert!(!fallback_oversized.is_pooled());
        assert!((fallback_oversized.as_mut_ptr() as usize).is_multiple_of(page));

        // Verify pool counters unchanged by fallback allocations
        assert_eq!(get_allocated(&pool, page), 2);

        // Drop fallback buffers - should not affect pool counters
        drop(fallback_exhausted);
        drop(fallback_oversized);
        assert_eq!(get_allocated(&pool, page), 2);

        // Drop tracked buffers - counters should decrease
        drop(buf1);
        drop(buf2);
        assert_eq!(get_allocated(&pool, page), 0);
    }
2282
2283    #[test]
2284    fn test_is_pooled() {
2285        // IoBufMut from the pool should report is_pooled, while heap-backed
2286        // buffers should not.
2287        let page = page_size();
2288        let mut registry = test_registry();
2289        let pool = BufferPool::new(test_config(page, page, 10), &mut registry);
2290
2291        let pooled = pool.try_alloc(page).unwrap();
2292        assert!(pooled.is_pooled());
2293
2294        let owned = IoBufMut::with_capacity(100);
2295        assert!(!owned.is_pooled());
2296    }
2297
2298    #[test]
2299    fn test_iobuf_is_pooled() {
2300        let page = page_size();
2301        let mut registry = test_registry();
2302        let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
2303
2304        let pooled = pool.try_alloc(page).unwrap().freeze();
2305        assert!(pooled.is_pooled());
2306
2307        // Oversized alloc uses untracked fallback allocation.
2308        let fallback = pool.alloc(page * 10).freeze();
2309        assert!(!fallback.is_pooled());
2310
2311        let bytes = IoBuf::copy_from_slice(b"hello");
2312        assert!(!bytes.is_pooled());
2313    }
2314
    #[test]
    fn test_buffer_alignment() {
        // The storage preset must hand out page-aligned buffers and the
        // network preset cache-line-aligned buffers, even for small requests.
        let page = page_size();
        let cache_line = cache_line_size();
        let mut registry = test_registry();

        // Reduce max_per_class under miri (atomics are slow)
        cfg_if::cfg_if! {
            if #[cfg(miri)] {
                let storage_config = BufferPoolConfig {
                    max_per_class: NZUsize!(32),
                    ..BufferPoolConfig::for_storage()
                };
                let network_config = BufferPoolConfig {
                    max_per_class: NZUsize!(32),
                    ..BufferPoolConfig::for_network()
                };
            } else {
                let storage_config = BufferPoolConfig::for_storage();
                let network_config = BufferPoolConfig::for_network();
            }
        }

        // Storage preset - page aligned
        let storage_buffer_pool = BufferPool::new(storage_config, &mut registry);
        let mut buf = storage_buffer_pool.try_alloc(100).unwrap();
        assert_eq!(
            buf.as_mut_ptr() as usize % page,
            0,
            "storage buffer not page-aligned"
        );

        // Network preset - cache-line aligned
        let network_buffer_pool = BufferPool::new(network_config, &mut registry);
        let mut buf = network_buffer_pool.try_alloc(100).unwrap();
        assert_eq!(
            buf.as_mut_ptr() as usize % cache_line,
            0,
            "network buffer not cache-line aligned"
        );
    }
2356}