Skip to main content

commonware_runtime/iobuf/
pool.rs

1//! Buffer pool for efficient I/O operations.
2//!
3//! Provides pooled, aligned buffers that can be reused to reduce allocation
4//! overhead. Buffer alignment is configurable: use page alignment for storage I/O
5//! (required for direct I/O and DMA), or cache-line alignment for network I/O
6//! (reduces fragmentation).
7//!
8//! # Thread Safety
9//!
10//! [`BufferPool`] is `Send + Sync` and can be safely shared across threads.
11//! Allocation and deallocation use atomic counters together with a bounded
12//! lock-free global freelist plus per-thread caches.
13//!
14//! # Pool Lifecycle
15//!
16//! Tracked buffers held by pooled views or cached in thread-local bins keep a
17//! strong reference to the originating size class. Buffers can outlive the
18//! public [`BufferPool`] handle and still return to their original size class.
19//! - Untracked fallback allocations store no class reference and deallocate
20//!   directly when dropped.
21//! - Requests smaller than [`BufferPoolConfig::pool_min_size`] bypass pooling
22//!   entirely and return untracked aligned allocations from both
23//!   [`BufferPool::try_alloc`] and [`BufferPool::alloc`].
//! - Dropping [`BufferPool`] drains only the shared global freelists. Pooled
//!   views and buffers cached in a live thread's local cache can keep their
//!   size class alive until they are dropped or the thread exits.
27//!
28//! # Size Classes
29//!
30//! Buffers are organized into power-of-two size classes from `min_size` to
31//! `max_size`. For example, with `min_size = 4096` and `max_size = 32768`:
32//! - Class 0: 4096 bytes
33//! - Class 1: 8192 bytes
34//! - Class 2: 16384 bytes
35//! - Class 3: 32768 bytes
36//!
37//! Allocation requests are rounded up to the next size class. Requests larger
38//! than `max_size` return [`PoolError::Oversized`] from [`BufferPool::try_alloc`],
39//! or fall back to an untracked aligned heap allocation from [`BufferPool::alloc`].
40//!
41//! # Cache Structure
42//!
43//! Each size class uses a two-level allocator:
44//! - a small per-thread local cache for steady-state same-thread reuse
45//! - a shared global freelist for refill and spill between threads
46//!
47//! When a local cache misses, the pool refills a small batch from the global
48//! freelist before attempting to create a new tracked buffer. Returned buffers
49//! first try to re-enter the dropping thread's local cache, spilling a bounded
50//! batch back to the global freelist if needed.
51
52use super::{freelist::Freelist, page_size, IoBufMut};
53use crate::{
54    iobuf::buffer::{PooledBufMut, PooledBuffer},
55    telemetry::metrics::{raw, Counter, CounterFamily, EncodeLabelSet, GaugeFamily, Register},
56};
57use commonware_utils::{NZUsize, NZU32};
58use std::{
59    alloc::Layout,
60    cell::{Cell, UnsafeCell},
61    mem::MaybeUninit,
62    num::{NonZeroU32, NonZeroUsize},
63    ptr,
64    sync::{
65        atomic::{AtomicUsize, Ordering},
66        Arc,
67    },
68};
69
/// Smallest thread-local cache capacity at which refill/spill batching is used.
///
/// A smaller cache still gives same-thread locality, but its batches would
/// shrink to single-buffer moves: extra policy complexity without amortizing
/// any shared-queue traffic.
const MIN_TLS_BATCH_CAPACITY: usize = 4;
76
/// Reasons a buffer pool allocation can fail.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum PoolError {
    /// The caller asked for more capacity than the largest buffer size.
    Oversized,
    /// The size class that would serve the request has no buffer available.
    Exhausted,
}

impl std::fmt::Display for PoolError {
    /// Writes the fixed, human-readable description of the failure.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::Oversized => "requested capacity exceeds maximum buffer size",
            Self::Exhausted => "pool exhausted for required size class",
        };
        f.write_str(message)
    }
}

impl std::error::Error for PoolError {}
96
/// How large each thread's local cache is within a buffer pool size class.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum BufferPoolThreadCacheConfig {
    /// Thread-local caching is on.
    ///
    /// With `None`, the per-thread cache size is derived from the pool's
    /// per-class capacity and expected parallelism, keeping about half of each
    /// class for the shared freelist. A small per-class budget may derive a
    /// size of zero, which turns thread-local caching off so free buffers do
    /// not become stranded in other threads.
    ///
    /// With `Some(n)`, every size class uses exactly `n` as its per-thread
    /// cache size.
    Enabled(Option<NonZeroUsize>),
    /// Thread-local caching is off; all reuse goes through the shared global
    /// freelist.
    Disabled,
}
113
114/// Configuration for a buffer pool.
115#[derive(Debug, Clone)]
116pub struct BufferPoolConfig {
117    /// Minimum request size that should use pooled allocation.
118    ///
119    /// Requests smaller than this bypass the pool and use direct aligned
120    /// allocation instead. A value of `0` means all eligible requests use the
121    /// pool.
122    pub pool_min_size: usize,
123    /// Minimum buffer size. Must be >= alignment and a power of two.
124    pub min_size: NonZeroUsize,
125    /// Maximum buffer size. Must be a power of two and >= min_size.
126    pub max_size: NonZeroUsize,
127    /// Maximum number of buffers per size class.
128    ///
129    /// Size-class slots are identified by `u32`, so the per-class capacity is
130    /// capped by this type.
131    pub max_per_class: NonZeroU32,
132    /// Whether to create every tracked buffer during pool construction.
133    ///
134    /// When enabled, each size class creates `max_per_class` buffers and parks
135    /// them in the class-global freelist before the pool is returned. This
136    /// moves allocation cost to startup and makes the first reuse path avoid
137    /// heap allocation.
138    pub prefill: bool,
139    /// Buffer alignment. Must be a power of two.
140    pub alignment: NonZeroUsize,
141    /// Expected number of threads concurrently accessing the pool.
142    ///
143    /// This sizes the shared global freelist stripes. It is also used to derive
144    /// thread-cache capacity when the thread-cache policy is automatic, using
145    /// approximately half of [`Self::max_per_class`] divided across expected
146    /// threads.
147    pub parallelism: NonZeroUsize,
148    /// Policy for sizing the per-thread local cache in each size class.
149    ///
150    /// By default, thread-cache capacity is derived from [`Self::parallelism`].
151    /// [`Self::with_thread_cache_capacity`] uses an exact per-thread cache size.
152    /// [`Self::with_thread_cache_disabled`] bypasses thread-local caches.
153    pub(crate) thread_cache_config: BufferPoolThreadCacheConfig,
154}
155
156impl BufferPoolConfig {
157    /// Network I/O preset: 1KB to 128KB buffers, 4096 per class, not prefilled.
158    ///
159    /// Network operations typically need multiple concurrent buffers per
160    /// connection (message, encoding, encryption) so we allow 4096 buffers per
161    /// size class.
162    pub const fn for_network() -> Self {
163        Self {
164            pool_min_size: 0,
165            min_size: NZUsize!(1024),
166            max_size: NZUsize!(128 * 1024),
167            max_per_class: NZU32!(4096),
168            prefill: false,
169            alignment: NZUsize!(1),
170            parallelism: NZUsize!(1),
171            thread_cache_config: BufferPoolThreadCacheConfig::Enabled(None),
172        }
173    }
174
175    /// Storage I/O preset: `page_size` (usually 4KB) to 8MB buffers, 64 per class,
176    /// not prefilled.
177    pub fn for_storage() -> Self {
178        let page = NZUsize!(page_size());
179        Self {
180            pool_min_size: 0,
181            min_size: page,
182            max_size: NZUsize!(8 * 1024 * 1024),
183            max_per_class: NZU32!(64),
184            prefill: false,
185            // TODO (#2960): this needs to be page/block aligned for O_DIRECT
186            alignment: NZUsize!(1),
187            parallelism: NZUsize!(1),
188            thread_cache_config: BufferPoolThreadCacheConfig::Enabled(None),
189        }
190    }
191
192    /// Returns a copy of this config with a new minimum request size that uses pooling.
193    pub const fn with_pool_min_size(mut self, pool_min_size: usize) -> Self {
194        self.pool_min_size = pool_min_size;
195        self
196    }
197
198    /// Returns a copy of this config with a new minimum buffer size.
199    pub const fn with_min_size(mut self, min_size: NonZeroUsize) -> Self {
200        self.min_size = min_size;
201        self
202    }
203
204    /// Returns a copy of this config with a new maximum buffer size.
205    pub const fn with_max_size(mut self, max_size: NonZeroUsize) -> Self {
206        self.max_size = max_size;
207        self
208    }
209
210    /// Returns a copy of this config with a new maximum number of buffers per size class.
211    pub const fn with_max_per_class(mut self, max_per_class: NonZeroU32) -> Self {
212        self.max_per_class = max_per_class;
213        self
214    }
215
216    /// Returns a copy of this config with a new expected parallelism.
217    ///
218    /// This controls the minimum global-freelist stripe count, and controls
219    /// thread-cache capacity when the thread-cache policy is automatic. The
220    /// automatic policy reserves about half of each class for the global
221    /// freelist and divides the remaining capacity across expected threads.
222    pub const fn with_parallelism(mut self, parallelism: NonZeroUsize) -> Self {
223        self.parallelism = parallelism;
224        self
225    }
226
227    /// Returns a copy of this config with an explicit per-thread cache size.
228    ///
229    /// Global-freelist striping is set separately by [`Self::with_parallelism`].
230    pub const fn with_thread_cache_capacity(mut self, thread_cache_capacity: NonZeroUsize) -> Self {
231        self.thread_cache_config =
232            BufferPoolThreadCacheConfig::Enabled(Some(thread_cache_capacity));
233        self
234    }
235
236    /// Returns a copy of this config with thread-local caching disabled.
237    ///
238    /// Global-freelist striping is set separately by [`Self::with_parallelism`].
239    pub const fn with_thread_cache_disabled(mut self) -> Self {
240        self.thread_cache_config = BufferPoolThreadCacheConfig::Disabled;
241        self
242    }
243
244    /// Returns a copy of this config with a new prefill setting.
245    pub const fn with_prefill(mut self, prefill: bool) -> Self {
246        self.prefill = prefill;
247        self
248    }
249
250    /// Returns a copy of this config with a new alignment.
251    pub const fn with_alignment(mut self, alignment: NonZeroUsize) -> Self {
252        self.alignment = alignment;
253        self
254    }
255
256    /// Returns a copy of this config sized for an approximate tracked-memory budget.
257    ///
258    /// This computes `max_per_class` as:
259    ///
260    /// `ceil(budget_bytes / sum(size_class_bytes))`
261    ///
262    /// where `size_class_bytes` includes every class from `min_size` to `max_size`.
263    /// This always rounds up to at least one buffer per size class, so the
264    /// resulting estimated capacity may exceed `budget_bytes`.
265    ///
266    /// # Panics
267    ///
268    /// - `min_size` is not a power of two
269    /// - `max_size` is not a power of two
270    /// - `max_size < min_size`
271    /// - the derived per-class capacity does not fit in `u32`.
272    pub fn with_budget_bytes(mut self, budget_bytes: NonZeroUsize) -> Self {
273        self.validate_size_class_bounds();
274
275        let mut class_bytes = 0usize;
276        let min_size = self.min_size.get();
277        for i in 0..Self::num_classes(min_size, self.max_size.get()) {
278            class_bytes = class_bytes.saturating_add(Self::class_size(min_size, i));
279        }
280        if class_bytes == 0 {
281            return self;
282        }
283        let max_per_class = u32::try_from(budget_bytes.get().div_ceil(class_bytes))
284            .expect("max_per_class must fit in u32 slot ids");
285        self.max_per_class =
286            NonZeroU32::new(max_per_class).expect("max_per_class must be non-zero");
287        self
288    }
289
290    /// Validates the size-class bounds, panicking on invalid values.
291    ///
292    /// # Panics
293    ///
294    /// - `min_size` is not a power of two
295    /// - `max_size` is not a power of two
296    /// - `max_size < min_size`
297    fn validate_size_class_bounds(&self) {
298        let min_size = self.min_size.get();
299        let max_size = self.max_size.get();
300
301        assert!(
302            min_size.is_power_of_two(),
303            "min_size must be a power of two"
304        );
305        assert!(
306            max_size.is_power_of_two(),
307            "max_size must be a power of two"
308        );
309        assert!(max_size >= min_size, "max_size must be >= min_size");
310    }
311
312    /// Validates the configuration, panicking on invalid values.
313    ///
314    /// # Panics
315    ///
316    /// - `alignment` is not a power of two
317    /// - `min_size` is not a power of two
318    /// - `max_size` is not a power of two
319    /// - `min_size < alignment`
320    /// - `max_size < min_size`
321    /// - `pool_min_size > min_size`
322    /// - explicit `thread_cache_capacity > max_per_class`
323    fn validate(&self) {
324        self.validate_size_class_bounds();
325        assert!(
326            self.alignment.is_power_of_two(),
327            "alignment must be a power of two"
328        );
329        assert!(
330            self.min_size >= self.alignment,
331            "min_size ({}) must be >= alignment ({})",
332            self.min_size,
333            self.alignment
334        );
335        assert!(
336            self.pool_min_size <= self.min_size.get(),
337            "pool_min_size ({}) must be <= min_size ({})",
338            self.pool_min_size,
339            self.min_size
340        );
341        if let BufferPoolThreadCacheConfig::Enabled(Some(thread_cache_capacity)) =
342            self.thread_cache_config
343        {
344            assert!(
345                thread_cache_capacity.get() <= self.max_per_class.get() as usize,
346                "thread_cache_capacity ({}) must be <= max_per_class ({})",
347                thread_cache_capacity,
348                self.max_per_class
349            );
350        }
351    }
352
353    /// Returns the number of size classes between validated bounds.
354    #[inline]
355    const fn num_classes(min_size: usize, max_size: usize) -> usize {
356        // Since sizes are powers of two, trailing zeros is the size-class
357        // exponent
358        (max_size.trailing_zeros() - min_size.trailing_zeros() + 1) as usize
359    }
360
361    /// Returns the buffer size for a validated size-class index.
362    #[inline]
363    const fn class_size(min_size: usize, index: usize) -> usize {
364        min_size << index
365    }
366
367    /// Resolves the effective per-thread cache size for each size class.
368    ///
369    /// Derived capacities divide half of the class budget across the expected
370    /// parallelism so cross-thread reuse remains effective. Small class budgets
371    /// may resolve to zero.
372    fn resolve_thread_cache_capacity(&self) -> usize {
373        match self.thread_cache_config {
374            BufferPoolThreadCacheConfig::Enabled(None) => {
375                let max_per_class = self.max_per_class.get() as usize;
376                let effective_threads = self.parallelism.get().min(max_per_class);
377                max_per_class / (2 * effective_threads)
378            }
379            BufferPoolThreadCacheConfig::Enabled(Some(thread_cache_capacity)) => {
380                thread_cache_capacity.get()
381            }
382            BufferPoolThreadCacheConfig::Disabled => 0,
383        }
384    }
385}
386
387/// Label for buffer pool metrics, identifying the size class.
388#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
389struct SizeClassLabel {
390    size_class: u64,
391}
392
393/// Metrics for the buffer pool.
394struct PoolMetrics {
395    /// Number of tracked buffers created for the size class.
396    created: GaugeFamily<SizeClassLabel>,
397    /// Total number of failed allocations (pool exhausted).
398    exhausted_total: CounterFamily<SizeClassLabel>,
399    /// Total number of oversized allocation requests.
400    oversized_total: Counter,
401}
402
403impl PoolMetrics {
404    fn new(registry: &mut impl Register) -> Self {
405        Self {
406            created: registry.register(
407                "buffer_pool_created",
408                "Number of tracked buffers created for the pool",
409                raw::Family::default(),
410            ),
411            exhausted_total: registry.register(
412                "buffer_pool_exhausted_total",
413                "Total number of failed allocations due to pool exhaustion",
414                raw::Family::default(),
415            ),
416            oversized_total: registry.register(
417                "buffer_pool_oversized_total",
418                "Total number of allocation requests exceeding max buffer size",
419                raw::Counter::default(),
420            ),
421        }
422    }
423}
424
425/// Per-size-class state.
426///
427/// Each class is a small two-level allocator:
428/// - a shared global freelist for tracked buffers visible to all threads
429/// - a per-thread local cache for same-thread reuse
430///
431/// The global freelist owns the allocation layout, slot reservation counter,
432/// and parking cells for this class. A tracked buffer can be globally parked,
433/// owned by a pooled backing, or parked in one thread's local cache, but the
434/// slot always belongs to this `SizeClass`.
435///
436/// Liveness follows the buffer ownership state. Global freelist entries rely on
437/// the pool's [`SizeClassHandle`] while the pool is alive and are drained when
438/// the pool is dropped. Pooled backing values carry a [`SizeClassLease`].
439/// Thread-local cache entries use banked strong references owned by the cache.
440/// Those non-global states are what allow a buffer to outlive the public
441/// [`BufferPool`] handle and still return to the correct freelist.
442///
443/// The freelist is the only place that deallocates tracked buffers. Returning a
444/// buffer to the freelist transfers buffer ownership back to that freelist and
445/// releases the pooled-backing lease or banked strong reference that kept the
446/// class alive while the buffer was outside the global freelist.
447///
448/// Allocation prefers the local cache, then refills from the global freelist,
449/// and only creates a new tracked buffer when no free buffer is available and
450/// the class still has remaining capacity.
451pub(super) struct SizeClass {
452    /// Dense global identifier for the TLS cache registry.
453    class_id: usize,
454    /// The buffer size for this class.
455    size: usize,
456    /// Global free list of tracked buffers available for reuse.
457    global: Freelist,
458    /// Maximum number of buffers retained in the current thread's local bin.
459    thread_cache_capacity: usize,
460}
461
462// SAFETY: shared state in `SizeClass` is synchronized through atomics and the
463// global free set. Per-thread bins are stored in thread-local registries and only
464// accessed by the current thread.
465unsafe impl Send for SizeClass {}
466// SAFETY: see above.
467unsafe impl Sync for SizeClass {}
468
469/// Non-owning raw identity for a size class.
470///
471/// # Size-class lifetime model
472///
473/// A [`SizeClass`] owns the [`Freelist`] for one buffer size class. The
474/// freelist creates tracked [`PooledBuffer`]s, owns the allocation layout
475/// needed to deallocate them, and is the only place that releases their memory.
476/// A `PooledBuffer` outside the freelist does not carry enough information to
477/// deallocate itself, so it must keep its originating `SizeClass` alive until
478/// it can return to that freelist.
479///
480/// The pool has three buffer states, and those states determine where the
481/// strong size-class references live.
482///
483/// - Global freelist: the buffer is parked in [`SizeClass::global`] and carries
484///   no per-buffer strong reference. While the public pool exists, the
485///   [`SizeClassHandle`] in [`BufferPoolInner::classes`] keeps the class alive.
486/// - Pooled view: the buffer is owned by mutable or immutable I/O view state
487///   and carries one [`SizeClassLease`], which is one strong reference to the
488///   class.
489/// - Thread-local cache: the [`TlsSizeClassCache`] stores the
490///   [`SizeClassToken`] once, and owns one banked strong reference for each
491///   initialized [`TlsSizeClassCacheEntry`]. A banked reference is an owned
492///   `Arc<SizeClass>` reference counted by TLS cache state instead of
493///   represented by a `SizeClassLease` value in each entry. Increasing `len`
494///   banks one reference, decreasing `len` transfers one reference back into a
495///   `SizeClassLease` or releases it to the global freelist.
496///
497/// Moving a buffer from the global freelist to pooled view or TLS state retains
498/// one class reference. Moving it back to the global freelist releases that
499/// reference. Moving between pooled view and TLS state transfers the same
500/// reference without touching the refcount.
501///
502/// Dropping the public [`BufferPool`] drains globally parked buffers, then
503/// drops its `SizeClassHandle`s. Pooled views and non-empty TLS caches may keep
504/// the `SizeClass` alive after that point. Empty TLS caches may still remember
505/// a token value, but with no banked references that token is only an inert
506/// identity value and must not be dereferenced.
507///
508/// This is the one raw pointer shape used by all pool-owned, pooled view, and
509/// thread-local references to a [`SizeClass`]. The pointer is always derived
510/// from [`Arc::into_raw`].
511///
512/// `SizeClassToken` itself owns nothing. It is only an identity token and raw
513/// pointer accepted by the `Arc` refcount APIs:
514/// - [`SizeClassHandle`] pairs a token with ownership of one strong reference.
515/// - [`SizeClassLease`] pairs a token with ownership of one strong reference.
516/// - [`TlsSizeClassCache`] stores a token plus `len` banked strong references.
517///
518/// Because the token is non-owning, it may be stale when held by an empty TLS
519/// cache. Code may dereference it or adjust the strong count only when another
520/// invariant proves the allocation is still live. For example,
521/// [`SizeClassHandle`] and [`SizeClassLease`] prove liveness through owned
522/// strong references, and a non-empty [`TlsSizeClassCache`] proves liveness
523/// through its banked entries.
524#[derive(Clone, Copy, Debug, PartialEq, Eq)]
525struct SizeClassToken {
526    ptr: ptr::NonNull<SizeClass>,
527}
528
529impl SizeClassToken {
530    /// Creates a token and owns the initial strong reference for `class`.
531    ///
532    /// The returned token is non-owning in the type system, but the raw pointer
533    /// still represents one strong reference. The caller must wrap it in an
534    /// owning type, such as [`SizeClassHandle`], or otherwise arrange for that
535    /// strong reference to be released.
536    fn new(class: SizeClass) -> Self {
537        let ptr = Arc::into_raw(Arc::new(class)).cast_mut();
538        // SAFETY: `Arc::into_raw` never returns null.
539        let ptr = unsafe { ptr::NonNull::new_unchecked(ptr) };
540        Self { ptr }
541    }
542
543    /// Returns the referenced size class.
544    ///
545    /// # Safety
546    ///
547    /// Some owner must currently hold a strong reference for this token.
548    #[inline(always)]
549    const unsafe fn as_ref(&self) -> &SizeClass {
550        // SAFETY: guaranteed by the caller.
551        unsafe { self.ptr.as_ref() }
552    }
553
554    /// Retains one strong reference for this token.
555    ///
556    /// # Safety
557    ///
558    /// Some owner must currently hold a strong reference for this token.
559    #[inline(always)]
560    unsafe fn retain(self) {
561        // SAFETY: guaranteed by the caller.
562        unsafe { Arc::increment_strong_count(self.ptr.as_ptr()) };
563    }
564
565    /// Releases one owned strong reference for this token.
566    ///
567    /// # Safety
568    ///
569    /// The caller must own one strong reference represented by this token.
570    #[inline(always)]
571    unsafe fn release(self) {
572        // SAFETY: guaranteed by the caller.
573        unsafe { Arc::decrement_strong_count(self.ptr.as_ptr()) };
574    }
575}
576
577/// Owning pool reference to a size class.
578///
579/// This is the pool's strong `Arc<SizeClass>` reference represented by a
580/// [`SizeClassToken`]. `SizeClassHandle` is the long-lived owner for a class
581/// while the [`BufferPoolInner`] exists. Dropping the handle releases that
582/// pool-owned strong reference. A class may still outlive the handle if pooled
583/// backing values or thread-local cache entries own additional references
584/// through [`SizeClassLease`] or banked TLS refs.
585///
586/// Functionally this is an `Arc<SizeClass>` stored in raw-token form. It exists
587/// to keep the pool-owned reference alive and to provide a live token for
588/// allocation paths that need to retain pooled-backing or TLS-banked
589/// references. The raw form keeps the already-loaded class pointer usable for
590/// explicit refcount operations without calling [`Arc::as_ptr`] or storing a
591/// second token alongside an `Arc`.
592struct SizeClassHandle {
593    token: SizeClassToken,
594}
595
596// SAFETY: `SizeClassHandle` owns a strong reference to a `SizeClass`, which is
597// `Send`.
598unsafe impl Send for SizeClassHandle {}
599// SAFETY: same argument as `Send`, shared access to `SizeClass` is synchronized.
600unsafe impl Sync for SizeClassHandle {}
601
602impl SizeClassHandle {
603    /// Creates a new size class and takes ownership of its initial strong ref.
604    ///
605    /// If `prefill` is true, the global freelist creates `max` buffers upfront
606    /// and makes them immediately available for reuse.
607    fn new(
608        class_id: usize,
609        size: usize,
610        alignment: usize,
611        max: NonZeroU32,
612        parallelism: NonZeroUsize,
613        thread_cache_capacity: usize,
614        prefill: bool,
615    ) -> Self {
616        let layout = Layout::from_size_align(size, alignment).expect("alignment is a power of two");
617        let freelist = Freelist::new(max, parallelism, layout, prefill);
618        let class = SizeClass {
619            class_id,
620            size,
621            global: freelist,
622            thread_cache_capacity,
623        };
624        Self {
625            token: SizeClassToken::new(class),
626        }
627    }
628
629    /// Creates a new tracked buffer and retains this size class for its slot.
630    #[inline(always)]
631    fn try_create(&self, zeroed: bool) -> Option<(u32, PooledBuffer, SizeClassLease)> {
632        let (slot, buffer) = self.global.try_create(zeroed)?;
633        let class = SizeClassLease::retain(self);
634        Some((slot, buffer, class))
635    }
636}
637
638impl Drop for SizeClassHandle {
639    fn drop(&mut self) {
640        // SAFETY: this handle owns one strong reference for `self.token`.
641        unsafe { self.token.release() };
642    }
643}
644
645impl std::ops::Deref for SizeClassHandle {
646    type Target = SizeClass;
647
648    #[inline(always)]
649    fn deref(&self) -> &Self::Target {
650        // SAFETY: this handle owns one strong reference for `self.token`.
651        unsafe { self.token.as_ref() }
652    }
653}
654
655/// Owned size-class reference for a pooled buffer outside the global freelist.
656///
657/// A pooled buffer outside the global freelist must keep its originating
658/// [`SizeClass`] alive so it can be returned after the [`BufferPool`] handle is
659/// dropped. This is one strong `Arc<SizeClass>` reference represented by a
660/// [`SizeClassToken`], with retain and release performed explicitly at the
661/// boundaries where a buffer enters or leaves global pool state.
662///
663/// Lifetime-wise this is the same kind of reference as [`SizeClassHandle`]:
664/// both own exactly one strong reference for a token. The types are separate
665/// because they live in different state machines. `SizeClassHandle` is ordinary
666/// RAII ownership for the pool's class vector. `SizeClassLease` is hot-path
667/// pooled view ownership that must be explicitly transferred into TLS cache
668/// state or returned to the global freelist.
669///
670/// The raw representation matters because the hot path mostly transfers
671/// ownership between pooled view state and this thread's local cache. A real
672/// `Arc<SizeClass>` field is pointer-sized too, but it is a non-`Copy` value
673/// with drop glue. Even when the strong count would not change, moving it
674/// through pooled buffer and cache-entry structs makes the compiler preserve
675/// destructor paths for those structs. `SizeClassLease` has no automatic drop:
676/// moving between pooled view and local-cache state is a plain pointer
677/// transfer, and only explicit calls such as [`Self::return_global`] adjust the
678/// strong count.
679///
680/// A lease must be consumed by one of those explicit transitions, such as
681/// [`Self::into_banked`] or [`Self::return_global`]. Because this type
682/// intentionally has no `Drop` implementation, simply dropping a lease value
683/// would leak the strong reference. This keeps hot transfers free of drop glue,
684/// but means every owner must complete one of the explicit transitions.
685///
686/// Thread-local cache entries do not store a lease per entry. The cache stores
687/// the class token once and owns one banked strong reference for each
688/// initialized entry. Popping from the local cache materializes a lease from
689/// one of those banked references without touching the strong count.
690///
691/// Globally parked buffers do not carry a class reference: taking from the
692/// global freelist retains the class, and returning to the global freelist
693/// releases it.
694#[must_use]
695pub(crate) struct SizeClassLease {
696    token: SizeClassToken,
697}
698
699// SAFETY: `SizeClassLease` owns one strong reference to a `SizeClass`, which is
700// `Send`.
701unsafe impl Send for SizeClassLease {}
702// SAFETY: same argument as `Send`, shared access to `SizeClass` is synchronized.
703unsafe impl Sync for SizeClassLease {}
704
705impl SizeClassLease {
706    /// Converts one banked class reference into a lease.
707    ///
708    /// This does not retain the class. It only changes how an already-owned
709    /// strong reference is represented: from TLS cache state into a
710    /// `SizeClassLease` value.
711    ///
712    /// # Safety
713    ///
714    /// The caller must own one banked strong reference for `class.token`, and
715    /// that retained reference must be transferred to the returned lease. This
716    /// must not consume the pool-owned reference held by `class` itself.
717    #[inline(always)]
718    const unsafe fn from_banked(class: &SizeClassHandle) -> Self {
719        Self { token: class.token }
720    }
721
722    /// Retains `class` for a buffer leaving the global freelist.
723    #[inline(always)]
724    fn retain(class: &SizeClassHandle) -> Self {
725        let token = class.token;
726        // SAFETY: the borrowed `class` owns one strong reference for `token`.
727        unsafe { token.retain() };
728        Self { token }
729    }
730
731    /// Transfers this lease into a TLS cache entry.
732    ///
733    /// This does not release the class. It consumes the lease and relies on the
734    /// caller to record one additional banked reference in TLS cache state,
735    /// normally by storing an entry and increasing the cache length. The cache
736    /// must later materialize or release exactly one lease for that entry.
737    ///
738    /// This is a no-op at runtime, it exists to mark the ownership transition.
739    #[inline(always)]
740    const fn into_banked(self) {}
741
742    /// Returns the referenced size class.
743    ///
744    /// The token is valid because `SizeClassLease` owns one strong reference.
745    #[inline(always)]
746    const fn class(&self) -> &SizeClass {
747        // SAFETY: guaranteed by the ownership invariant documented on
748        // `SizeClassLease`.
749        unsafe { self.token.as_ref() }
750    }
751
752    /// Returns the buffer size for this lease's size class.
753    #[inline(always)]
754    pub(crate) const fn size(&self) -> usize {
755        self.class().size
756    }
757
758    /// Returns a buffer to this class's global freelist and releases the class
759    /// reference.
760    ///
761    /// The buffer is parked before the strong reference is released. If this is
762    /// the last outstanding reference after the public pool has been dropped,
763    /// dropping the `SizeClass` will then drain the just-parked buffer.
764    #[inline(always)]
765    fn return_global(self, slot: u32, buffer: PooledBuffer) {
766        self.class().global.put(slot, buffer);
767        // SAFETY: this lease owns one strong reference.
768        unsafe { self.token.release() };
769    }
770}
771
772/// Free tracked buffer owned by a thread-local size-class cache.
773///
774/// This is allocator cache state, not a caller-visible pooled view. While an
775/// entry is held here, the buffer is owned by the current thread and is not
776/// visible to the class-global freelist.
777///
778/// The `slot` identifies the buffer within its [`SizeClass`]. The enclosing
779/// cache owns one banked size-class reference for this entry. The entry itself
780/// intentionally stores only `(buffer, slot)` so local pop/push does not move a
781/// class pointer per buffer.
struct TlsSizeClassCacheEntry {
    // The cached buffer, owned by the thread-local cache while parked here.
    buffer: PooledBuffer,
    // Identifies `buffer` within its size class; passed back to the global
    // freelist when the entry is spilled or flushed.
    slot: u32,
}
786
787/// Per-thread cache for one size class's tracked buffers.
788///
789/// Each instance is stored in [`TlsSizeClassCaches`] under one global
790/// [`SizeClass::class_id`], so all entries in the cache belong to the same size
791/// class. The cache owns full [`PooledBuffer`] values while they are local,
792/// returning them to the global freelist happens only on miss refill, overflow,
793/// explicit flush, or thread exit.
794///
795/// `class` is a non-owning token for this cache's size class. It can be stale
796/// while `len == 0`, because an empty cache does not keep its pool alive. When
797/// `len > 0`, each initialized entry in `entries[..len]` owns one banked
798/// size-class reference, which keeps the pointed-to class alive. The entry
799/// itself stays small (`buffer, slot`), popping it materializes a
800/// [`SizeClassLease`] from one banked reference.
801///
802/// As described in [`SizeClassToken`], a banked reference is represented by
803/// cache state rather than by a value stored in the entry. Changing `len` is
804/// therefore an ownership transition as well as a stack operation.
805///
806/// An empty cache may keep a stale token only for identity checks. It must not
807/// dereference the token or adjust its strong count until a live
808/// [`SizeClassHandle`], [`SizeClassLease`], or banked entry proves the class is
809/// still alive.
810///
811/// The hot steady-state allocation path pops an entry from `entries`, and the
812/// hot return path pushes one back while there is room.
struct TlsSizeClassCache {
    // Non-owning token for this cache's size class. Only dereferenced or
    // released while `len > 0` (see `Drop` and `push_full`).
    class: SizeClassToken,
    // Entry storage. Only `entries[..len]` is initialized.
    entries: Box<[MaybeUninit<TlsSizeClassCacheEntry>]>,
    // Number of initialized entries, which is also the number of banked
    // size-class references owned by this cache.
    len: usize,
    // Maximum number of entries kept locally; `new` allocates this many slots.
    capacity: usize,
}
819
820impl TlsSizeClassCache {
821    /// Creates a new empty cache with the given maximum thread-cache size.
822    ///
823    /// The cache stores `class` for identity, but starts with `len == 0` and
824    /// therefore owns no banked size-class references.
825    fn new(class: SizeClassToken, capacity: usize) -> Self {
826        let entries = (0..capacity)
827            .map(|_| MaybeUninit::uninit())
828            .collect::<Vec<_>>()
829            .into_boxed_slice();
830        Self {
831            class,
832            entries,
833            len: 0,
834            capacity,
835        }
836    }
837
838    /// Removes and returns one reusable buffer entry.
839    ///
840    /// Local hits are served directly from the cache. On a local miss, small
841    /// caches take only the buffer being returned to the caller. Larger caches
842    /// batch-take from the global freelist, return the first claimed buffer,
843    /// and retain the rest locally for future allocations.
844    ///
845    /// The returned lease is materialized from the banked size-class reference
846    /// associated with the returned entry.
847    #[inline(always)]
848    fn pop(&mut self, class: &SizeClassHandle) -> Option<(TlsSizeClassCacheEntry, SizeClassLease)> {
849        if let Some(entry) = self.pop_local() {
850            // SAFETY: the popped entry consumed one banked reference owned by
851            // this cache. Transfer that reference to the returned lease.
852            let lease = unsafe { SizeClassLease::from_banked(class) };
853            return Some((entry, lease));
854        }
855
856        // Take from the class-global freelist on a local miss.
857        self.pop_global(class)
858    }
859
860    /// Removes and returns one entry from this thread's local stack.
861    ///
862    /// This touches only thread-local cache state. A returned entry consumes
863    /// one banked size-class reference from this cache, the caller must
864    /// materialize or release that reference.
865    #[inline(always)]
866    fn pop_local(&mut self) -> Option<TlsSizeClassCacheEntry> {
867        if self.len == 0 {
868            return None;
869        }
870
871        self.len -= 1;
872        // SAFETY: entries in `0..self.len` are initialized. Decrementing `len`
873        // above makes this slot uninitialized again.
874        Some(unsafe { self.entries.get_unchecked(self.len).assume_init_read() })
875    }
876
877    /// Takes from the class-global freelist after the local stack misses.
878    ///
879    /// Every claimed global entry gets one retained class reference. The first
880    /// claimed entry is returned with that reference materialized as a
881    /// [`SizeClassLease`], additional claimed entries are parked in this cache
882    /// and counted by `len`.
883    ///
884    /// This is separate from [`Self::pop`] so the steady-state allocation hot
885    /// path can inline only the local cache hit. We annotate with `inline(never)`
886    /// to keep the refill and batching code out of `BufferPoolInner::try_alloc`,
887    /// reducing hot-path code size and register pressure.
888    #[inline(never)]
889    fn pop_global(
890        &mut self,
891        class: &SizeClassHandle,
892    ) -> Option<(TlsSizeClassCacheEntry, SizeClassLease)> {
893        // Tiny caches do not batch enough to justify the wider global claim.
894        // Keep their miss path equivalent to a single take.
895        if self.capacity < MIN_TLS_BATCH_CAPACITY {
896            return class.global.take().map(|(slot, buffer)| {
897                let lease = SizeClassLease::retain(class);
898                (TlsSizeClassCacheEntry { buffer, slot }, lease)
899            });
900        }
901
902        // Refill larger caches to half capacity. That leaves room for future
903        // same-thread returns while still amortizing the global atomic scan
904        // over several future local pops.
905        let mut entry = None;
906        let take = self.capacity / 2;
907        class.global.take_batch(take, |slot, buffer| {
908            // Each claimed global entry becomes either the returned lease or a
909            // local cache entry, so each needs one retained class reference.
910            let cache_entry = TlsSizeClassCacheEntry { buffer, slot };
911            let lease = SizeClassLease::retain(class);
912            if entry.is_none() {
913                // Hand the first claimed buffer to the allocation that missed
914                // locally. Additional claimed buffers refill the local cache.
915                entry = Some((cache_entry, lease));
916            } else {
917                // The take count is derived from the target occupancy, so
918                // refill cannot overflow the local cache. Push directly to
919                // avoid the spill checks used by return-to-cache.
920                self.push_local(cache_entry, lease);
921            }
922        });
923
924        entry
925    }
926
927    /// Pushes an entry into the local cache, spilling to global if full.
928    ///
929    /// Small local caches prioritize same-thread locality and route overflow
930    /// directly to the global freelist. Once the local cache is large enough to
931    /// batch effectively, half the entries are drained to amortize global queue
932    /// traffic across future returns.
933    #[inline(always)]
934    fn push(&mut self, lease: SizeClassLease, slot: u32, buffer: PooledBuffer) {
935        let entry = TlsSizeClassCacheEntry { buffer, slot };
936
937        if self.len < self.capacity {
938            // Keep the returned entry local while there is room.
939            self.push_local(entry, lease);
940            return;
941        }
942
943        // Handle overflow when the local stack is full.
944        self.push_full(entry, lease);
945    }
946
947    /// Pushes one entry onto this thread's local stack.
948    ///
949    /// The caller must ensure the stack has room. `lease` becomes the banked
950    /// size-class reference represented by `entry`.
951    #[inline(always)]
952    fn push_local(&mut self, entry: TlsSizeClassCacheEntry, lease: SizeClassLease) {
953        lease.into_banked();
954
955        // SAFETY: the caller ensured `self.len < self.capacity`, so this slot
956        // is in bounds and currently uninitialized.
957        unsafe {
958            self.entries.get_unchecked_mut(self.len).write(entry);
959        }
960        self.len += 1;
961    }
962
963    /// Handles a push after the local stack fills.
964    ///
965    /// Very small caches return the incoming entry directly to the global
966    /// freelist. Larger caches spill older local entries in a batch, then keep
967    /// the incoming entry local so the dropping thread retains the freshest
968    /// buffer.
969    ///
970    /// This is separate from [`Self::push`] so the steady-state return hot path
971    /// can inline only the local cache push. We annotate with `inline(never)`
972    /// to keep the spill and batching code out of pooled buffer drop when the
973    /// local cache has room.
974    #[inline(never)]
975    fn push_full(&mut self, entry: TlsSizeClassCacheEntry, lease: SizeClassLease) {
976        // Very small caches cannot spill enough entries to amortize a batch
977        // insert, so overflow goes straight to the global freelist.
978        if self.capacity < MIN_TLS_BATCH_CAPACITY {
979            lease.return_global(entry.slot, entry.buffer);
980            return;
981        }
982
983        // Spill half the cache to global to make room.
984        let spill = self.len.min(self.capacity / 2).max(1);
985        let end = self.len;
986        let start = end - spill;
987        // Stop tracking slots before moving them out.
988        self.len = start;
989
990        lease
991            .class()
992            .global
993            .put_batch((start..end).rev().map(|index| {
994                // SAFETY: `start..end` was initialized before `len` was lowered
995                // to `start`. Reading each slot moves it out and leaves the
996                // slot uninitialized.
997                let entry = unsafe { self.entries.as_mut_ptr().add(index).read().assume_init() };
998                // SAFETY: this drained entry carried one banked reference. The
999                // incoming class lease keeps the size class live while
1000                // `put_batch` parks the spilled entries.
1001                unsafe { self.class.release() };
1002                (entry.slot, entry.buffer)
1003            }));
1004
1005        // Keep the incoming entry local after making room.
1006        self.push_local(entry, lease);
1007    }
1008}
1009
1010impl Drop for TlsSizeClassCache {
1011    fn drop(&mut self) {
1012        if self.len == 0 {
1013            return;
1014        }
1015
1016        let entries = self.entries.as_mut_ptr();
1017        // SAFETY: each initialized entry carries one banked class reference out
1018        // of this cache. Because `self.len > 0`, those references keep `class`
1019        // live while the entries are parked.
1020        unsafe { self.class.as_ref() }
1021            .global
1022            .put_batch((0..self.len).rev().map(move |index| {
1023                // SAFETY: `0..self.len` is initialized. Reading each slot moves
1024                // it out and leaves the slot uninitialized.
1025                let entry = unsafe { entries.add(index).read().assume_init() };
1026                (entry.slot, entry.buffer)
1027            }));
1028
1029        for _ in 0..self.len {
1030            // SAFETY: each drained entry was returned to the global freelist,
1031            // so its banked reference can be released.
1032            unsafe { self.class.release() };
1033        }
1034    }
1035}
1036
1037/// Registry of one thread's per-size-class caches.
1038///
1039/// A [`BufferPool`] keeps its size classes in a vector, so allocation resolves
1040/// a request to an index within that pool. Thread-local caches need a different
1041/// key because a thread can use more than one pool. They use the process-global
1042/// [`SizeClass::class_id`] assigned by [`NEXT_SIZE_CLASS_ID`], so index `0` in
1043/// one pool cannot collide with index `0` in another pool.
1044///
1045/// The registry is a sparse vector indexed by `class_id`. Each initialized
1046/// entry is a [`TlsSizeClassCache`] for that global size class. Missing entries
1047/// mean this thread has not used that size class yet. Holes can remain for the
1048/// lifetime of the thread because class ids are monotonic and never reused.
1049/// Empty initialized caches can also remain after their pool has been dropped.
1050/// Their class token is inert while the cache is empty. If the class is still
1051/// live because a pooled buffer is outstanding, a later return of that buffer to
1052/// this same thread can bank a fresh reference and make the cache usable again.
1053///
1054/// We intentionally use `Vec<Option<...>>` because class ids are dense enough
1055/// for direct indexing to be cheaper than hashing, but a thread may initialize
1056/// only a subset of live size classes. This keeps the TLS-hit path to a bounds
1057/// check and an initialized-entry check, with no synchronization.
struct TlsSizeClassCaches {
    // Sparse table indexed by `class_id`. `None` marks a class id this thread
    // has no cache for; the vector grows on demand in `init`.
    bins: Vec<Option<TlsSizeClassCache>>,
}
1061
1062impl TlsSizeClassCaches {
1063    /// Creates an empty registry.
1064    const fn new() -> Self {
1065        Self { bins: Vec::new() }
1066    }
1067
1068    /// Returns the cache for the given class, creating it lazily on first use.
1069    ///
1070    /// The caller must provide a live token from a [`SizeClassHandle`] or
1071    /// [`SizeClassLease`]. A missing cache copies that token for identity only
1072    /// and starts with no banked references. The first local push or global
1073    /// refill is what banks references for entries in that cache.
1074    #[inline(always)]
1075    fn get_or_init(
1076        &mut self,
1077        class: SizeClassToken,
1078        class_id: usize,
1079        capacity: usize,
1080    ) -> &mut TlsSizeClassCache {
1081        if class_id < self.bins.len() && self.bins[class_id].is_some() {
1082            return self.bins[class_id]
1083                .as_mut()
1084                .expect("class cache was checked as initialized");
1085        }
1086
1087        self.init(class, class_id, capacity)
1088    }
1089
1090    /// Initializes and returns the cache for `class_id`.
1091    ///
1092    /// This is separate from [`Self::get_or_init`] so the steady-state TLS hit
1093    /// can inline only the existing-cache lookup. We annotate with
1094    /// `inline(never)` to keep the resize and allocation path out of pooled
1095    /// allocation and drop.
1096    #[inline(never)]
1097    fn init(
1098        &mut self,
1099        class: SizeClassToken,
1100        class_id: usize,
1101        capacity: usize,
1102    ) -> &mut TlsSizeClassCache {
1103        if class_id >= self.bins.len() {
1104            self.bins.resize_with(class_id + 1, || None);
1105        }
1106        self.bins[class_id].get_or_insert_with(|| TlsSizeClassCache::new(class, capacity))
1107    }
1108
1109    /// Returns an initialized cache without creating a missing one.
1110    #[inline(always)]
1111    fn get(&mut self, class_id: usize) -> Option<&mut TlsSizeClassCache> {
1112        self.bins.get_mut(class_id).and_then(Option::as_mut)
1113    }
1114}
1115
1116impl Drop for TlsSizeClassCaches {
1117    fn drop(&mut self) {
1118        let this = self as *mut Self;
1119        BufferPoolThreadCache::TLS_SIZE_CLASS_CACHES_FAST.with(|fast| {
1120            if fast.get() == this {
1121                fast.set(ptr::null_mut());
1122            }
1123        });
1124    }
1125}
1126
/// Access to the calling thread's local [`BufferPool`] caches.
///
/// This is a unit type: it carries no data and only namespaces the
/// thread-local keys and the associated functions that operate on them.
///
/// This type hides the TLS layout used by pooled allocation and return. The
/// main TLS key owns the registry. It has a destructor, so thread exit drops
/// the registry and each `TlsSizeClassCache` flushes its remaining entries to
/// the class-global freelist.
///
/// Steady-state allocation and return first read `TLS_SIZE_CLASS_CACHES_FAST`.
/// If it points at this thread's registry and the requested class cache is
/// initialized, the caller touches only thread-local memory. Missing TLS state
/// routes through `cache_slow` or `push_slow`, which access the owning TLS key,
/// install the fast pointer, and lazily initialize the class cache.
///
/// Rust's access path for TLS values with destructors includes checks for
/// access during or after destruction. Those checks are correct, but they are
/// expensive on the hot pooled allocation/drop path. After first checked
/// access, we cache a raw pointer to the same registry in a destructor-free TLS
/// key and use that pointer for steady-state access.
///
/// If the checked key is unavailable during thread-local destruction, cache
/// access returns `None` and callers use the class-global freelist instead.
pub struct BufferPoolThreadCache;
1149
1150impl BufferPoolThreadCache {
1151    thread_local! {
1152        // Owns this thread's cache registry and drops it during thread exit.
1153        static TLS_SIZE_CLASS_CACHES: UnsafeCell<TlsSizeClassCaches> =
1154            const { UnsafeCell::new(TlsSizeClassCaches::new()) };
1155
1156        // Performance-only pointer to the same registry. This key has no
1157        // destructor, so the hot allocation/drop path avoids Rust's
1158        // destructor-aware access path for `TLS_SIZE_CLASS_CACHES`.
1159        static TLS_SIZE_CLASS_CACHES_FAST: Cell<*mut TlsSizeClassCaches> =
1160            const { Cell::new(ptr::null_mut()) };
1161    }
1162
1163    /// Flushes all local caches for the current thread into the global freelists.
1164    pub fn flush() {
1165        // If the owning TLS registry is unavailable during thread exit, this
1166        // is a no-op. The registry's own drop path will flush any remaining
1167        // entries.
1168        let _ = Self::TLS_SIZE_CLASS_CACHES.try_with(|caches| {
1169            // SAFETY: this TLS value is only ever accessed by the current thread.
1170            let caches = unsafe { &mut *caches.get() };
1171            for cache in caches.bins.iter_mut() {
1172                let _ = cache.take();
1173            }
1174        });
1175    }
1176
1177    /// Returns a buffer to the current thread's local cache for the given
1178    /// size class, spilling to the global freelist if the cache is full.
1179    ///
1180    /// The hot path uses only an already-initialized cache from the fast TLS
1181    /// pointer. If the fast pointer is missing, or this thread has not
1182    /// initialized the size class yet, [`Self::push_slow`] performs the checked
1183    /// TLS access and creates the local cache. The returned lease is live proof
1184    /// that initialization is safe. During thread-local teardown, checked TLS
1185    /// access can fail, in that case the buffer falls back to the global
1186    /// freelist.
1187    #[inline(always)]
1188    pub(super) fn push(lease: SizeClassLease, slot: u32, buffer: PooledBuffer) {
1189        let class = lease.class();
1190        if class.thread_cache_capacity == 0 {
1191            lease.return_global(slot, buffer);
1192            return;
1193        }
1194
1195        let caches = Self::TLS_SIZE_CLASS_CACHES_FAST.with(|fast| fast.get());
1196        if !caches.is_null() {
1197            // SAFETY: the fast pointer is set only from this thread's
1198            // `TLS_SIZE_CLASS_CACHES` value and cleared before that value
1199            // drops.
1200            if let Some(cache) = unsafe { (&mut *caches).get(class.class_id) } {
1201                cache.push(lease, slot, buffer);
1202                return;
1203            }
1204        }
1205
1206        Self::push_slow(lease, slot, buffer);
1207    }
1208
1209    /// Returns a buffer to the current thread's local cache after the fast
1210    /// lookup misses.
1211    ///
1212    /// This is called when the fast TLS pointer is not initialized, or when
1213    /// that pointer exists but this size class has no local cache yet. It
1214    /// installs the fast TLS pointer after successfully accessing the owning
1215    /// TLS key, then initializes the size-class cache if needed.
1216    ///
1217    /// This is separate from [`Self::push`] so the steady-state return hot path
1218    /// only contains the initialized-cache lookup and local push.
1219    #[inline(never)]
1220    fn push_slow(lease: SizeClassLease, slot: u32, buffer: PooledBuffer) {
1221        // Returning a pooled buffer can happen from arbitrary Drop code,
1222        // including during thread-local destruction. If the local cache is
1223        // unavailable, fall back to the global freelist instead of panicking.
1224        match Self::TLS_SIZE_CLASS_CACHES
1225            .try_with(|caches| {
1226                let class = lease.class();
1227                let caches = caches.get();
1228
1229                // Publish the checked owner TLS address to the fast key.
1230                Self::TLS_SIZE_CLASS_CACHES_FAST.with(|fast| fast.set(caches));
1231
1232                // SAFETY: this TLS value is only ever accessed by the current thread.
1233                ptr::NonNull::from(unsafe {
1234                    (&mut *caches).get_or_init(
1235                        lease.token,
1236                        class.class_id,
1237                        class.thread_cache_capacity,
1238                    )
1239                })
1240            })
1241            .ok()
1242        {
1243            Some(mut cache) => {
1244                // SAFETY: `cache` points to this thread's initialized TLS cache.
1245                unsafe { cache.as_mut().push(lease, slot, buffer) };
1246            }
1247            None => lease.return_global(slot, buffer),
1248        }
1249    }
1250
1251    /// Takes a buffer from the current thread's local cache for the given
1252    /// size class, refilling from the global freelist if the cache is empty.
1253    ///
1254    /// The hot path uses only an already-initialized cache from the fast TLS
1255    /// pointer. On a local miss, the global freelist is queried once. The first
1256    /// claimed buffer is returned to the caller, and any additional claimed
1257    /// buffers are appended directly to the local cache.
1258    #[inline(always)]
1259    fn pop(class: &SizeClassHandle) -> Option<(PooledBuffer, SizeClassLease, u32)> {
1260        if class.thread_cache_capacity == 0 {
1261            return class
1262                .global
1263                .take()
1264                .map(|(slot, buffer)| (buffer, SizeClassLease::retain(class), slot));
1265        }
1266
1267        let caches = Self::TLS_SIZE_CLASS_CACHES_FAST.with(|fast| fast.get());
1268        if !caches.is_null() {
1269            // SAFETY: the fast pointer is set only from this thread's
1270            // `TLS_SIZE_CLASS_CACHES` value and cleared before that value
1271            // drops.
1272            if let Some(cache) = unsafe { (&mut *caches).get(class.class_id) } {
1273                return cache
1274                    .pop(class)
1275                    .map(|(entry, lease)| (entry.buffer, lease, entry.slot));
1276            }
1277        }
1278
1279        // Resolve the cache and fall back to the global freelist if
1280        // unavailable.
1281        let Some(mut cache) = Self::cache_slow(class) else {
1282            return class
1283                .global
1284                .take()
1285                .map(|(slot, buffer)| (buffer, SizeClassLease::retain(class), slot));
1286        };
1287
1288        // SAFETY: `cache` points to this thread's initialized TLS cache.
1289        unsafe { cache.as_mut() }
1290            .pop(class)
1291            .map(|(entry, lease)| (entry.buffer, lease, entry.slot))
1292    }
1293
1294    /// Resolves the local cache after the fast TLS or class-cache lookup
1295    /// misses.
1296    ///
1297    /// This is called when the fast TLS pointer is not initialized, or when
1298    /// that pointer exists but this size class has no local cache yet. It
1299    /// installs the fast TLS pointer after successfully accessing the owning
1300    /// TLS key, then initializes the size-class cache if needed.
1301    #[inline(never)]
1302    fn cache_slow(class: &SizeClassHandle) -> Option<ptr::NonNull<TlsSizeClassCache>> {
1303        // Allocation can happen from caller-owned TLS destructors during thread
1304        // teardown. Return `None` instead of panicking if the owning TLS key is
1305        // unavailable.
1306        Self::TLS_SIZE_CLASS_CACHES
1307            .try_with(|caches| {
1308                let caches = caches.get();
1309
1310                // Publish the checked owner TLS address to the fast key.
1311                Self::TLS_SIZE_CLASS_CACHES_FAST.with(|fast| fast.set(caches));
1312
1313                // SAFETY: this TLS value is only ever accessed by the current thread.
1314                ptr::NonNull::from(unsafe {
1315                    (&mut *caches).get_or_init(
1316                        class.token,
1317                        class.class_id,
1318                        class.thread_cache_capacity,
1319                    )
1320                })
1321            })
1322            .ok()
1323    }
1324}
1325
/// Internal allocation result for pooled allocations.
///
/// Produced by `BufferPoolInner::try_alloc` for both reused and newly created
/// tracked buffers.
struct Allocation {
    // The tracked buffer handed to the caller.
    buffer: PooledBuffer,
    // `true` when the buffer was just created, `false` when it was reused
    // from a thread-local cache or the global freelist.
    is_new: bool,
    // Size-class lease that travels with `buffer`.
    lease: SizeClassLease,
    // Slot of `buffer` within its size class.
    slot: u32,
}
1333
/// Internal state of the buffer pool.
pub(crate) struct BufferPoolInner {
    // Configuration the pool was created with.
    config: BufferPoolConfig,
    // One handle per size class, indexed by the pool-local class index.
    classes: Vec<SizeClassHandle>,
    // Pool metrics; `try_alloc_new` updates the per-size-class `created` and
    // `exhausted_total` counters.
    metrics: PoolMetrics,
}
1340
1341impl Drop for BufferPoolInner {
1342    fn drop(&mut self) {
1343        // The public pool is going away. Drain globally parked buffers while
1344        // the pool-owned class handles are still live. Pooled views and live
1345        // TLS cache entries own their own size-class references, if they return
1346        // later, they will park their buffer and release the reference that kept
1347        // the class alive.
1348        for class in &self.classes {
1349            class.global.drain();
1350        }
1351    }
1352}
1353
1354impl BufferPoolInner {
1355    /// Try to allocate a buffer from the given size class.
1356    ///
1357    /// Uses a three-tier strategy:
1358    /// 1. **Thread-local cache** (fast path): no atomics, no contention.
1359    /// 2. **Global freelist**: atomic pop, then batch-refill the local cache
1360    ///    when the local bin is large enough to amortize shared-queue traffic.
1361    /// 3. **New allocation**: reserve a slot in the global freelist, then
1362    ///    allocate from the heap.
1363    ///
1364    /// If `zero_on_new` is true, newly-created buffers are allocated with
1365    /// `alloc_zeroed`. Reused buffers are never re-zeroed here.
1366    #[inline(always)]
1367    fn try_alloc(&self, class_index: usize, zero_on_new: bool) -> Option<Allocation> {
1368        let class = &self.classes[class_index];
1369
1370        // Reuse path: try the thread-local cache first, then the global
1371        // freelist with batch refill when the local cache is large enough.
1372        if let Some((buffer, lease, slot)) = BufferPoolThreadCache::pop(class) {
1373            return Some(Allocation {
1374                buffer,
1375                is_new: false,
1376                lease,
1377                slot,
1378            });
1379        }
1380
1381        // Slow path: create a new tracked buffer and update metrics.
1382        self.try_alloc_new(class, zero_on_new)
1383    }
1384
1385    /// Creates a new tracked buffer after the reuse path fails.
1386    ///
1387    /// This is separate from [`Self::try_alloc`] so the steady-state allocation
1388    /// path can inline the TLS hit without also carrying slot reservation,
1389    /// metrics, and heap-allocation code.
1390    #[inline(never)]
1391    fn try_alloc_new(&self, class: &SizeClassHandle, zeroed: bool) -> Option<Allocation> {
1392        let label = SizeClassLabel {
1393            size_class: class.size as u64,
1394        };
1395        let Some((slot, buffer, lease)) = class.try_create(zeroed) else {
1396            self.metrics.exhausted_total.get_or_create(&label).inc();
1397            return None;
1398        };
1399
1400        self.metrics.created.get_or_create(&label).inc();
1401        Some(Allocation {
1402            buffer,
1403            is_new: true,
1404            lease,
1405            slot,
1406        })
1407    }
1408}
1409
1410/// A pool of reusable, aligned buffers.
1411///
1412/// Buffers are organized into power-of-two size classes. When a buffer is
1413/// requested, the smallest size class that fits is used. Pooled buffers are
1414/// automatically returned when their final owning view is dropped.
1415///
1416/// # Alignment
1417///
1418/// Buffer alignment is guaranteed only at the base pointer (when `cursor == 0`).
1419/// After calling [`bytes::Buf::advance`], the pointer returned by `as_mut_ptr()` may
1420/// no longer be aligned. For direct I/O operations that require alignment,
1421/// do not advance the buffer before use.
#[derive(Clone)]
pub struct BufferPool {
    // Shared pool state. Cloning the handle clones the `Arc`, so all clones
    // refer to the same `BufferPoolInner`.
    inner: Arc<BufferPoolInner>,
}
1426
1427impl std::fmt::Debug for BufferPool {
1428    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1429        f.debug_struct("BufferPool")
1430            .field("config", &self.inner.config)
1431            .field("num_classes", &self.inner.classes.len())
1432            .finish()
1433    }
1434}
1435
/// Process-wide source of [`SizeClass::class_id`] values.
///
/// `class_id` is the key used by [`TlsSizeClassCaches`]. The counter has to be
/// global rather than pool-local because one thread-local registry serves
/// every [`BufferPool`] a thread touches: with pool-local ids, two different
/// pools could hand out the same class index and accidentally share a single
/// local cache.
///
/// Ids are monotonic and never reused. Reuse would make stale per-thread cache
/// state ambiguous once a pool is dropped and a later pool creates a size
/// class with the same id. Never reusing ids lets the hot path index directly
/// without generation checks, at the cost of possible holes in each thread's
/// sparse registry.
///
/// Relaxed ordering is sufficient: the atomic operation only hands out unique
/// ids and does not publish any associated size-class state.
static NEXT_SIZE_CLASS_ID: AtomicUsize = AtomicUsize::new(0);
1452
1453impl BufferPool {
1454    /// Creates a new buffer pool with the given configuration.
1455    ///
1456    /// # Panics
1457    ///
1458    /// Panics if the configuration is invalid.
1459    pub(crate) fn new(config: BufferPoolConfig, registry: &mut impl Register) -> Self {
1460        config.validate();
1461        let metrics = PoolMetrics::new(registry);
1462        let num_classes =
1463            BufferPoolConfig::num_classes(config.min_size.get(), config.max_size.get());
1464        let mut classes = Vec::with_capacity(num_classes);
1465        let thread_cache_capacity = config.resolve_thread_cache_capacity();
1466        for i in 0..num_classes {
1467            let size = BufferPoolConfig::class_size(config.min_size.get(), i);
1468            let class_id = NEXT_SIZE_CLASS_ID.fetch_add(1, Ordering::Relaxed);
1469            let class = SizeClassHandle::new(
1470                class_id,
1471                size,
1472                config.alignment.get(),
1473                config.max_per_class,
1474                config.parallelism,
1475                thread_cache_capacity,
1476                config.prefill,
1477            );
1478            classes.push(class);
1479        }
1480
1481        // Initialize created metrics after constructor prefill.
1482        if config.prefill {
1483            for class in &classes {
1484                let label = SizeClassLabel {
1485                    size_class: class.size as u64,
1486                };
1487                metrics
1488                    .created
1489                    .get_or_create(&label)
1490                    .set(config.max_per_class.get() as i64);
1491            }
1492        }
1493
1494        Self {
1495            inner: Arc::new(BufferPoolInner {
1496                config,
1497                classes,
1498                metrics,
1499            }),
1500        }
1501    }
1502
1503    /// Returns the size class index for a given size, or `None` if `size > max_size`.
1504    #[inline(always)]
1505    fn class_index(&self, size: usize) -> Option<usize> {
1506        let min_size = self.inner.config.min_size.get();
1507        let max_size = self.inner.config.max_size.get();
1508        if size > max_size {
1509            return None;
1510        }
1511        if size <= min_size {
1512            return Some(0);
1513        }
1514
1515        // Pool construction guarantees `min_size` and `max_size` are powers of
1516        // two. Since `min_size < size <= max_size`, `next_power_of_two()`
1517        // resolves to a valid class and its exponent must be greater than
1518        // `min_size`'s exponent. Use wrapping arithmetic to avoid a release
1519        // overflow-check branch in this hot helper.
1520        Some(
1521            size.next_power_of_two()
1522                .trailing_zeros()
1523                .wrapping_sub(min_size.trailing_zeros()) as usize,
1524        )
1525    }
1526
1527    /// Returns the size class index for `capacity`, recording oversized metrics on failure.
1528    #[inline]
1529    fn class_index_or_record_oversized(&self, capacity: usize) -> Option<usize> {
1530        let class_index = self.class_index(capacity);
1531        if class_index.is_none() {
1532            self.inner.metrics.oversized_total.inc();
1533        }
1534        class_index
1535    }
1536
1537    /// Attempts to allocate a buffer without falling back on pool miss.
1538    ///
1539    /// Unlike [`Self::alloc`], this method does not fall back to untracked
1540    /// allocation on exhaustion or oversized requests. Requests smaller than
1541    /// [`BufferPoolConfig::pool_min_size`] intentionally bypass pooling and
1542    /// return an untracked aligned allocation instead.
1543    ///
1544    /// The returned buffer has `len() == 0` and `capacity() >= capacity`.
1545    ///
1546    /// # Initialization
1547    ///
1548    /// The returned buffer contains **uninitialized memory**. Do not read from
1549    /// it until data has been written.
1550    ///
1551    /// # Errors
1552    ///
1553    /// - [`PoolError::Oversized`]: `capacity` exceeds `max_size`
1554    /// - [`PoolError::Exhausted`]: pool exhausted for the required size class
1555    #[inline(always)]
1556    pub fn try_alloc(&self, capacity: usize) -> Result<IoBufMut, PoolError> {
1557        if capacity < self.inner.config.pool_min_size {
1558            let size = capacity.max(1);
1559            return Ok(IoBufMut::with_alignment(size, self.inner.config.alignment));
1560        }
1561
1562        let class_index = self
1563            .class_index_or_record_oversized(capacity)
1564            .ok_or(PoolError::Oversized)?;
1565
1566        let buffer = self
1567            .inner
1568            .try_alloc(class_index, false)
1569            .map(|allocation| {
1570                PooledBufMut::new(allocation.buffer, allocation.lease, allocation.slot)
1571            })
1572            .ok_or(PoolError::Exhausted)?;
1573        Ok(IoBufMut::from_pooled(buffer))
1574    }
1575
1576    /// Allocates a buffer with capacity for at least `capacity` bytes.
1577    ///
1578    /// The returned buffer has `len() == 0` and `capacity() >= capacity`,
1579    /// matching the semantics of [`IoBufMut::with_capacity`] and
1580    /// [`bytes::BytesMut::with_capacity`]. Use [`bytes::BufMut::put_slice`] or
1581    /// other [`bytes::BufMut`] methods to write data to the buffer.
1582    ///
1583    /// If the pool can provide a buffer (capacity within limits and pool not
1584    /// exhausted), this returns a pooled buffer that will be returned to the
1585    /// pool when dropped. Requests smaller than [`BufferPoolConfig::pool_min_size`]
1586    /// bypass pooling and return an untracked aligned allocation. Oversized or
1587    /// exhausted requests also fall back to an untracked aligned heap allocation
1588    /// that is deallocated when dropped.
1589    ///
1590    /// Use [`Self::try_alloc`] if eligible requests must fail instead of falling
1591    /// back to direct allocation.
1592    ///
1593    /// # Initialization
1594    ///
1595    /// The returned buffer contains **uninitialized memory**. Do not read from
1596    /// it until data has been written.
1597    pub fn alloc(&self, capacity: usize) -> IoBufMut {
1598        self.try_alloc(capacity).unwrap_or_else(|_| {
1599            let size = capacity.max(self.inner.config.min_size.get());
1600            IoBufMut::with_alignment(size, self.inner.config.alignment)
1601        })
1602    }
1603
1604    /// Allocates a buffer and sets its readable length to `len` without
1605    /// initializing bytes.
1606    ///
1607    /// Equivalent to [`Self::alloc`] followed by [`IoBufMut::set_len`].
1608    ///
1609    /// # Safety
1610    ///
1611    /// Caller must ensure all bytes are initialized before any read operation.
1612    pub unsafe fn alloc_len(&self, len: usize) -> IoBufMut {
1613        let mut buf = self.alloc(len);
1614        // SAFETY: guaranteed by caller.
1615        unsafe { buf.set_len(len) };
1616        buf
1617    }
1618
1619    /// Attempts to allocate a zero-initialized buffer without falling back on
1620    /// pool miss.
1621    ///
1622    /// Unlike [`Self::alloc_zeroed`], this method does not fall back to
1623    /// untracked allocation on exhaustion or oversized requests. Requests
1624    /// smaller than [`BufferPoolConfig::pool_min_size`] intentionally bypass
1625    /// pooling and return an untracked aligned allocation instead.
1626    ///
1627    /// The returned buffer has `len() == len` and `capacity() >= len`.
1628    ///
1629    /// # Initialization
1630    ///
1631    /// Bytes in `0..len` are initialized to zero. Bytes in `len..capacity`
1632    /// may be uninitialized.
1633    ///
1634    /// # Errors
1635    ///
1636    /// - [`PoolError::Oversized`]: `len` exceeds `max_size`
1637    /// - [`PoolError::Exhausted`]: pool exhausted for the required size class
1638    pub fn try_alloc_zeroed(&self, len: usize) -> Result<IoBufMut, PoolError> {
1639        if len < self.inner.config.pool_min_size {
1640            let size = len.max(1);
1641            let mut buf = IoBufMut::zeroed_with_alignment(size, self.inner.config.alignment);
1642            buf.truncate(len);
1643            return Ok(buf);
1644        }
1645
1646        let class_index = self
1647            .class_index_or_record_oversized(len)
1648            .ok_or(PoolError::Oversized)?;
1649        let allocation = self
1650            .inner
1651            .try_alloc(class_index, true)
1652            .ok_or(PoolError::Exhausted)?;
1653
1654        let mut buf = IoBufMut::from_pooled(PooledBufMut::new(
1655            allocation.buffer,
1656            allocation.lease,
1657            allocation.slot,
1658        ));
1659        if allocation.is_new {
1660            // SAFETY: buffer was allocated with alloc_zeroed, so bytes in 0..len are initialized.
1661            unsafe { buf.set_len(len) };
1662        } else {
1663            // Reused buffers may contain old bytes, re-zero requested readable range.
1664            // SAFETY: `as_mut_ptr()` is valid for writes up to `capacity() >= len` bytes.
1665            unsafe {
1666                std::ptr::write_bytes(buf.as_mut_ptr(), 0, len);
1667                buf.set_len(len);
1668            }
1669        }
1670        Ok(buf)
1671    }
1672
1673    /// Allocates a zero-initialized buffer with readable length `len`.
1674    ///
1675    /// The returned buffer has `len() == len` and `capacity() >= len`.
1676    ///
1677    /// If the pool can provide a buffer (len within limits and pool not
1678    /// exhausted), this returns a pooled buffer that will be returned to the
1679    /// pool when dropped. Requests smaller than [`BufferPoolConfig::pool_min_size`]
1680    /// bypass pooling and return an untracked aligned allocation. Oversized or
1681    /// exhausted requests also fall back to an untracked aligned heap allocation
1682    /// that is deallocated when dropped.
1683    ///
1684    /// Use this for read APIs that require an initialized `&mut [u8]`. This
1685    /// avoids `unsafe set_len` at callsites.
1686    ///
1687    /// Use [`Self::try_alloc_zeroed`] if eligible requests must fail instead of
1688    /// falling back to direct allocation.
1689    ///
1690    /// # Initialization
1691    ///
1692    /// Bytes in `0..len` are initialized to zero. Bytes in `len..capacity`
1693    /// may be uninitialized.
1694    pub fn alloc_zeroed(&self, len: usize) -> IoBufMut {
1695        self.try_alloc_zeroed(len).unwrap_or_else(|_| {
1696            // Pool exhausted or oversized: allocate untracked zeroed memory.
1697            let size = len.max(self.inner.config.min_size.get());
1698            let mut buf = IoBufMut::zeroed_with_alignment(size, self.inner.config.alignment);
1699            buf.truncate(len);
1700            buf
1701        })
1702    }
1703
    /// Returns the configuration this pool was constructed with.
    ///
    /// The reference borrows from the pool's shared inner state.
    pub fn config(&self) -> &BufferPoolConfig {
        &self.inner.config
    }
1708}
1709
1710#[cfg(test)]
1711mod tests {
1712    use super::*;
1713    use crate::{
1714        iobuf::{cache_line_size, freelist, IoBuf},
1715        telemetry::metrics::Registry,
1716    };
1717    use bytes::{Buf, BufMut};
1718    use commonware_utils::NZU32;
1719    use std::{
1720        sync::{mpsc, Arc},
1721        thread,
1722    };
1723
1724    fn test_size_class(size: usize, alignment: usize) -> SizeClassHandle {
1725        SizeClassHandle::new(
1726            NEXT_SIZE_CLASS_ID.fetch_add(1, Ordering::Relaxed),
1727            size,
1728            alignment,
1729            NZU32!(8),
1730            NZUsize!(4),
1731            4,
1732            false,
1733        )
1734    }
1735
1736    fn test_pool(config: BufferPoolConfig) -> BufferPool {
1737        let mut registry = Registry::default();
1738        BufferPool::new(config, &mut registry)
1739    }
1740
1741    /// Creates a test config with page alignment.
1742    fn test_config(min_size: usize, max_size: usize, max_per_class: u32) -> BufferPoolConfig {
1743        BufferPoolConfig {
1744            pool_min_size: 0,
1745            min_size: NZUsize!(min_size),
1746            max_size: NZUsize!(max_size),
1747            max_per_class: NZU32!(max_per_class),
1748            parallelism: NZUsize!(1),
1749            thread_cache_config: BufferPoolThreadCacheConfig::Enabled(None),
1750            prefill: false,
1751            alignment: NZUsize!(page_size()),
1752        }
1753    }
1754
1755    /// Returns the current strong count without changing it after the helper
1756    /// returns.
1757    fn size_class_strong_count(class: &SizeClassHandle) -> usize {
1758        // SAFETY: the borrowed handle owns one strong reference for `class.token`
1759        // for the duration of this call.
1760        unsafe { class.token.retain() };
1761        // SAFETY: the increment above created the strong reference consumed by
1762        // this temporary Arc.
1763        let arc = unsafe { Arc::from_raw(class.token.ptr.as_ptr()) };
1764        Arc::strong_count(&arc) - 1
1765    }
1766
1767    /// Helper to get the number of caller-owned tracked buffers for a size class.
1768    ///
1769    /// With TLS enabled, tracked buffers can be free in either the shared
1770    /// freelist or the current thread's local cache.
1771    fn get_allocated(pool: &BufferPool, size: usize) -> usize {
1772        let class_index = pool.class_index(size).unwrap();
1773        let class = &pool.inner.classes[class_index];
1774        get_global_created(class) - get_global_len(class) - get_local_len(class)
1775    }
1776
1777    /// Helper to get the number of free buffers visible to the current thread.
1778    fn get_available(pool: &BufferPool, size: usize) -> i64 {
1779        let class_index = pool.class_index(size).unwrap();
1780        let class = &pool.inner.classes[class_index];
1781        (get_global_len(class) + get_local_len(class)) as i64
1782    }
1783
1784    /// Helper to get the number of free buffers parked in the global freelist.
1785    fn get_global_len(class: &SizeClass) -> usize {
1786        freelist::tests::len(&class.global)
1787    }
1788
1789    /// Helper to get the number of buffers created by the global freelist.
1790    fn get_global_created(class: &SizeClass) -> usize {
1791        freelist::tests::created(&class.global)
1792    }
1793
1794    /// Helper to get the number of free buffers parked in the current thread's
1795    /// local cache for a size class.
1796    fn get_local_len(class: &SizeClass) -> usize {
1797        BufferPoolThreadCache::TLS_SIZE_CLASS_CACHES.with(|caches| {
1798            // SAFETY: this TLS value is only ever accessed by the current thread.
1799            let caches = unsafe { &*caches.get() };
1800            caches
1801                .bins
1802                .get(class.class_id)
1803                .and_then(Option::as_ref)
1804                .map_or(0, |cache| cache.len)
1805        })
1806    }
1807
1808    #[test]
1809    fn test_page_size() {
1810        let size = page_size();
1811        assert!(size >= 4096);
1812        assert!(size.is_power_of_two());
1813    }
1814
1815    #[test]
1816    fn test_config_validation() {
1817        let page = page_size();
1818        let config = test_config(page, page * 4, 10);
1819        config.validate();
1820    }
1821
1822    #[test]
1823    #[should_panic(expected = "thread_cache_capacity (11) must be <= max_per_class (10)")]
1824    fn test_config_invalid_thread_cache_capacity() {
1825        let page = page_size();
1826        let config = test_config(page, page * 4, 10).with_thread_cache_capacity(NZUsize!(11));
1827        config.validate();
1828    }
1829
1830    #[test]
1831    #[should_panic(expected = "min_size must be a power of two")]
1832    fn test_config_invalid_min_size() {
1833        let config = BufferPoolConfig {
1834            pool_min_size: 0,
1835            min_size: NZUsize!(3000),
1836            max_size: NZUsize!(8192),
1837            max_per_class: NZU32!(10),
1838            parallelism: NZUsize!(1),
1839            thread_cache_config: BufferPoolThreadCacheConfig::Enabled(None),
1840            prefill: false,
1841            alignment: NZUsize!(page_size()),
1842        };
1843        config.validate();
1844    }
1845
1846    #[test]
1847    fn test_pool_class_index() {
1848        let page = page_size();
1849        let pool = test_pool(test_config(page, page * 8, 10));
1850
1851        // Classes: page, page*2, page*4, page*8
1852        assert_eq!(pool.inner.classes.len(), 4);
1853
1854        assert_eq!(pool.class_index(1), Some(0));
1855        assert_eq!(pool.class_index(page), Some(0));
1856        assert_eq!(pool.class_index(page + 1), Some(1));
1857        assert_eq!(pool.class_index(page * 2), Some(1));
1858        assert_eq!(pool.class_index(page * 4 + 1), Some(3));
1859        assert_eq!(pool.class_index(page * 8 - 1), Some(3));
1860        assert_eq!(pool.class_index(page * 8), Some(3));
1861        assert_eq!(pool.class_index(page * 8 + 1), None);
1862    }
1863
1864    #[test]
1865    fn test_pool_alloc_and_return() {
1866        let page = page_size();
1867        let pool = test_pool(test_config(page, page * 4, 2));
1868
1869        // Allocate a buffer - returns buffer with len=0, capacity >= requested
1870        let buf = pool.try_alloc(page).unwrap();
1871        assert!(buf.capacity() >= page);
1872        assert_eq!(buf.len(), 0);
1873
1874        // Drop returns to pool
1875        drop(buf);
1876
1877        // Can allocate again
1878        let buf2 = pool.try_alloc(page).unwrap();
1879        assert!(buf2.capacity() >= page);
1880        assert_eq!(buf2.len(), 0);
1881    }
1882
1883    #[test]
1884    fn test_alloc_len_sets_len() {
1885        let page = page_size();
1886        let pool = test_pool(test_config(page, page * 4, 2));
1887
1888        // SAFETY: we immediately initialize all bytes before reading.
1889        let mut buf = unsafe { pool.alloc_len(100) };
1890        assert_eq!(buf.len(), 100);
1891        buf.as_mut().fill(0xAB);
1892        let frozen = buf.freeze();
1893        assert_eq!(frozen.as_ref(), &[0xAB; 100]);
1894    }
1895
1896    #[test]
1897    fn test_alloc_zeroed_sets_len_and_zeros() {
1898        let page = page_size();
1899        let pool = test_pool(test_config(page, page * 4, 2));
1900
1901        let buf = pool.alloc_zeroed(100);
1902        assert_eq!(buf.len(), 100);
1903        assert!(buf.as_ref().iter().all(|&b| b == 0));
1904    }
1905
1906    #[test]
1907    fn test_try_alloc_zeroed_sets_len_and_zeros() {
1908        let page = page_size();
1909        let pool = test_pool(test_config(page, page * 4, 2));
1910
1911        let buf = pool.try_alloc_zeroed(page).unwrap();
1912        assert!(buf.is_pooled());
1913        assert_eq!(buf.len(), page);
1914        assert!(buf.as_ref().iter().all(|&b| b == 0));
1915    }
1916
1917    #[test]
1918    fn test_alloc_zeroed_fallback_uses_untracked_zeroed_buffer() {
1919        let page = page_size();
1920        let pool = test_pool(test_config(page, page, 1));
1921
1922        // Exhaust pooled capacity for this class.
1923        let _pooled = pool.try_alloc(page).unwrap();
1924
1925        let buf = pool.alloc_zeroed(100);
1926        assert!(!buf.is_pooled());
1927        assert_eq!(buf.len(), 100);
1928        assert!(buf.as_ref().iter().all(|&b| b == 0));
1929    }
1930
1931    #[test]
1932    fn test_alloc_zeroed_reuses_dirty_pooled_buffer() {
1933        let page = page_size();
1934        let pool = test_pool(test_config(page, page, 1));
1935
1936        let mut first = pool.alloc_zeroed(page);
1937        assert!(first.is_pooled());
1938        assert!(first.as_ref().iter().all(|&b| b == 0));
1939
1940        // Dirty the buffer before returning it to the pool.
1941        first.as_mut().fill(0xAB);
1942        drop(first);
1943
1944        let second = pool.alloc_zeroed(page);
1945        assert!(second.is_pooled());
1946        assert_eq!(second.len(), page);
1947        assert!(second.as_ref().iter().all(|&b| b == 0));
1948    }
1949
1950    #[test]
1951    fn test_requests_smaller_than_pool_min_size_bypass_pool() {
1952        let pool = test_pool(BufferPoolConfig {
1953            pool_min_size: 512,
1954            min_size: NZUsize!(512),
1955            max_size: NZUsize!(1024),
1956            max_per_class: NZU32!(2),
1957            parallelism: NZUsize!(1),
1958            thread_cache_config: BufferPoolThreadCacheConfig::Enabled(None),
1959            prefill: false,
1960            alignment: NZUsize!(128),
1961        });
1962
1963        let buf = pool.try_alloc(200).unwrap();
1964        assert!(!buf.is_pooled());
1965        assert_eq!(buf.capacity(), 200);
1966
1967        let zeroed = pool.try_alloc_zeroed(200).unwrap();
1968        assert!(!zeroed.is_pooled());
1969        assert_eq!(zeroed.len(), 200);
1970        assert!(zeroed.as_ref().iter().all(|&b| b == 0));
1971
1972        let pooled = pool.try_alloc(512).unwrap();
1973        assert!(pooled.is_pooled());
1974        assert_eq!(pooled.capacity(), 512);
1975    }
1976
1977    #[test]
1978    fn test_pool_size_classes() {
1979        let page = page_size();
1980        let pool = test_pool(test_config(page, page * 4, 10));
1981
1982        // Small request gets smallest class
1983        let buf1 = pool.try_alloc(page).unwrap();
1984        assert_eq!(buf1.capacity(), page);
1985
1986        // Larger request gets appropriate class
1987        let buf2 = pool.try_alloc(page + 1).unwrap();
1988        assert_eq!(buf2.capacity(), page * 2);
1989
1990        let buf3 = pool.try_alloc(page * 3).unwrap();
1991        assert_eq!(buf3.capacity(), page * 4);
1992    }
1993
1994    #[test]
1995    fn test_prefill() {
1996        let page = NZUsize!(page_size());
1997        let pool = test_pool(BufferPoolConfig {
1998            pool_min_size: 0,
1999            min_size: page,
2000            max_size: page,
2001            max_per_class: NZU32!(5),
2002            parallelism: NZUsize!(1),
2003            thread_cache_config: BufferPoolThreadCacheConfig::Enabled(None),
2004            prefill: true,
2005            alignment: page,
2006        });
2007
2008        // Should be able to allocate max_per_class buffers immediately
2009        let mut bufs = Vec::new();
2010        for _ in 0..5 {
2011            bufs.push(pool.try_alloc(page.get()).expect("alloc should succeed"));
2012        }
2013
2014        // Next allocation should fail
2015        assert!(pool.try_alloc(page.get()).is_err());
2016    }
2017
2018    #[test]
2019    fn test_config_for_network() {
2020        let config = BufferPoolConfig::for_network();
2021        config.validate();
2022        assert_eq!(config.pool_min_size, 0);
2023        assert_eq!(config.min_size.get(), 1024);
2024        assert_eq!(config.max_size.get(), 128 * 1024);
2025        assert_eq!(config.max_per_class.get(), 4096);
2026        assert_eq!(config.parallelism, NZUsize!(1));
2027        assert_eq!(
2028            config.thread_cache_config,
2029            BufferPoolThreadCacheConfig::Enabled(None)
2030        );
2031        assert!(!config.prefill);
2032        assert_eq!(config.alignment.get(), 1);
2033    }
2034
2035    #[test]
2036    fn test_config_for_storage() {
2037        let config = BufferPoolConfig::for_storage();
2038        config.validate();
2039        assert_eq!(config.pool_min_size, 0);
2040        assert_eq!(config.min_size.get(), page_size());
2041        assert_eq!(config.max_size.get(), 8 * 1024 * 1024);
2042        assert_eq!(config.max_per_class.get(), 64);
2043        assert_eq!(config.parallelism, NZUsize!(1));
2044        assert_eq!(
2045            config.thread_cache_config,
2046            BufferPoolThreadCacheConfig::Enabled(None)
2047        );
2048        assert!(!config.prefill);
2049        assert_eq!(config.alignment.get(), 1);
2050    }
2051
2052    #[test]
2053    fn test_storage_config_supports_default_allocations() {
2054        // The storage preset's max_size (8 MB) should be allocatable out of the box.
2055        let pool = test_pool(BufferPoolConfig::for_storage());
2056
2057        let buf = pool.try_alloc(8 * 1024 * 1024).unwrap();
2058        assert_eq!(buf.capacity(), 8 * 1024 * 1024);
2059    }
2060
2061    #[test]
2062    fn test_config_builders() {
2063        let page = NZUsize!(page_size());
2064        let config = BufferPoolConfig::for_storage()
2065            .with_pool_min_size(1024)
2066            .with_max_per_class(NZU32!(64))
2067            .with_parallelism(NZUsize!(4))
2068            .with_thread_cache_capacity(NZUsize!(8))
2069            .with_prefill(true)
2070            .with_min_size(page)
2071            .with_max_size(NZUsize!(128 * 1024));
2072
2073        config.validate();
2074        assert_eq!(config.pool_min_size, 1024);
2075        assert_eq!(config.min_size, page);
2076        assert_eq!(config.max_size.get(), 128 * 1024);
2077        assert_eq!(config.max_per_class.get(), 64);
2078        assert_eq!(config.parallelism, NZUsize!(4));
2079        assert_eq!(
2080            config.thread_cache_config,
2081            BufferPoolThreadCacheConfig::Enabled(Some(NZUsize!(8)))
2082        );
2083        assert!(config.prefill);
2084        assert_eq!(config.alignment.get(), 1);
2085
2086        // Alignment can be tuned explicitly as long as min_size is also adjusted.
2087        let aligned = BufferPoolConfig::for_network()
2088            .with_pool_min_size(256)
2089            .with_parallelism(NZUsize!(4))
2090            .with_alignment(NZUsize!(256))
2091            .with_min_size(NZUsize!(256));
2092        aligned.validate();
2093        assert_eq!(aligned.parallelism, NZUsize!(4));
2094        assert_eq!(
2095            aligned.thread_cache_config,
2096            BufferPoolThreadCacheConfig::Enabled(None)
2097        );
2098        assert_eq!(aligned.alignment.get(), 256);
2099        assert_eq!(aligned.min_size.get(), 256);
2100    }
2101
2102    #[test]
2103    fn test_parallelism_policy_resolves_thread_cache_capacity() {
2104        let page = page_size();
2105
2106        // Half the class budget is divided across expected threads.
2107        let pool = test_pool(test_config(page, page, 64).with_parallelism(NZUsize!(8)));
2108        let class_index = pool.class_index(page).unwrap();
2109        assert_eq!(pool.inner.classes[class_index].thread_cache_capacity, 4);
2110
2111        // Large classes scale past the previous eight-slot cap.
2112        let pool = test_pool(test_config(page, page, 4096).with_parallelism(NZUsize!(8)));
2113        let class_index = pool.class_index(page).unwrap();
2114        assert_eq!(pool.inner.classes[class_index].thread_cache_capacity, 256);
2115    }
2116
2117    #[test]
2118    fn test_auto_thread_cache_disables_when_parallelism_exceeds_budget() {
2119        let page = page_size();
2120
2121        // With only two buffers and eight expected threads, the auto policy's
2122        // per-thread share is zero: 2 / (2 * min(8, 2)) == 0. In that case the
2123        // pool should disable TLS instead of forcing every thread to retain at
2124        // least one buffer.
2125        let pool = test_pool(test_config(page, page, 2).with_parallelism(NZUsize!(8)));
2126        let class_index = pool.class_index(page).unwrap();
2127        let class = &pool.inner.classes[class_index];
2128        assert_eq!(class.thread_cache_capacity, 0);
2129
2130        // Exhaust the size class so the only way the main thread can allocate
2131        // again is if the worker's returned buffers are globally visible.
2132        let first = pool.try_alloc(page).expect("first tracked allocation");
2133        let second = pool.try_alloc(page).expect("second tracked allocation");
2134
2135        let pool_for_thread = pool.clone();
2136        let (returned_tx, returned_rx) = mpsc::channel();
2137        let (release_tx, release_rx) = mpsc::channel();
2138        let handle = thread::spawn(move || {
2139            // Return both buffers from another thread. The thread stays alive
2140            // after the drops, so any TLS entries it retained would remain
2141            // invisible to the main thread until `release_rx` fires.
2142            drop(first);
2143            drop(second);
2144            returned_tx.send(()).expect("signal returned buffers");
2145            release_rx.recv().expect("release worker");
2146            drop(pool_for_thread);
2147        });
2148
2149        returned_rx.recv().expect("wait for returned buffers");
2150
2151        // Both allocations must succeed while the worker thread is still
2152        // alive. Before auto capacity could resolve to zero, one returned
2153        // buffer could remain stranded in the worker's TLS cache and this
2154        // second allocation would report exhaustion.
2155        let _first = pool.try_alloc(page).expect("first global reuse");
2156        let _second = pool.try_alloc(page).expect("second global reuse");
2157
2158        release_tx.send(()).expect("release worker");
2159        handle.join().expect("worker should not panic");
2160    }
2161
2162    #[test]
2163    fn test_parallelism_policy_resolves_freelist_stripes() {
2164        let page = page_size();
2165        let pool = test_pool(test_config(page, page, 64).with_parallelism(NZUsize!(16)));
2166
2167        let class_index = pool.class_index(page).unwrap();
2168        assert_eq!(
2169            freelist::tests::num_words(&pool.inner.classes[class_index].global),
2170            16
2171        );
2172
2173        // When expected parallelism rounds above capacity, the freelist caps
2174        // stripes so every word can contain at least one slot.
2175        let pool = test_pool(test_config(page, page, 12).with_parallelism(NZUsize!(9)));
2176
2177        let class_index = pool.class_index(page).unwrap();
2178        assert_eq!(
2179            freelist::tests::num_words(&pool.inner.classes[class_index].global),
2180            8
2181        );
2182
2183        // Disabling thread-local caches should not change global striping.
2184        let pool = test_pool(
2185            test_config(page, page, 64)
2186                .with_parallelism(NZUsize!(16))
2187                .with_thread_cache_disabled(),
2188        );
2189
2190        let class_index = pool.class_index(page).unwrap();
2191        assert_eq!(
2192            freelist::tests::num_words(&pool.inner.classes[class_index].global),
2193            16
2194        );
2195    }
2196
2197    #[test]
2198    fn test_fixed_thread_cache_capacity_overrides_auto_capacity() {
2199        let page = page_size();
2200        let pool = test_pool(
2201            test_config(page, page, 64)
2202                .with_parallelism(NZUsize!(8))
2203                .with_thread_cache_capacity(NZUsize!(7)),
2204        );
2205        let class_index = pool.class_index(page).unwrap();
2206
2207        // Fixed capacity should bypass the derived parallelism heuristic.
2208        assert_eq!(pool.inner.classes[class_index].thread_cache_capacity, 7);
2209        assert_eq!(
2210            freelist::tests::num_words(&pool.inner.classes[class_index].global),
2211            8
2212        );
2213    }
2214
2215    #[test]
2216    fn test_disabled_thread_cache_does_not_retain_buffers_locally() {
2217        let page = page_size();
2218        let pool = test_pool(test_config(page, page, 2).with_thread_cache_disabled());
2219        let class_index = pool.class_index(page).unwrap();
2220        let class = &pool.inner.classes[class_index];
2221
2222        let tracked = pool.try_alloc(page).expect("tracked allocation");
2223        drop(tracked);
2224
2225        // Disabled thread caching still routes returns through the global
2226        // freelist, but should never retain buffers in the current thread.
2227        assert_eq!(class.thread_cache_capacity, 0);
2228        assert_eq!(get_local_len(class), 0);
2229        assert_eq!(get_global_len(class), 1);
2230    }
2231
2232    #[test]
2233    fn test_thread_cache_flush_moves_local_entries_to_global() {
2234        let page = page_size();
2235        let pool =
2236            test_pool(test_config(page, page * 2, 8).with_thread_cache_capacity(NZUsize!(4)));
2237
2238        // Use two distinct size classes so the test exercises the whole TLS
2239        // registry, not just a single per-class cache entry.
2240        let small_index = pool.class_index(page).unwrap();
2241        let large_index = pool.class_index(page + 1).unwrap();
2242        let small_class = &pool.inner.classes[small_index];
2243        let large_class = &pool.inner.classes[large_index];
2244
2245        // Return one buffer from each class to the current thread. With local
2246        // caching enabled, both drops should stay in the thread-local bins.
2247        let small = pool.try_alloc(page).expect("tracked allocation");
2248        let large = pool.try_alloc(page + 1).expect("tracked allocation");
2249        drop(small);
2250        drop(large);
2251
2252        // Before flushing, both buffers are only visible via the current
2253        // thread's local caches, nothing has been pushed to the global queues.
2254        assert_eq!(get_local_len(small_class), 1);
2255        assert_eq!(get_local_len(large_class), 1);
2256        assert_eq!(get_global_len(small_class), 0);
2257        assert_eq!(get_global_len(large_class), 0);
2258
2259        // Flushing should walk the entire TLS registry, drop every local cache,
2260        // and let each cache's drop implementation return its buffers to the
2261        // shared global freelists.
2262        BufferPoolThreadCache::flush();
2263
2264        // After flush, the current thread retains nothing locally and both
2265        // buffers are once again visible through their class-global queues.
2266        assert_eq!(get_local_len(small_class), 0);
2267        assert_eq!(get_local_len(large_class), 0);
2268        assert_eq!(get_global_len(small_class), 1);
2269        assert_eq!(get_global_len(large_class), 1);
2270    }
2271
2272    #[test]
2273    fn test_config_with_budget_bytes() {
2274        // Classes: 4, 8, 16 (sum = 28). Budget 280 => max_per_class = 10.
2275        let config = BufferPoolConfig {
2276            pool_min_size: 0,
2277            min_size: NZUsize!(4),
2278            max_size: NZUsize!(16),
2279            max_per_class: NZU32!(1),
2280            parallelism: NZUsize!(1),
2281            thread_cache_config: BufferPoolThreadCacheConfig::Enabled(None),
2282            prefill: false,
2283            alignment: NZUsize!(4),
2284        }
2285        .with_budget_bytes(NZUsize!(280));
2286        assert_eq!(config.max_per_class.get(), 10);
2287
2288        // Budget 10 rounds up to one buffer per class.
2289        let small_budget = BufferPoolConfig {
2290            pool_min_size: 0,
2291            min_size: NZUsize!(4),
2292            max_size: NZUsize!(16),
2293            max_per_class: NZU32!(1),
2294            parallelism: NZUsize!(1),
2295            thread_cache_config: BufferPoolThreadCacheConfig::Enabled(None),
2296            prefill: false,
2297            alignment: NZUsize!(4),
2298        }
2299        .with_budget_bytes(NZUsize!(10));
2300        assert_eq!(small_budget.max_per_class.get(), 1);
2301    }
2302
2303    #[test]
2304    fn test_pool_error_display() {
2305        assert_eq!(
2306            PoolError::Oversized.to_string(),
2307            "requested capacity exceeds maximum buffer size"
2308        );
2309        assert_eq!(
2310            PoolError::Exhausted.to_string(),
2311            "pool exhausted for required size class"
2312        );
2313    }
2314
2315    #[test]
2316    fn test_pool_debug_and_config_accessor() {
2317        // Debug formatting and config accessor should be consistent.
2318        let page = page_size();
2319        let pool = test_pool(test_config(page, page, 2));
2320
2321        let debug = format!("{pool:?}");
2322        assert!(debug.contains("BufferPool"));
2323        assert!(debug.contains("num_classes"));
2324        assert_eq!(pool.config().min_size.get(), page);
2325    }
2326
2327    #[test]
2328    fn test_return_buffer_local_overflow_spills_to_global() {
2329        let page = page_size();
2330        let pool = test_pool(test_config(page, page, 2));
2331        let class_index = pool
2332            .class_index(page)
2333            .expect("class exists for page-sized buffer");
2334
2335        let tracked1 = pool.try_alloc(page).expect("first tracked allocation");
2336        let tracked2 = pool.try_alloc(page).expect("second tracked allocation");
2337
2338        // The first return should stay entirely in the current thread's local cache.
2339        drop(tracked1);
2340        assert_eq!(get_global_len(&pool.inner.classes[class_index]), 0);
2341        assert_eq!(get_local_len(&pool.inner.classes[class_index]), 1);
2342
2343        // Returning another tracked buffer should route overflow to the global
2344        // freelist and retain one in the current thread's local bin.
2345        drop(tracked2);
2346        assert_eq!(get_global_len(&pool.inner.classes[class_index]), 1);
2347        assert_eq!(get_local_len(&pool.inner.classes[class_index]), 1);
2348        assert_eq!(get_available(&pool, page), 2);
2349    }
2350
2351    #[test]
2352    fn test_small_local_cache_overflow_preserves_locality() {
2353        let page = page_size();
2354        let pool = test_pool(test_config(page, page, 2));
2355
2356        // With `thread_cache_capacity == 1`, the first return stays local and the
2357        // second overflows directly to global instead of spilling the hot
2358        // local entry through the shared queue.
2359        let mut tracked1 = pool.try_alloc(page).expect("first tracked allocation");
2360        let ptr1 = tracked1.as_mut_ptr();
2361        let mut tracked2 = pool.try_alloc(page).expect("second tracked allocation");
2362        let ptr2 = tracked2.as_mut_ptr();
2363
2364        drop(tracked1);
2365        drop(tracked2);
2366
2367        let mut reused_local = pool.try_alloc(page).expect("reuse from local cache");
2368        assert_eq!(reused_local.as_mut_ptr(), ptr1);
2369
2370        let mut reused_global = pool.try_alloc(page).expect("reuse from global freelist");
2371        assert_eq!(reused_global.as_mut_ptr(), ptr2);
2372    }
2373
2374    #[test]
2375    fn test_large_local_cache_batches_overflow_and_refill() {
2376        let page = page_size();
2377        let threads = std::thread::available_parallelism().map_or(1, NonZeroUsize::get);
2378        let max_per_class =
2379            u32::try_from(threads * 8).expect("test capacity must fit in u32 slot ids");
2380        let pool = test_pool(test_config(page, page, max_per_class));
2381        let class_index = pool
2382            .class_index(page)
2383            .expect("class exists for page-sized buffer");
2384        let class = &pool.inner.classes[class_index];
2385
2386        assert!(class.thread_cache_capacity >= MIN_TLS_BATCH_CAPACITY);
2387
2388        // Drop enough distinct pooled buffers to force an overflow from a
2389        // full local cache. Large bins should spill half the entries to global
2390        // and keep the remainder local for fast same-thread reuse.
2391        let mut bufs = Vec::new();
2392        for _ in 0..class.thread_cache_capacity + 1 {
2393            bufs.push(pool.try_alloc(page).expect("tracked allocation"));
2394        }
2395        for buf in bufs {
2396            drop(buf);
2397        }
2398
2399        assert_eq!(get_local_len(class), class.thread_cache_capacity / 2 + 1);
2400        assert_eq!(get_global_len(class), class.thread_cache_capacity / 2);
2401
2402        // Drain the local half, then hit global once. That global take should
2403        // batch-refill the local cache back up to the configured target.
2404        let mut reused = Vec::new();
2405        for _ in 0..class.thread_cache_capacity / 2 + 1 {
2406            reused.push(pool.try_alloc(page).expect("local reuse"));
2407        }
2408        assert_eq!(get_local_len(class), 0);
2409        assert_eq!(get_global_len(class), class.thread_cache_capacity / 2);
2410
2411        let _global = pool.try_alloc(page).expect("global reuse with refill");
2412        assert_eq!(get_local_len(class), class.thread_cache_capacity / 2 - 1);
2413        assert_eq!(get_global_len(class), 0);
2414    }
2415
2416    #[test]
2417    fn test_global_batch_alloc_stops_when_global_runs_empty() {
2418        let class = test_size_class(64, 64);
2419        let (slot, buffer) = class.global.try_create(false).expect("slot reservation");
2420
2421        // A short global freelist should return the allocation and stop
2422        // without filling the local cache to its batch target.
2423        class.global.put(slot, buffer);
2424        let (buffer, lease, slot) = BufferPoolThreadCache::pop(&class).expect("global allocation");
2425
2426        assert_eq!(get_local_len(&class), 0);
2427        assert_eq!(get_global_len(&class), 0);
2428
2429        // Return the manually popped entry so the freelist owns and deallocates
2430        // the buffer at test teardown.
2431        lease.return_global(slot, buffer);
2432    }
2433
2434    #[test]
2435    fn test_size_class_leases_use_raw_arc_tokens_across_cache_paths() {
2436        let class = test_size_class(64, 64);
2437        let mut cache = TlsSizeClassCache::new(class.token, MIN_TLS_BATCH_CAPACITY);
2438        assert_eq!(size_class_strong_count(&class), 1);
2439
2440        let (slot, buffer) = class.global.try_create(false).expect("slot reservation");
2441        let lease = SizeClassLease::retain(&class);
2442        assert_eq!(size_class_strong_count(&class), 2);
2443
2444        // Moving a pooled-buffer lease into the local cache banks the same strong
2445        // reference, it should not clone the class.
2446        cache.push(lease, slot, buffer);
2447        assert_eq!(size_class_strong_count(&class), 2);
2448
2449        let (entry, lease) = cache.pop(&class).expect("local cache pop");
2450        assert_eq!(size_class_strong_count(&class), 2);
2451        lease.return_global(entry.slot, entry.buffer);
2452        assert_eq!(size_class_strong_count(&class), 1);
2453
2454        for _ in 0..2 {
2455            let (slot, buffer) = class.global.try_create(false).expect("slot reservation");
2456            class.global.put(slot, buffer);
2457        }
2458
2459        let (entry, lease) = cache.pop(&class).expect("global refill");
2460        assert_eq!(size_class_strong_count(&class), 3);
2461
2462        lease.return_global(entry.slot, entry.buffer);
2463        assert_eq!(size_class_strong_count(&class), 2);
2464
2465        // Dropping the cache returns the banked refill entry and releases its
2466        // size-class reference.
2467        drop(cache);
2468        assert_eq!(size_class_strong_count(&class), 1);
2469    }
2470
2471    #[test]
2472    fn test_tls_size_class_cache_push_tolerates_empty_spill() {
2473        let class = test_size_class(64, 64);
2474        let (slot, buffer) = class.global.try_create(false).expect("slot reservation");
2475        let lease = SizeClassLease::retain(&class);
2476        let mut cache = TlsSizeClassCache::new(class.token, 0);
2477
2478        // Small local capacities should bypass batching and push straight to
2479        // global. The retained reference above is represented by this lease and
2480        // transferred into `cache.push`.
2481        cache.push(lease, slot, buffer);
2482        assert_eq!(cache.len, 0);
2483        drop(cache);
2484    }
2485
2486    #[test]
2487    fn test_global_freelist_returns_each_slot_once() {
2488        // Use a two-slot class with TLS capacity one so this test can exercise
2489        // the class-global freelist directly without involving local-cache
2490        // refill or spill behavior.
2491        let class = SizeClassHandle::new(
2492            NEXT_SIZE_CLASS_ID.fetch_add(1, Ordering::Relaxed),
2493            64,
2494            64,
2495            NZU32!(2),
2496            NZUsize!(1),
2497            1,
2498            false,
2499        );
2500
2501        // Create both slot ids and keep each allocation's pointer so we can
2502        // verify that the freelist returns the same buffer parked for that slot.
2503        let (slot0, buffer0) = class.global.try_create(false).expect("first slot");
2504        let ptr0 = buffer0.as_ptr();
2505        let (slot1, buffer1) = class.global.try_create(false).expect("second slot");
2506        let ptr1 = buffer1.as_ptr();
2507
2508        class.global.put(slot0, buffer0);
2509        class.global.put(slot1, buffer1);
2510
2511        // The freelist does not preserve insertion order, so normalize by slot
2512        // before asserting identity. The important property is that each slot is
2513        // returned exactly once with its original parked buffer.
2514        let mut popped = [
2515            class.global.take().expect("first pop"),
2516            class.global.take().expect("second pop"),
2517        ];
2518        popped.sort_by_key(|(slot, _)| *slot);
2519
2520        assert_eq!(popped[0].0, slot0);
2521        assert_eq!(popped[0].1.as_ptr(), ptr0);
2522        assert_eq!(popped[1].0, slot1);
2523        assert_eq!(popped[1].1.as_ptr(), ptr1);
2524
2525        // Both slots were claimed above, so the global freelist is empty.
2526        assert!(class.global.take().is_none());
2527
2528        // Return the buffers so the freelist owns and deallocates them when the
2529        // test size class is dropped.
2530        for (slot, buffer) in popped {
2531            class.global.put(slot, buffer);
2532        }
2533    }
2534
2535    #[test]
2536    fn test_pooled_debug_and_empty_into_bytes_paths() {
2537        // Debug formatting for pooled mutable/immutable wrappers, and empty
2538        // into_bytes should detach without retaining the pool allocation.
2539        let page = page_size();
2540        let class = test_size_class(page, page);
2541        let (slot0, buffer0, class0) = class.try_create(false).expect("first slot");
2542        let (slot1, buffer1, class1) = class.try_create(false).expect("second slot");
2543        let (slot2, buffer2, class2) = class.try_create(false).expect("third slot");
2544
2545        // Mutable pooled debug should include cursor position.
2546        let pooled_mut_debug = {
2547            let pooled_mut = PooledBufMut::new(buffer0, class0, slot0);
2548            format!("{pooled_mut:?}")
2549        };
2550        assert!(pooled_mut_debug.contains("PooledBufMut"));
2551        assert!(pooled_mut_debug.contains("cursor"));
2552
2553        // Empty mutable buffer converts to empty Bytes without retaining pool memory.
2554        let empty_from_mut = PooledBufMut::new(buffer1, class1, slot1);
2555        assert!(empty_from_mut.into_bytes().is_empty());
2556
2557        // Immutable pooled debug should include capacity.
2558        let pooled = PooledBufMut::new(buffer2, class2, slot2).into_pooled();
2559        let pooled_debug = format!("{pooled:?}");
2560        assert!(pooled_debug.contains("PooledBuf"));
2561        assert!(pooled_debug.contains("capacity"));
2562        assert!(pooled.into_bytes().is_empty());
2563
2564        BufferPoolThreadCache::flush();
2565    }
2566
2567    #[test]
2568    fn test_freeze_returns_buffer_to_pool() {
2569        let page = page_size();
2570        let pool = test_pool(test_config(page, page, 2));
2571
2572        // Initially: 0 allocated, 0 available
2573        assert_eq!(get_allocated(&pool, page), 0);
2574        assert_eq!(get_available(&pool, page), 0);
2575
2576        // Allocate and freeze
2577        let buf = pool.try_alloc(page).unwrap();
2578        assert_eq!(get_allocated(&pool, page), 1);
2579        assert_eq!(get_available(&pool, page), 0);
2580
2581        let iobuf = buf.freeze();
2582        // Still allocated (held by IoBuf)
2583        assert_eq!(get_allocated(&pool, page), 1);
2584
2585        // Drop the IoBuf - buffer should return to pool
2586        drop(iobuf);
2587        assert_eq!(get_allocated(&pool, page), 0);
2588        assert_eq!(get_available(&pool, page), 1);
2589    }
2590
2591    #[test]
2592    fn test_refcount_and_copy_to_bytes_paths() {
2593        let page = page_size();
2594        let pool = test_pool(test_config(page, page, 2));
2595
2596        // Refcount behavior:
2597        // - clone/slice keep the pooled allocation alive
2598        // - empty slice does not keep ownership
2599        {
2600            let mut buf = pool.try_alloc(page).unwrap();
2601            buf.put_slice(&[0xAA; 100]);
2602            let iobuf = buf.freeze();
2603            let clone = iobuf.clone();
2604            let slice = iobuf.slice(10..40);
2605            let empty = iobuf.slice(10..10);
2606            assert!(empty.is_empty());
2607            drop(iobuf);
2608            assert_eq!(get_allocated(&pool, page), 1);
2609            drop(slice);
2610            assert_eq!(get_allocated(&pool, page), 1);
2611            drop(clone);
2612            assert_eq!(get_allocated(&pool, page), 0);
2613        }
2614
2615        // IoBuf::copy_to_bytes behavior:
2616        // - zero-length copy is empty and non-advancing
2617        // - partial copy advances while keeping ownership alive
2618        // - full drain transfers ownership out of source
2619        // - zero-length copy on already-empty source stays detached
2620        {
2621            let mut buf = pool.try_alloc(page).unwrap();
2622            buf.put_slice(&[0x42; 100]);
2623            let mut iobuf = buf.freeze();
2624
2625            let zero = iobuf.copy_to_bytes(0);
2626            assert!(zero.is_empty());
2627            assert_eq!(iobuf.remaining(), 100);
2628
2629            let partial = iobuf.copy_to_bytes(30);
2630            assert_eq!(&partial[..], &[0x42; 30]);
2631            assert_eq!(iobuf.remaining(), 70);
2632
2633            let rest = iobuf.copy_to_bytes(70);
2634            assert_eq!(&rest[..], &[0x42; 70]);
2635            assert_eq!(iobuf.remaining(), 0);
2636
2637            // Zero-length copy on empty should not transfer ownership.
2638            let empty = iobuf.copy_to_bytes(0);
2639            assert!(empty.is_empty());
2640
2641            drop(iobuf);
2642            assert_eq!(get_allocated(&pool, page), 1);
2643            drop(zero);
2644            drop(partial);
2645            assert_eq!(get_allocated(&pool, page), 1);
2646            drop(rest);
2647            assert_eq!(get_allocated(&pool, page), 0);
2648        }
2649
2650        // IoBufMut::copy_to_bytes mirrors the immutable ownership semantics.
2651        {
2652            let buf = pool.try_alloc(page).unwrap();
2653            let mut iobufmut = buf;
2654            iobufmut.put_slice(&[0x7E; 100]);
2655
2656            let zero = iobufmut.copy_to_bytes(0);
2657            assert!(zero.is_empty());
2658            assert_eq!(iobufmut.remaining(), 100);
2659
2660            let partial = iobufmut.copy_to_bytes(30);
2661            assert_eq!(&partial[..], &[0x7E; 30]);
2662            assert_eq!(iobufmut.remaining(), 70);
2663
2664            let rest = iobufmut.copy_to_bytes(70);
2665            assert_eq!(&rest[..], &[0x7E; 70]);
2666            assert_eq!(iobufmut.remaining(), 0);
2667
2668            drop(iobufmut);
2669            assert_eq!(get_allocated(&pool, page), 1);
2670            drop(zero);
2671            drop(partial);
2672            assert_eq!(get_allocated(&pool, page), 1);
2673            drop(rest);
2674            assert_eq!(get_allocated(&pool, page), 0);
2675        }
2676    }
2677
2678    #[test]
2679    fn test_iobuf_to_iobufmut_conversion_reuses_pool_for_non_full_unique_view() {
2680        // IoBuf -> IoBufMut should recover pooled ownership for unique non-full views.
2681        let page = page_size();
2682        let pool = test_pool(test_config(page, page, 2));
2683
2684        let buf = pool.try_alloc(page).unwrap();
2685        assert_eq!(get_allocated(&pool, page), 1);
2686
2687        let iobuf = buf.freeze();
2688        assert_eq!(get_allocated(&pool, page), 1);
2689
2690        let iobufmut: IoBufMut = iobuf.into();
2691
2692        // Conversion reused pooled storage instead of copying.
2693        assert_eq!(
2694            get_allocated(&pool, page),
2695            1,
2696            "pooled buffer should remain allocated after zero-copy IoBuf->IoBufMut conversion"
2697        );
2698        assert_eq!(get_available(&pool, page), 0);
2699
2700        // Dropping returns the pooled buffer.
2701        drop(iobufmut);
2702        assert_eq!(get_allocated(&pool, page), 0);
2703        assert_eq!(get_available(&pool, page), 1);
2704    }
2705
2706    #[test]
2707    fn test_iobuf_to_iobufmut_conversion_preserves_full_unique_view() {
2708        // IoBuf -> IoBufMut via From should preserve data and keep pooled
2709        // ownership for a fully-written unique view.
2710        let page = page_size();
2711        let pool = test_pool(test_config(page, page, 2));
2712
2713        // Fill a pooled buffer completely and freeze.
2714        let mut buf = pool.try_alloc(page).unwrap();
2715        buf.put_slice(&vec![0xEE; page]);
2716        let iobuf = buf.freeze();
2717
2718        // Convert back to mutable; should reuse pooled storage.
2719        let iobufmut: IoBufMut = iobuf.into();
2720        assert_eq!(iobufmut.len(), page);
2721        assert!(iobufmut.as_ref().iter().all(|&b| b == 0xEE));
2722        assert_eq!(get_allocated(&pool, page), 1);
2723        assert_eq!(get_available(&pool, page), 0);
2724
2725        // Dropping returns the buffer to the pool.
2726        drop(iobufmut);
2727        assert_eq!(get_allocated(&pool, page), 0);
2728        assert_eq!(get_available(&pool, page), 1);
2729    }
2730
2731    #[test]
2732    fn test_iobuf_try_into_mut_recycles_full_unique_view() {
2733        // try_into_mut on a uniquely-owned full-view pooled IoBuf should recover
2734        // mutable ownership without copying, preserving data and pool tracking.
2735        let page = page_size();
2736        let pool = test_pool(test_config(page, page, 2));
2737
2738        let mut buf = pool.try_alloc(page).unwrap();
2739        buf.put_slice(&vec![0xAB; page]);
2740        let iobuf = buf.freeze();
2741        assert_eq!(get_allocated(&pool, page), 1);
2742
2743        // Unique full view should recycle.
2744        let recycled = iobuf
2745            .try_into_mut()
2746            .expect("unique full-view pooled buffer should recycle");
2747        assert_eq!(recycled.len(), page);
2748        assert!(recycled.as_ref().iter().all(|&b| b == 0xAB));
2749        assert_eq!(recycled.capacity(), page);
2750        assert_eq!(get_allocated(&pool, page), 1);
2751
2752        drop(recycled);
2753        assert_eq!(get_allocated(&pool, page), 0);
2754        assert_eq!(get_available(&pool, page), 1);
2755    }
2756
2757    #[test]
2758    fn test_iobuf_try_into_mut_succeeds_for_unique_slice_and_fails_for_shared() {
2759        let page = page_size();
2760        let pool = test_pool(test_config(page, page, 2));
2761
2762        // Unique sliced views can recover mutable ownership without copying.
2763        let mut buf = pool.try_alloc(page).unwrap();
2764        buf.put_slice(&vec![0xCD; page]);
2765        let iobuf = buf.freeze();
2766        let sliced = iobuf.slice(1..page);
2767        drop(iobuf);
2768        let recycled = sliced
2769            .try_into_mut()
2770            .expect("unique sliced pooled buffer should recycle");
2771        assert_eq!(recycled.len(), page - 1);
2772        assert!(recycled.as_ref().iter().all(|&b| b == 0xCD));
2773        assert_eq!(recycled.capacity(), page - 1);
2774        assert_eq!(get_allocated(&pool, page), 1);
2775        drop(recycled);
2776        assert_eq!(get_allocated(&pool, page), 0);
2777        assert_eq!(get_available(&pool, page), 1);
2778
2779        // Shared views still cannot recover mutable ownership.
2780        let mut buf = pool.try_alloc(page).unwrap();
2781        buf.put_slice(&vec![0xEF; page]);
2782        let iobuf = buf.freeze();
2783        let cloned = iobuf.clone();
2784        let iobuf = iobuf
2785            .try_into_mut()
2786            .expect_err("shared pooled buffer must not convert to mutable");
2787
2788        drop(cloned);
2789        drop(iobuf);
2790        assert_eq!(get_allocated(&pool, page), 0);
2791        assert!(get_available(&pool, page) >= 1);
2792    }
2793
2794    #[test]
2795    fn test_multithreaded_alloc_freeze_return() {
2796        let page = page_size();
2797        let pool = Arc::new(test_pool(test_config(page, page, 100)));
2798
2799        let mut handles = vec![];
2800
2801        // Reduce iterations under miri (atomics are slow)
2802        cfg_if::cfg_if! {
2803            if #[cfg(miri)] {
2804                let iterations = 100;
2805            } else {
2806                let iterations = 1000;
2807            }
2808        }
2809
2810        // Spawn multiple threads that allocate, freeze, clone, and drop
2811        for _ in 0..10 {
2812            let pool = pool.clone();
2813            let handle = thread::spawn(move || {
2814                for _ in 0..iterations {
2815                    let buf = pool.try_alloc(page).unwrap();
2816                    let iobuf = buf.freeze();
2817
2818                    // Clone a few times
2819                    let clones: Vec<_> = (0..5).map(|_| iobuf.clone()).collect();
2820                    drop(iobuf);
2821
2822                    // Drop clones
2823                    for clone in clones {
2824                        drop(clone);
2825                    }
2826                }
2827            });
2828            handles.push(handle);
2829        }
2830
2831        // Wait for all threads
2832        for handle in handles {
2833            handle.join().unwrap();
2834        }
2835
2836        // Worker threads may retain free buffers in their own local caches, so
2837        // the main thread cannot assert that all of them are visible here.
2838        // It should still be able to allocate successfully once the workers finish.
2839        let _buf = pool
2840            .try_alloc(page)
2841            .expect("pool should remain usable after multithreaded test");
2842    }
2843
2844    #[test]
2845    fn test_cross_thread_buffer_return() {
2846        // Allocate on one thread, freeze, send to another thread, drop there
2847        let page = page_size();
2848        let pool = test_pool(test_config(page, page, 100));
2849
2850        let (tx, rx) = mpsc::channel();
2851
2852        // Allocate and freeze on main thread
2853        for _ in 0..50 {
2854            let buf = pool.try_alloc(page).unwrap();
2855            let iobuf = buf.freeze();
2856            tx.send(iobuf).unwrap();
2857        }
2858        drop(tx);
2859
2860        // Receive and drop on another thread. Cross-thread returns initialize
2861        // the dropping thread's local cache, so the buffers remain local to that
2862        // thread instead of bouncing through the global freelist.
2863        let handle = thread::spawn(move || {
2864            while let Ok(iobuf) = rx.recv() {
2865                drop(iobuf);
2866            }
2867
2868            let class_index = pool
2869                .class_index(page)
2870                .expect("class exists for page-sized buffer");
2871            assert_eq!(get_local_len(&pool.inner.classes[class_index]), 50);
2872            assert_eq!(get_global_len(&pool.inner.classes[class_index]), 0);
2873
2874            for _ in 0..50 {
2875                let _buf = pool
2876                    .try_alloc(page)
2877                    .expect("dropping thread should be able to reuse locally returned buffers");
2878            }
2879        });
2880
2881        handle.join().unwrap();
2882    }
2883
2884    #[test]
2885    fn test_thread_exit_flushes_local_bin() {
2886        // When a thread exits, its TLS cache Drop flushes buffers back to the
2887        // global freelist, making them available to other threads.
2888        let page = page_size();
2889        let pool = Arc::new(test_pool(test_config(page, page, 1)));
2890
2891        // Allocate and return a buffer on a worker thread, then let it exit.
2892        let worker_pool = pool.clone();
2893        thread::spawn(move || {
2894            let buf = worker_pool
2895                .try_alloc(page)
2896                .expect("worker should allocate tracked buffer");
2897            drop(buf);
2898        })
2899        .join()
2900        .expect("worker thread should exit cleanly");
2901
2902        // After thread exit, the buffer should be in the global freelist (not
2903        // stuck in a dead thread's local cache).
2904        let class_index = pool
2905            .class_index(page)
2906            .expect("class exists for page-sized buffer");
2907        assert_eq!(get_global_len(&pool.inner.classes[class_index]), 1);
2908        assert_eq!(get_local_len(&pool.inner.classes[class_index]), 0);
2909
2910        // The flushed buffer should be reusable from the main thread.
2911        let _buf = pool
2912            .try_alloc(page)
2913            .expect("thread-exited local buffer should be reusable");
2914    }
2915
2916    #[test]
2917    fn test_pool_drop_drains_global_freelist() {
2918        // Dropping the pool should immediately reclaim globally-visible free
2919        // tracked buffers, while leaving TLS-cached buffers alone.
2920        let page = page_size();
2921        let pool = test_pool(test_config(page, page, 2));
2922        let class_index = pool
2923            .class_index(page)
2924            .expect("class exists for page-sized buffer");
2925        let class = &pool.inner.classes[class_index];
2926        // Keep a test-owned handle so the class remains inspectable after
2927        // dropping the public pool below.
2928        // SAFETY: `class` owns one strong reference for `class.token`.
2929        unsafe { class.token.retain() };
2930        let class = SizeClassHandle { token: class.token };
2931
2932        // Return one buffer to the current thread's local cache and overflow
2933        // the other into the shared global freelist.
2934        let buf1 = pool.try_alloc(page).unwrap();
2935        let buf2 = pool.try_alloc(page).unwrap();
2936        drop(buf1);
2937        drop(buf2);
2938
2939        assert_eq!(get_global_len(&class), 1);
2940        assert_eq!(get_local_len(&class), 1);
2941
2942        // Pool drop should drain only the global freelist. The thread-local
2943        // cache remains untouched until thread exit.
2944        drop(pool);
2945
2946        assert_eq!(get_global_len(&class), 0);
2947        assert_eq!(get_local_len(&class), 1);
2948        assert_eq!(get_global_created(&class), 2);
2949    }
2950
2951    #[test]
2952    fn test_pool_dropped_before_buffer() {
2953        // What happens if the pool is dropped while buffers are still in use?
2954        // The size class remains alive until the last tracked buffer is dropped.
2955
2956        let page = page_size();
2957        let pool = test_pool(test_config(page, page, 2));
2958
2959        let mut buf = pool.try_alloc(page).unwrap();
2960        buf.put_slice(&[0u8; 100]);
2961        let iobuf = buf.freeze();
2962
2963        // Drop the pool while buffer is still alive
2964        drop(pool);
2965
2966        // Buffer should still be usable
2967        assert_eq!(iobuf.len(), 100);
2968
2969        // Dropping the buffer should not panic and should return to the retained size class.
2970        drop(iobuf);
2971        // No assertion here - we just want to make sure it doesn't panic
2972    }
2973
2974    #[test]
2975    fn test_pool_exhaustion_and_recovery() {
2976        // Test pool exhaustion and recovery.
2977        let page = page_size();
2978        let pool = test_pool(test_config(page, page, 3));
2979
2980        // Exhaust the pool
2981        let buf1 = pool.try_alloc(page).expect("first alloc");
2982        let buf2 = pool.try_alloc(page).expect("second alloc");
2983        let buf3 = pool.try_alloc(page).expect("third alloc");
2984        assert!(pool.try_alloc(page).is_err(), "pool should be exhausted");
2985
2986        // Return one buffer
2987        drop(buf1);
2988
2989        // Should be able to allocate again
2990        let buf4 = pool.try_alloc(page).expect("alloc after return");
2991        assert!(pool.try_alloc(page).is_err(), "pool exhausted again");
2992
2993        // Return all and verify freelist reuse
2994        drop(buf2);
2995        drop(buf3);
2996        drop(buf4);
2997
2998        assert_eq!(get_allocated(&pool, page), 0);
2999        assert_eq!(get_available(&pool, page), 3);
3000
3001        // Allocate again - should reuse from freelist
3002        let _buf5 = pool.try_alloc(page).expect("reuse from freelist");
3003        assert_eq!(get_available(&pool, page), 2);
3004    }
3005
3006    #[test]
3007    fn test_try_alloc_errors() {
3008        // Test try_alloc error variants.
3009        let page = page_size();
3010        let pool = test_pool(test_config(page, page, 2));
3011
3012        // Oversized request
3013        let result = pool.try_alloc(page * 10);
3014        assert_eq!(result.unwrap_err(), PoolError::Oversized);
3015
3016        // Exhaust pool
3017        let _buf1 = pool.try_alloc(page).unwrap();
3018        let _buf2 = pool.try_alloc(page).unwrap();
3019        let result = pool.try_alloc(page);
3020        assert_eq!(result.unwrap_err(), PoolError::Exhausted);
3021    }
3022
3023    #[test]
3024    fn test_try_alloc_zeroed_errors() {
3025        // try_alloc_zeroed should return the same error variants as try_alloc.
3026        let page = page_size();
3027        let pool = test_pool(test_config(page, page, 2));
3028
3029        // Oversized request.
3030        let result = pool.try_alloc_zeroed(page * 10);
3031        assert_eq!(result.unwrap_err(), PoolError::Oversized);
3032
3033        // Exhaust pool, then verify Exhausted error.
3034        let _buf1 = pool.try_alloc_zeroed(page).unwrap();
3035        let _buf2 = pool.try_alloc_zeroed(page).unwrap();
3036        let result = pool.try_alloc_zeroed(page);
3037        assert_eq!(result.unwrap_err(), PoolError::Exhausted);
3038    }
3039
3040    #[test]
3041    fn test_fallback_allocation() {
3042        // Test fallback allocation when pool is exhausted or oversized.
3043        let page = page_size();
3044        let pool = test_pool(test_config(page, page, 2));
3045
3046        // Exhaust the pool
3047        let buf1 = pool.try_alloc(page).unwrap();
3048        let buf2 = pool.try_alloc(page).unwrap();
3049        assert!(buf1.is_pooled());
3050        assert!(buf2.is_pooled());
3051
3052        // Fallback via alloc() when exhausted - still aligned, but untracked
3053        let mut fallback_exhausted = pool.alloc(page);
3054        assert!(!fallback_exhausted.is_pooled());
3055        assert!((fallback_exhausted.as_mut_ptr() as usize).is_multiple_of(page));
3056
3057        // Fallback via alloc() when oversized - still aligned, but untracked
3058        let mut fallback_oversized = pool.alloc(page * 10);
3059        assert!(!fallback_oversized.is_pooled());
3060        assert!((fallback_oversized.as_mut_ptr() as usize).is_multiple_of(page));
3061
3062        // Verify pool counters unchanged by fallback allocations
3063        assert_eq!(get_allocated(&pool, page), 2);
3064
3065        // Drop fallback buffers - should not affect pool counters
3066        drop(fallback_exhausted);
3067        drop(fallback_oversized);
3068        assert_eq!(get_allocated(&pool, page), 2);
3069
3070        // Drop tracked buffers - counters should decrease
3071        drop(buf1);
3072        drop(buf2);
3073        assert_eq!(get_allocated(&pool, page), 0);
3074    }
3075
3076    #[test]
3077    fn test_is_pooled() {
3078        // IoBufMut from the pool should report is_pooled, while heap-backed
3079        // buffers should not.
3080        let page = page_size();
3081        let pool = test_pool(test_config(page, page, 10));
3082
3083        let pooled = pool.try_alloc(page).unwrap();
3084        assert!(pooled.is_pooled());
3085
3086        let owned = IoBufMut::with_capacity(100);
3087        assert!(!owned.is_pooled());
3088    }
3089
3090    #[test]
3091    fn test_iobuf_is_pooled() {
3092        let page = page_size();
3093        let pool = test_pool(test_config(page, page, 2));
3094
3095        let pooled = pool.try_alloc(page).unwrap().freeze();
3096        assert!(pooled.is_pooled());
3097
3098        // Oversized alloc uses untracked fallback allocation.
3099        let fallback = pool.alloc(page * 10).freeze();
3100        assert!(!fallback.is_pooled());
3101
3102        let bytes = IoBuf::copy_from_slice(b"hello");
3103        assert!(!bytes.is_pooled());
3104    }
3105
3106    #[test]
3107    fn test_buffer_alignment() {
3108        let page = page_size();
3109        let cache_line = cache_line_size();
3110
3111        // Reduce max_per_class under miri (atomics are slow)
3112        cfg_if::cfg_if! {
3113            if #[cfg(miri)] {
3114                let storage_config = BufferPoolConfig {
3115                    max_per_class: NZU32!(32),
3116                    ..BufferPoolConfig::for_storage().with_alignment(NZUsize!(page))
3117                };
3118                let network_config = BufferPoolConfig {
3119                    max_per_class: NZU32!(32),
3120                    ..BufferPoolConfig::for_network().with_alignment(NZUsize!(cache_line))
3121                };
3122            } else {
3123                let storage_config =
3124                    BufferPoolConfig::for_storage().with_alignment(NZUsize!(page));
3125                let network_config =
3126                    BufferPoolConfig::for_network().with_alignment(NZUsize!(cache_line));
3127            }
3128        }
3129
3130        // Storage preset - page aligned
3131        let storage_buffer_pool = test_pool(storage_config);
3132        let mut buf = storage_buffer_pool.try_alloc(100).unwrap();
3133        assert_eq!(
3134            buf.as_mut_ptr() as usize % page,
3135            0,
3136            "storage buffer not page-aligned"
3137        );
3138
3139        // Network preset - cache-line aligned
3140        let network_buffer_pool = test_pool(network_config);
3141        let mut buf = network_buffer_pool.try_alloc(100).unwrap();
3142        assert_eq!(
3143            buf.as_mut_ptr() as usize % cache_line,
3144            0,
3145            "network buffer not cache-line aligned"
3146        );
3147    }
3148}