commonware_runtime/iobuf/pool.rs
1//! Buffer pool for efficient I/O operations.
2//!
3//! Provides pooled, aligned buffers that can be reused to reduce allocation
4//! overhead. Buffer alignment is configurable: use page alignment for storage I/O
5//! (required for direct I/O and DMA), or cache-line alignment for network I/O
6//! (reduces fragmentation).
7//!
8//! # Thread Safety
9//!
10//! [`BufferPool`] is `Send + Sync` and can be safely shared across threads.
11//! Allocation and deallocation use atomic counters together with a bounded
12//! lock-free global freelist plus per-thread caches.
13//!
14//! # Pool Lifecycle
15//!
16//! Tracked buffers held by pooled views or cached in thread-local bins keep a
17//! strong reference to the originating size class. Buffers can outlive the
18//! public [`BufferPool`] handle and still return to their original size class.
19//! - Untracked fallback allocations store no class reference and deallocate
20//! directly when dropped.
21//! - Requests smaller than [`BufferPoolConfig::pool_min_size`] bypass pooling
22//! entirely and return untracked aligned allocations from both
23//! [`BufferPool::try_alloc`] and [`BufferPool::alloc`].
//! - Dropping [`BufferPool`] drains only the shared global freelists;
//!   pooled views and buffers cached in a live thread's local cache can keep
//!   their size class alive until they are dropped or the thread exits.
27//!
28//! # Size Classes
29//!
30//! Buffers are organized into power-of-two size classes from `min_size` to
31//! `max_size`. For example, with `min_size = 4096` and `max_size = 32768`:
32//! - Class 0: 4096 bytes
33//! - Class 1: 8192 bytes
34//! - Class 2: 16384 bytes
35//! - Class 3: 32768 bytes
36//!
37//! Allocation requests are rounded up to the next size class. Requests larger
38//! than `max_size` return [`PoolError::Oversized`] from [`BufferPool::try_alloc`],
39//! or fall back to an untracked aligned heap allocation from [`BufferPool::alloc`].
40//!
41//! # Cache Structure
42//!
43//! Each size class uses a two-level allocator:
44//! - a small per-thread local cache for steady-state same-thread reuse
45//! - a shared global freelist for refill and spill between threads
46//!
47//! When a local cache misses, the pool refills a small batch from the global
48//! freelist before attempting to create a new tracked buffer. Returned buffers
49//! first try to re-enter the dropping thread's local cache, spilling a bounded
50//! batch back to the global freelist if needed.
51
52use super::{freelist::Freelist, page_size, IoBufMut};
53use crate::{
54 iobuf::buffer::{PooledBufMut, PooledBuffer},
55 telemetry::metrics::{raw, Counter, CounterFamily, EncodeLabelSet, GaugeFamily, Register},
56};
57use commonware_utils::{NZUsize, NZU32};
58use std::{
59 alloc::Layout,
60 cell::{Cell, UnsafeCell},
61 mem::MaybeUninit,
62 num::{NonZeroU32, NonZeroUsize},
63 ptr,
64 sync::{
65 atomic::{AtomicUsize, Ordering},
66 Arc,
67 },
68};
69
/// Smallest per-thread cache capacity at which refill/spill batching kicks in.
///
/// Caches below this size still give same-thread locality, but batching them
/// would degenerate into moving one buffer at a time, which adds policy
/// complexity without amortizing traffic on the shared queue.
const MIN_TLS_BATCH_CAPACITY: usize = 4;

/// Reasons a buffer pool allocation can fail.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PoolError {
    /// The requested capacity is larger than the pool's largest buffer size.
    Oversized,
    /// The size class needed for the request has no buffer to hand out.
    Exhausted,
}

impl std::fmt::Display for PoolError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Each variant maps to a fixed, human-readable message.
        let message = match *self {
            Self::Exhausted => "pool exhausted for required size class",
            Self::Oversized => "requested capacity exceeds maximum buffer size",
        };
        f.write_str(message)
    }
}

impl std::error::Error for PoolError {}
96
/// Policy for sizing each thread's cache within a buffer pool size class.
///
/// The policy resolves to one concrete per-thread capacity that is shared by
/// every size class of the pool. A resolved capacity of zero means the pool
/// does not cache buffers thread-locally.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum BufferPoolThreadCacheConfig {
    /// Enable thread-local caching.
    ///
    /// `None` derives the per-thread cache size from the pool's per-class
    /// capacity and expected parallelism, reserving about half of each class
    /// for the shared freelist. Small per-class budgets may resolve to zero,
    /// disabling thread-local caching so free buffers do not become stranded in
    /// other threads.
    ///
    /// `Some(n)` uses an exact per-thread cache size for every size class.
    /// Config validation rejects `n` greater than `max_per_class`.
    Enabled(Option<NonZeroUsize>),
    /// Disable thread-local caching and route all reuse through the shared global freelist.
    Disabled,
}
113
/// Configuration for a buffer pool.
///
/// Start from a preset such as [`Self::for_network`] or [`Self::for_storage`]
/// and adjust it with the `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct BufferPoolConfig {
    /// Minimum request size that should use pooled allocation.
    ///
    /// Requests smaller than this bypass the pool and use direct aligned
    /// allocation instead. A value of `0` means all eligible requests use the
    /// pool. Must be <= `min_size`.
    pub pool_min_size: usize,
    /// Minimum buffer size. Must be >= alignment and a power of two.
    pub min_size: NonZeroUsize,
    /// Maximum buffer size. Must be a power of two and >= min_size.
    pub max_size: NonZeroUsize,
    /// Maximum number of buffers per size class.
    ///
    /// Size-class slots are identified by `u32`, so the per-class capacity is
    /// capped by this type. An explicit per-thread cache capacity must not
    /// exceed this value.
    pub max_per_class: NonZeroU32,
    /// Whether to create every tracked buffer during pool construction.
    ///
    /// When enabled, each size class creates `max_per_class` buffers and parks
    /// them in the class-global freelist before the pool is returned. This
    /// moves allocation cost to startup and makes the first reuse path avoid
    /// heap allocation.
    pub prefill: bool,
    /// Buffer alignment. Must be a power of two.
    pub alignment: NonZeroUsize,
    /// Expected number of threads concurrently accessing the pool.
    ///
    /// This sizes the shared global freelist stripes. It is also used to derive
    /// thread-cache capacity when the thread-cache policy is automatic, using
    /// approximately half of [`Self::max_per_class`] divided across expected
    /// threads.
    pub parallelism: NonZeroUsize,
    /// Policy for sizing the per-thread local cache in each size class.
    ///
    /// By default, thread-cache capacity is derived from [`Self::parallelism`].
    /// [`Self::with_thread_cache_capacity`] uses an exact per-thread cache size.
    /// [`Self::with_thread_cache_disabled`] bypasses thread-local caches.
    pub(crate) thread_cache_config: BufferPoolThreadCacheConfig,
}
155
156impl BufferPoolConfig {
157 /// Network I/O preset: 1KB to 128KB buffers, 4096 per class, not prefilled.
158 ///
159 /// Network operations typically need multiple concurrent buffers per
160 /// connection (message, encoding, encryption) so we allow 4096 buffers per
161 /// size class.
162 pub const fn for_network() -> Self {
163 Self {
164 pool_min_size: 0,
165 min_size: NZUsize!(1024),
166 max_size: NZUsize!(128 * 1024),
167 max_per_class: NZU32!(4096),
168 prefill: false,
169 alignment: NZUsize!(1),
170 parallelism: NZUsize!(1),
171 thread_cache_config: BufferPoolThreadCacheConfig::Enabled(None),
172 }
173 }
174
175 /// Storage I/O preset: `page_size` (usually 4KB) to 8MB buffers, 64 per class,
176 /// not prefilled.
177 pub fn for_storage() -> Self {
178 let page = NZUsize!(page_size());
179 Self {
180 pool_min_size: 0,
181 min_size: page,
182 max_size: NZUsize!(8 * 1024 * 1024),
183 max_per_class: NZU32!(64),
184 prefill: false,
185 // TODO (#2960): this needs to be page/block aligned for O_DIRECT
186 alignment: NZUsize!(1),
187 parallelism: NZUsize!(1),
188 thread_cache_config: BufferPoolThreadCacheConfig::Enabled(None),
189 }
190 }
191
192 /// Returns a copy of this config with a new minimum request size that uses pooling.
193 pub const fn with_pool_min_size(mut self, pool_min_size: usize) -> Self {
194 self.pool_min_size = pool_min_size;
195 self
196 }
197
198 /// Returns a copy of this config with a new minimum buffer size.
199 pub const fn with_min_size(mut self, min_size: NonZeroUsize) -> Self {
200 self.min_size = min_size;
201 self
202 }
203
204 /// Returns a copy of this config with a new maximum buffer size.
205 pub const fn with_max_size(mut self, max_size: NonZeroUsize) -> Self {
206 self.max_size = max_size;
207 self
208 }
209
210 /// Returns a copy of this config with a new maximum number of buffers per size class.
211 pub const fn with_max_per_class(mut self, max_per_class: NonZeroU32) -> Self {
212 self.max_per_class = max_per_class;
213 self
214 }
215
216 /// Returns a copy of this config with a new expected parallelism.
217 ///
218 /// This controls the minimum global-freelist stripe count, and controls
219 /// thread-cache capacity when the thread-cache policy is automatic. The
220 /// automatic policy reserves about half of each class for the global
221 /// freelist and divides the remaining capacity across expected threads.
222 pub const fn with_parallelism(mut self, parallelism: NonZeroUsize) -> Self {
223 self.parallelism = parallelism;
224 self
225 }
226
227 /// Returns a copy of this config with an explicit per-thread cache size.
228 ///
229 /// Global-freelist striping is set separately by [`Self::with_parallelism`].
230 pub const fn with_thread_cache_capacity(mut self, thread_cache_capacity: NonZeroUsize) -> Self {
231 self.thread_cache_config =
232 BufferPoolThreadCacheConfig::Enabled(Some(thread_cache_capacity));
233 self
234 }
235
236 /// Returns a copy of this config with thread-local caching disabled.
237 ///
238 /// Global-freelist striping is set separately by [`Self::with_parallelism`].
239 pub const fn with_thread_cache_disabled(mut self) -> Self {
240 self.thread_cache_config = BufferPoolThreadCacheConfig::Disabled;
241 self
242 }
243
244 /// Returns a copy of this config with a new prefill setting.
245 pub const fn with_prefill(mut self, prefill: bool) -> Self {
246 self.prefill = prefill;
247 self
248 }
249
250 /// Returns a copy of this config with a new alignment.
251 pub const fn with_alignment(mut self, alignment: NonZeroUsize) -> Self {
252 self.alignment = alignment;
253 self
254 }
255
256 /// Returns a copy of this config sized for an approximate tracked-memory budget.
257 ///
258 /// This computes `max_per_class` as:
259 ///
260 /// `ceil(budget_bytes / sum(size_class_bytes))`
261 ///
262 /// where `size_class_bytes` includes every class from `min_size` to `max_size`.
263 /// This always rounds up to at least one buffer per size class, so the
264 /// resulting estimated capacity may exceed `budget_bytes`.
265 ///
266 /// # Panics
267 ///
268 /// - `min_size` is not a power of two
269 /// - `max_size` is not a power of two
270 /// - `max_size < min_size`
271 /// - the derived per-class capacity does not fit in `u32`.
272 pub fn with_budget_bytes(mut self, budget_bytes: NonZeroUsize) -> Self {
273 self.validate_size_class_bounds();
274
275 let mut class_bytes = 0usize;
276 let min_size = self.min_size.get();
277 for i in 0..Self::num_classes(min_size, self.max_size.get()) {
278 class_bytes = class_bytes.saturating_add(Self::class_size(min_size, i));
279 }
280 if class_bytes == 0 {
281 return self;
282 }
283 let max_per_class = u32::try_from(budget_bytes.get().div_ceil(class_bytes))
284 .expect("max_per_class must fit in u32 slot ids");
285 self.max_per_class =
286 NonZeroU32::new(max_per_class).expect("max_per_class must be non-zero");
287 self
288 }
289
290 /// Validates the size-class bounds, panicking on invalid values.
291 ///
292 /// # Panics
293 ///
294 /// - `min_size` is not a power of two
295 /// - `max_size` is not a power of two
296 /// - `max_size < min_size`
297 fn validate_size_class_bounds(&self) {
298 let min_size = self.min_size.get();
299 let max_size = self.max_size.get();
300
301 assert!(
302 min_size.is_power_of_two(),
303 "min_size must be a power of two"
304 );
305 assert!(
306 max_size.is_power_of_two(),
307 "max_size must be a power of two"
308 );
309 assert!(max_size >= min_size, "max_size must be >= min_size");
310 }
311
312 /// Validates the configuration, panicking on invalid values.
313 ///
314 /// # Panics
315 ///
316 /// - `alignment` is not a power of two
317 /// - `min_size` is not a power of two
318 /// - `max_size` is not a power of two
319 /// - `min_size < alignment`
320 /// - `max_size < min_size`
321 /// - `pool_min_size > min_size`
322 /// - explicit `thread_cache_capacity > max_per_class`
323 fn validate(&self) {
324 self.validate_size_class_bounds();
325 assert!(
326 self.alignment.is_power_of_two(),
327 "alignment must be a power of two"
328 );
329 assert!(
330 self.min_size >= self.alignment,
331 "min_size ({}) must be >= alignment ({})",
332 self.min_size,
333 self.alignment
334 );
335 assert!(
336 self.pool_min_size <= self.min_size.get(),
337 "pool_min_size ({}) must be <= min_size ({})",
338 self.pool_min_size,
339 self.min_size
340 );
341 if let BufferPoolThreadCacheConfig::Enabled(Some(thread_cache_capacity)) =
342 self.thread_cache_config
343 {
344 assert!(
345 thread_cache_capacity.get() <= self.max_per_class.get() as usize,
346 "thread_cache_capacity ({}) must be <= max_per_class ({})",
347 thread_cache_capacity,
348 self.max_per_class
349 );
350 }
351 }
352
353 /// Returns the number of size classes between validated bounds.
354 #[inline]
355 const fn num_classes(min_size: usize, max_size: usize) -> usize {
356 // Since sizes are powers of two, trailing zeros is the size-class
357 // exponent
358 (max_size.trailing_zeros() - min_size.trailing_zeros() + 1) as usize
359 }
360
361 /// Returns the buffer size for a validated size-class index.
362 #[inline]
363 const fn class_size(min_size: usize, index: usize) -> usize {
364 min_size << index
365 }
366
367 /// Resolves the effective per-thread cache size for each size class.
368 ///
369 /// Derived capacities divide half of the class budget across the expected
370 /// parallelism so cross-thread reuse remains effective. Small class budgets
371 /// may resolve to zero.
372 fn resolve_thread_cache_capacity(&self) -> usize {
373 match self.thread_cache_config {
374 BufferPoolThreadCacheConfig::Enabled(None) => {
375 let max_per_class = self.max_per_class.get() as usize;
376 let effective_threads = self.parallelism.get().min(max_per_class);
377 max_per_class / (2 * effective_threads)
378 }
379 BufferPoolThreadCacheConfig::Enabled(Some(thread_cache_capacity)) => {
380 thread_cache_capacity.get()
381 }
382 BufferPoolThreadCacheConfig::Disabled => 0,
383 }
384 }
385}
386
/// Label for buffer pool metrics, identifying the size class.
// NOTE(review): presumably `size_class` is the class buffer size in bytes;
// confirm against the code that constructs this label.
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
struct SizeClassLabel {
    size_class: u64,
}

/// Metrics for the buffer pool.
///
/// The per-class families are keyed by [`SizeClassLabel`]. The oversized
/// counter is a single pool-wide counter because oversized requests do not map
/// to any size class.
struct PoolMetrics {
    /// Number of tracked buffers created for the size class.
    created: GaugeFamily<SizeClassLabel>,
    /// Total number of failed allocations (pool exhausted).
    exhausted_total: CounterFamily<SizeClassLabel>,
    /// Total number of oversized allocation requests.
    oversized_total: Counter,
}
402
403impl PoolMetrics {
404 fn new(registry: &mut impl Register) -> Self {
405 Self {
406 created: registry.register(
407 "buffer_pool_created",
408 "Number of tracked buffers created for the pool",
409 raw::Family::default(),
410 ),
411 exhausted_total: registry.register(
412 "buffer_pool_exhausted_total",
413 "Total number of failed allocations due to pool exhaustion",
414 raw::Family::default(),
415 ),
416 oversized_total: registry.register(
417 "buffer_pool_oversized_total",
418 "Total number of allocation requests exceeding max buffer size",
419 raw::Counter::default(),
420 ),
421 }
422 }
423}
424
/// Per-size-class state.
///
/// Each class is a small two-level allocator:
/// - a shared global freelist for tracked buffers visible to all threads
/// - a per-thread local cache for same-thread reuse
///
/// The global freelist owns the allocation layout, slot reservation counter,
/// and parking cells for this class. A tracked buffer can be globally parked,
/// owned by a pooled backing, or parked in one thread's local cache, but the
/// slot always belongs to this `SizeClass`.
///
/// Liveness follows the buffer ownership state. Global freelist entries rely on
/// the pool's [`SizeClassHandle`] while the pool is alive and are drained when
/// the pool is dropped. Pooled backing values carry a [`SizeClassLease`].
/// Thread-local cache entries use banked strong references owned by the cache.
/// Those non-global states are what allow a buffer to outlive the public
/// [`BufferPool`] handle and still return to the correct freelist.
///
/// The freelist is the only place that deallocates tracked buffers. Returning a
/// buffer to the freelist transfers buffer ownership back to that freelist and
/// releases the pooled-backing lease or banked strong reference that kept the
/// class alive while the buffer was outside the global freelist.
///
/// Allocation prefers the local cache, then refills from the global freelist,
/// and only creates a new tracked buffer when no free buffer is available and
/// the class still has remaining capacity.
pub(super) struct SizeClass {
    /// Dense global identifier for the TLS cache registry.
    class_id: usize,
    /// The buffer size for this class.
    size: usize,
    /// Global free list of tracked buffers available for reuse.
    global: Freelist,
    /// Maximum number of buffers retained in the current thread's local bin.
    ///
    /// May be zero, in which case this class does no thread-local caching.
    thread_cache_capacity: usize,
}

// SAFETY: shared state in `SizeClass` is synchronized through atomics and the
// global freelist. Per-thread bins are stored in thread-local registries and
// only accessed by the current thread.
unsafe impl Send for SizeClass {}
// SAFETY: see above.
unsafe impl Sync for SizeClass {}
468
/// Non-owning raw identity for a size class.
///
/// # Size-class lifetime model
///
/// A [`SizeClass`] owns the [`Freelist`] for one buffer size class. The
/// freelist creates tracked [`PooledBuffer`]s, owns the allocation layout
/// needed to deallocate them, and is the only place that releases their memory.
/// A `PooledBuffer` outside the freelist does not carry enough information to
/// deallocate itself, so it must keep its originating `SizeClass` alive until
/// it can return to that freelist.
///
/// A tracked buffer is in one of three states, and its state determines where
/// the strong size-class reference for it lives:
///
/// - Global freelist: the buffer is parked in [`SizeClass::global`] and carries
///   no per-buffer strong reference. While the public pool exists, the
///   [`SizeClassHandle`] in [`BufferPoolInner::classes`] keeps the class alive.
/// - Pooled view: the buffer is owned by mutable or immutable I/O view state
///   and carries one [`SizeClassLease`], which is one strong reference to the
///   class.
/// - Thread-local cache: the [`TlsSizeClassCache`] stores the
///   [`SizeClassToken`] once, and owns one banked strong reference for each
///   initialized [`TlsSizeClassCacheEntry`]. A banked reference is an owned
///   `Arc<SizeClass>` reference counted by TLS cache state instead of
///   represented by a `SizeClassLease` value in each entry. Increasing `len`
///   banks one reference; decreasing `len` transfers one reference back into a
///   `SizeClassLease` or releases it to the global freelist.
///
/// Moving a buffer from the global freelist to pooled view or TLS state retains
/// one class reference. Moving it back to the global freelist releases that
/// reference. Moving between pooled view and TLS state transfers the same
/// reference without touching the refcount.
///
/// Dropping the public [`BufferPool`] drains globally parked buffers, then
/// drops its `SizeClassHandle`s. Pooled views and non-empty TLS caches may keep
/// the `SizeClass` alive after that point. Empty TLS caches may still remember
/// a token value, but with no banked references that token is only an inert
/// identity value and must not be dereferenced.
///
/// This is the one raw pointer shape used by all pool-owned, pooled view, and
/// thread-local references to a [`SizeClass`]. The pointer is always derived
/// from [`Arc::into_raw`].
///
/// `SizeClassToken` itself owns nothing. It is only an identity token and raw
/// pointer accepted by the `Arc` refcount APIs:
/// - [`SizeClassHandle`] pairs a token with ownership of one strong reference.
/// - [`SizeClassLease`] pairs a token with ownership of one strong reference.
/// - [`TlsSizeClassCache`] stores a token plus `len` banked strong references.
///
/// Because the token is non-owning, it may be stale when held by an empty TLS
/// cache. Code may dereference it or adjust the strong count only when another
/// invariant proves the allocation is still live. For example,
/// [`SizeClassHandle`] and [`SizeClassLease`] prove liveness through owned
/// strong references, and a non-empty [`TlsSizeClassCache`] proves liveness
/// through its banked entries.
///
/// The derived `PartialEq`/`Eq` compare pointer identity only, which is what
/// makes a stale token usable for identity checks without dereferencing it.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct SizeClassToken {
    /// Pointer produced by `Arc::into_raw` for the class allocation.
    ptr: ptr::NonNull<SizeClass>,
}
528
impl SizeClassToken {
    /// Creates a token and owns the initial strong reference for `class`.
    ///
    /// `class` is moved into a fresh `Arc` (strong count one), and that count
    /// is handed off to the raw pointer via [`Arc::into_raw`].
    ///
    /// The returned token is non-owning in the type system, but the raw pointer
    /// still represents one strong reference. The caller must wrap it in an
    /// owning type, such as [`SizeClassHandle`], or otherwise arrange for that
    /// strong reference to be released.
    fn new(class: SizeClass) -> Self {
        let ptr = Arc::into_raw(Arc::new(class)).cast_mut();
        // SAFETY: `Arc::into_raw` never returns null.
        let ptr = unsafe { ptr::NonNull::new_unchecked(ptr) };
        Self { ptr }
    }

    /// Returns the referenced size class.
    ///
    /// The returned borrow is tied to the borrow of this token, not to the
    /// strong reference that keeps the class alive. The caller must not keep
    /// it past the point where that owner releases its reference.
    ///
    /// # Safety
    ///
    /// Some owner must currently hold a strong reference for this token.
    #[inline(always)]
    const unsafe fn as_ref(&self) -> &SizeClass {
        // SAFETY: guaranteed by the caller.
        unsafe { self.ptr.as_ref() }
    }

    /// Retains one strong reference for this token.
    ///
    /// Increments the `Arc` strong count. The caller becomes responsible for
    /// eventually releasing the new reference (e.g. through [`Self::release`]).
    ///
    /// # Safety
    ///
    /// Some owner must currently hold a strong reference for this token.
    #[inline(always)]
    unsafe fn retain(self) {
        // SAFETY: guaranteed by the caller.
        unsafe { Arc::increment_strong_count(self.ptr.as_ptr()) };
    }

    /// Releases one owned strong reference for this token.
    ///
    /// Decrements the `Arc` strong count. If this was the last reference, the
    /// [`SizeClass`] (and its freelist) is dropped here.
    ///
    /// # Safety
    ///
    /// The caller must own one strong reference represented by this token.
    #[inline(always)]
    unsafe fn release(self) {
        // SAFETY: guaranteed by the caller.
        unsafe { Arc::decrement_strong_count(self.ptr.as_ptr()) };
    }
}
576
/// Owning pool reference to a size class.
///
/// This is the pool's strong `Arc<SizeClass>` reference represented by a
/// [`SizeClassToken`]. `SizeClassHandle` is the long-lived owner for a class
/// while the [`BufferPoolInner`] exists. Dropping the handle releases that
/// pool-owned strong reference. A class may still outlive the handle if pooled
/// backing values or thread-local cache entries own additional references
/// through [`SizeClassLease`] or banked TLS refs.
///
/// Functionally this is an `Arc<SizeClass>` stored in raw-token form. It exists
/// to keep the pool-owned reference alive and to provide a live token for
/// allocation paths that need to retain pooled-backing or TLS-banked
/// references. The raw form keeps the already-loaded class pointer usable for
/// explicit refcount operations without calling [`Arc::as_ptr`] or storing a
/// second token alongside an `Arc`.
struct SizeClassHandle {
    /// Token whose strong reference is owned by this handle.
    token: SizeClassToken,
}

// SAFETY: `SizeClassHandle` owns a strong reference to a `SizeClass`, which is
// `Send`.
unsafe impl Send for SizeClassHandle {}
// SAFETY: same argument as `Send`, shared access to `SizeClass` is synchronized.
unsafe impl Sync for SizeClassHandle {}
601
602impl SizeClassHandle {
603 /// Creates a new size class and takes ownership of its initial strong ref.
604 ///
605 /// If `prefill` is true, the global freelist creates `max` buffers upfront
606 /// and makes them immediately available for reuse.
607 fn new(
608 class_id: usize,
609 size: usize,
610 alignment: usize,
611 max: NonZeroU32,
612 parallelism: NonZeroUsize,
613 thread_cache_capacity: usize,
614 prefill: bool,
615 ) -> Self {
616 let layout = Layout::from_size_align(size, alignment).expect("alignment is a power of two");
617 let freelist = Freelist::new(max, parallelism, layout, prefill);
618 let class = SizeClass {
619 class_id,
620 size,
621 global: freelist,
622 thread_cache_capacity,
623 };
624 Self {
625 token: SizeClassToken::new(class),
626 }
627 }
628
629 /// Creates a new tracked buffer and retains this size class for its slot.
630 #[inline(always)]
631 fn try_create(&self, zeroed: bool) -> Option<(u32, PooledBuffer, SizeClassLease)> {
632 let (slot, buffer) = self.global.try_create(zeroed)?;
633 let class = SizeClassLease::retain(self);
634 Some((slot, buffer, class))
635 }
636}
637
impl Drop for SizeClassHandle {
    /// Releases the pool-owned strong reference.
    ///
    /// If no lease or banked TLS reference remains, this drops the
    /// [`SizeClass`] and with it the class's global freelist.
    fn drop(&mut self) {
        // SAFETY: this handle owns one strong reference for `self.token`.
        unsafe { self.token.release() };
    }
}

impl std::ops::Deref for SizeClassHandle {
    type Target = SizeClass;

    /// Borrows the size class kept alive by this handle.
    #[inline(always)]
    fn deref(&self) -> &Self::Target {
        // SAFETY: this handle owns one strong reference for `self.token`.
        unsafe { self.token.as_ref() }
    }
}
654
/// Owned size-class reference for a pooled buffer outside the global freelist.
///
/// A pooled buffer outside the global freelist must keep its originating
/// [`SizeClass`] alive so it can be returned after the [`BufferPool`] handle is
/// dropped. This is one strong `Arc<SizeClass>` reference represented by a
/// [`SizeClassToken`], with retain and release performed explicitly at the
/// boundaries where a buffer enters or leaves global pool state.
///
/// Lifetime-wise this is the same kind of reference as [`SizeClassHandle`]:
/// both own exactly one strong reference for a token. The types are separate
/// because they live in different state machines. `SizeClassHandle` is ordinary
/// RAII ownership for the pool's class vector. `SizeClassLease` is hot-path
/// pooled view ownership that must be explicitly transferred into TLS cache
/// state or returned to the global freelist.
///
/// The raw representation matters because the hot path mostly transfers
/// ownership between pooled view state and this thread's local cache. A real
/// `Arc<SizeClass>` field is pointer-sized too, but it is a non-`Copy` value
/// with drop glue. Even when the strong count would not change, moving it
/// through pooled buffer and cache-entry structs makes the compiler preserve
/// destructor paths for those structs. `SizeClassLease` has no automatic drop:
/// moving between pooled view and local-cache state is a plain pointer
/// transfer, and only explicit calls such as [`Self::return_global`] adjust the
/// strong count.
///
/// A lease must be consumed by one of those explicit transitions, such as
/// [`Self::into_banked`] or [`Self::return_global`]. Because this type
/// intentionally has no `Drop` implementation, simply dropping a lease value
/// would leak the strong reference (and with it the whole size class). This
/// keeps hot transfers free of drop glue, but means every owner must complete
/// one of the explicit transitions.
///
/// Thread-local cache entries do not store a lease per entry. The cache stores
/// the class token once and owns one banked strong reference for each
/// initialized entry. Popping from the local cache materializes a lease from
/// one of those banked references without touching the strong count.
///
/// Globally parked buffers do not carry a class reference: taking from the
/// global freelist retains the class, and returning to the global freelist
/// releases it.
#[must_use]
pub(crate) struct SizeClassLease {
    /// Token whose strong reference is owned by this lease.
    token: SizeClassToken,
}

// SAFETY: `SizeClassLease` owns one strong reference to a `SizeClass`, which is
// `Send`.
unsafe impl Send for SizeClassLease {}
// SAFETY: same argument as `Send`, shared access to `SizeClass` is synchronized.
unsafe impl Sync for SizeClassLease {}
704
impl SizeClassLease {
    /// Converts one banked class reference into a lease.
    ///
    /// This does not retain the class. It only changes how an already-owned
    /// strong reference is represented: from TLS cache state into a
    /// `SizeClassLease` value.
    ///
    /// # Safety
    ///
    /// The caller must own one banked strong reference for `class.token`, and
    /// that retained reference must be transferred to the returned lease. This
    /// must not consume the pool-owned reference held by `class` itself.
    #[inline(always)]
    const unsafe fn from_banked(class: &SizeClassHandle) -> Self {
        // Copying the token moves no refcount: the banked reference simply
        // becomes the lease's reference.
        Self { token: class.token }
    }

    /// Retains `class` for a buffer leaving the global freelist.
    ///
    /// Increments the strong count once; the returned lease owns that new
    /// reference and must later be consumed by an explicit transition.
    #[inline(always)]
    fn retain(class: &SizeClassHandle) -> Self {
        let token = class.token;
        // SAFETY: the borrowed `class` owns one strong reference for `token`.
        unsafe { token.retain() };
        Self { token }
    }

    /// Transfers this lease into a TLS cache entry.
    ///
    /// This does not release the class. It consumes the lease and relies on the
    /// caller to record one additional banked reference in TLS cache state,
    /// normally by storing an entry and increasing the cache length. The cache
    /// must later materialize or release exactly one lease for that entry.
    ///
    /// This is a no-op at runtime, it exists to mark the ownership transition.
    #[inline(always)]
    const fn into_banked(self) {}

    /// Returns the referenced size class.
    ///
    /// The token is valid because `SizeClassLease` owns one strong reference.
    #[inline(always)]
    const fn class(&self) -> &SizeClass {
        // SAFETY: guaranteed by the ownership invariant documented on
        // `SizeClassLease`.
        unsafe { self.token.as_ref() }
    }

    /// Returns the buffer size for this lease's size class.
    #[inline(always)]
    pub(crate) const fn size(&self) -> usize {
        self.class().size
    }

    /// Returns a buffer to this class's global freelist and releases the class
    /// reference.
    ///
    /// The buffer is parked before the strong reference is released. If this is
    /// the last outstanding reference after the public pool has been dropped,
    /// dropping the `SizeClass` will then drain the just-parked buffer.
    #[inline(always)]
    fn return_global(self, slot: u32, buffer: PooledBuffer) {
        // Order matters: park first so the class is still alive while `put`
        // runs, then drop this lease's reference.
        self.class().global.put(slot, buffer);
        // SAFETY: this lease owns one strong reference.
        unsafe { self.token.release() };
    }
}
771
/// Free tracked buffer owned by a thread-local size-class cache.
///
/// This is allocator cache state, not a caller-visible pooled view. While an
/// entry is held here, the buffer is owned by the current thread and is not
/// visible to the class-global freelist.
///
/// The `slot` identifies the buffer within its [`SizeClass`]. The enclosing
/// cache owns one banked size-class reference for this entry. The entry itself
/// intentionally stores only `(buffer, slot)` so local pop/push does not move a
/// class pointer per buffer.
struct TlsSizeClassCacheEntry {
    /// The free buffer parked in this thread's cache.
    buffer: PooledBuffer,
    /// Slot id of `buffer` within its size class.
    slot: u32,
}
786
787/// Per-thread cache for one size class's tracked buffers.
788///
789/// Each instance is stored in [`TlsSizeClassCaches`] under one global
790/// [`SizeClass::class_id`], so all entries in the cache belong to the same size
791/// class. The cache owns full [`PooledBuffer`] values while they are local,
792/// returning them to the global freelist happens only on miss refill, overflow,
793/// explicit flush, or thread exit.
794///
795/// `class` is a non-owning token for this cache's size class. It can be stale
796/// while `len == 0`, because an empty cache does not keep its pool alive. When
797/// `len > 0`, each initialized entry in `entries[..len]` owns one banked
798/// size-class reference, which keeps the pointed-to class alive. The entry
799/// itself stays small (`buffer, slot`), popping it materializes a
800/// [`SizeClassLease`] from one banked reference.
801///
802/// As described in [`SizeClassToken`], a banked reference is represented by
803/// cache state rather than by a value stored in the entry. Changing `len` is
804/// therefore an ownership transition as well as a stack operation.
805///
806/// An empty cache may keep a stale token only for identity checks. It must not
807/// dereference the token or adjust its strong count until a live
808/// [`SizeClassHandle`], [`SizeClassLease`], or banked entry proves the class is
809/// still alive.
810///
811/// The hot steady-state allocation path pops an entry from `entries`, and the
812/// hot return path pushes one back while there is room.
813struct TlsSizeClassCache {
814 class: SizeClassToken,
815 entries: Box<[MaybeUninit<TlsSizeClassCacheEntry>]>,
816 len: usize,
817 capacity: usize,
818}
819
820impl TlsSizeClassCache {
821 /// Creates a new empty cache with the given maximum thread-cache size.
822 ///
823 /// The cache stores `class` for identity, but starts with `len == 0` and
824 /// therefore owns no banked size-class references.
825 fn new(class: SizeClassToken, capacity: usize) -> Self {
826 let entries = (0..capacity)
827 .map(|_| MaybeUninit::uninit())
828 .collect::<Vec<_>>()
829 .into_boxed_slice();
830 Self {
831 class,
832 entries,
833 len: 0,
834 capacity,
835 }
836 }
837
838 /// Removes and returns one reusable buffer entry.
839 ///
840 /// Local hits are served directly from the cache. On a local miss, small
841 /// caches take only the buffer being returned to the caller. Larger caches
842 /// batch-take from the global freelist, return the first claimed buffer,
843 /// and retain the rest locally for future allocations.
844 ///
845 /// The returned lease is materialized from the banked size-class reference
846 /// associated with the returned entry.
847 #[inline(always)]
848 fn pop(&mut self, class: &SizeClassHandle) -> Option<(TlsSizeClassCacheEntry, SizeClassLease)> {
849 if let Some(entry) = self.pop_local() {
850 // SAFETY: the popped entry consumed one banked reference owned by
851 // this cache. Transfer that reference to the returned lease.
852 let lease = unsafe { SizeClassLease::from_banked(class) };
853 return Some((entry, lease));
854 }
855
856 // Take from the class-global freelist on a local miss.
857 self.pop_global(class)
858 }
859
860 /// Removes and returns one entry from this thread's local stack.
861 ///
862 /// This touches only thread-local cache state. A returned entry consumes
863 /// one banked size-class reference from this cache, the caller must
864 /// materialize or release that reference.
865 #[inline(always)]
866 fn pop_local(&mut self) -> Option<TlsSizeClassCacheEntry> {
867 if self.len == 0 {
868 return None;
869 }
870
871 self.len -= 1;
872 // SAFETY: entries in `0..self.len` are initialized. Decrementing `len`
873 // above makes this slot uninitialized again.
874 Some(unsafe { self.entries.get_unchecked(self.len).assume_init_read() })
875 }
876
877 /// Takes from the class-global freelist after the local stack misses.
878 ///
879 /// Every claimed global entry gets one retained class reference. The first
880 /// claimed entry is returned with that reference materialized as a
881 /// [`SizeClassLease`], additional claimed entries are parked in this cache
882 /// and counted by `len`.
883 ///
884 /// This is separate from [`Self::pop`] so the steady-state allocation hot
885 /// path can inline only the local cache hit. We annotate with `inline(never)`
886 /// to keep the refill and batching code out of `BufferPoolInner::try_alloc`,
887 /// reducing hot-path code size and register pressure.
888 #[inline(never)]
889 fn pop_global(
890 &mut self,
891 class: &SizeClassHandle,
892 ) -> Option<(TlsSizeClassCacheEntry, SizeClassLease)> {
893 // Tiny caches do not batch enough to justify the wider global claim.
894 // Keep their miss path equivalent to a single take.
895 if self.capacity < MIN_TLS_BATCH_CAPACITY {
896 return class.global.take().map(|(slot, buffer)| {
897 let lease = SizeClassLease::retain(class);
898 (TlsSizeClassCacheEntry { buffer, slot }, lease)
899 });
900 }
901
902 // Refill larger caches to half capacity. That leaves room for future
903 // same-thread returns while still amortizing the global atomic scan
904 // over several future local pops.
905 let mut entry = None;
906 let take = self.capacity / 2;
907 class.global.take_batch(take, |slot, buffer| {
908 // Each claimed global entry becomes either the returned lease or a
909 // local cache entry, so each needs one retained class reference.
910 let cache_entry = TlsSizeClassCacheEntry { buffer, slot };
911 let lease = SizeClassLease::retain(class);
912 if entry.is_none() {
913 // Hand the first claimed buffer to the allocation that missed
914 // locally. Additional claimed buffers refill the local cache.
915 entry = Some((cache_entry, lease));
916 } else {
917 // The take count is derived from the target occupancy, so
918 // refill cannot overflow the local cache. Push directly to
919 // avoid the spill checks used by return-to-cache.
920 self.push_local(cache_entry, lease);
921 }
922 });
923
924 entry
925 }
926
927 /// Pushes an entry into the local cache, spilling to global if full.
928 ///
929 /// Small local caches prioritize same-thread locality and route overflow
930 /// directly to the global freelist. Once the local cache is large enough to
931 /// batch effectively, half the entries are drained to amortize global queue
932 /// traffic across future returns.
933 #[inline(always)]
934 fn push(&mut self, lease: SizeClassLease, slot: u32, buffer: PooledBuffer) {
935 let entry = TlsSizeClassCacheEntry { buffer, slot };
936
937 if self.len < self.capacity {
938 // Keep the returned entry local while there is room.
939 self.push_local(entry, lease);
940 return;
941 }
942
943 // Handle overflow when the local stack is full.
944 self.push_full(entry, lease);
945 }
946
947 /// Pushes one entry onto this thread's local stack.
948 ///
949 /// The caller must ensure the stack has room. `lease` becomes the banked
950 /// size-class reference represented by `entry`.
951 #[inline(always)]
952 fn push_local(&mut self, entry: TlsSizeClassCacheEntry, lease: SizeClassLease) {
953 lease.into_banked();
954
955 // SAFETY: the caller ensured `self.len < self.capacity`, so this slot
956 // is in bounds and currently uninitialized.
957 unsafe {
958 self.entries.get_unchecked_mut(self.len).write(entry);
959 }
960 self.len += 1;
961 }
962
963 /// Handles a push after the local stack fills.
964 ///
965 /// Very small caches return the incoming entry directly to the global
966 /// freelist. Larger caches spill older local entries in a batch, then keep
967 /// the incoming entry local so the dropping thread retains the freshest
968 /// buffer.
969 ///
970 /// This is separate from [`Self::push`] so the steady-state return hot path
971 /// can inline only the local cache push. We annotate with `inline(never)`
972 /// to keep the spill and batching code out of pooled buffer drop when the
973 /// local cache has room.
974 #[inline(never)]
975 fn push_full(&mut self, entry: TlsSizeClassCacheEntry, lease: SizeClassLease) {
976 // Very small caches cannot spill enough entries to amortize a batch
977 // insert, so overflow goes straight to the global freelist.
978 if self.capacity < MIN_TLS_BATCH_CAPACITY {
979 lease.return_global(entry.slot, entry.buffer);
980 return;
981 }
982
983 // Spill half the cache to global to make room.
984 let spill = self.len.min(self.capacity / 2).max(1);
985 let end = self.len;
986 let start = end - spill;
987 // Stop tracking slots before moving them out.
988 self.len = start;
989
990 lease
991 .class()
992 .global
993 .put_batch((start..end).rev().map(|index| {
994 // SAFETY: `start..end` was initialized before `len` was lowered
995 // to `start`. Reading each slot moves it out and leaves the
996 // slot uninitialized.
997 let entry = unsafe { self.entries.as_mut_ptr().add(index).read().assume_init() };
998 // SAFETY: this drained entry carried one banked reference. The
999 // incoming class lease keeps the size class live while
1000 // `put_batch` parks the spilled entries.
1001 unsafe { self.class.release() };
1002 (entry.slot, entry.buffer)
1003 }));
1004
1005 // Keep the incoming entry local after making room.
1006 self.push_local(entry, lease);
1007 }
1008}
1009
1010impl Drop for TlsSizeClassCache {
1011 fn drop(&mut self) {
1012 if self.len == 0 {
1013 return;
1014 }
1015
1016 let entries = self.entries.as_mut_ptr();
1017 // SAFETY: each initialized entry carries one banked class reference out
1018 // of this cache. Because `self.len > 0`, those references keep `class`
1019 // live while the entries are parked.
1020 unsafe { self.class.as_ref() }
1021 .global
1022 .put_batch((0..self.len).rev().map(move |index| {
1023 // SAFETY: `0..self.len` is initialized. Reading each slot moves
1024 // it out and leaves the slot uninitialized.
1025 let entry = unsafe { entries.add(index).read().assume_init() };
1026 (entry.slot, entry.buffer)
1027 }));
1028
1029 for _ in 0..self.len {
1030 // SAFETY: each drained entry was returned to the global freelist,
1031 // so its banked reference can be released.
1032 unsafe { self.class.release() };
1033 }
1034 }
1035}
1036
1037/// Registry of one thread's per-size-class caches.
1038///
1039/// A [`BufferPool`] keeps its size classes in a vector, so allocation resolves
1040/// a request to an index within that pool. Thread-local caches need a different
1041/// key because a thread can use more than one pool. They use the process-global
1042/// [`SizeClass::class_id`] assigned by [`NEXT_SIZE_CLASS_ID`], so index `0` in
1043/// one pool cannot collide with index `0` in another pool.
1044///
1045/// The registry is a sparse vector indexed by `class_id`. Each initialized
1046/// entry is a [`TlsSizeClassCache`] for that global size class. Missing entries
1047/// mean this thread has not used that size class yet. Holes can remain for the
1048/// lifetime of the thread because class ids are monotonic and never reused.
1049/// Empty initialized caches can also remain after their pool has been dropped.
1050/// Their class token is inert while the cache is empty. If the class is still
1051/// live because a pooled buffer is outstanding, a later return of that buffer to
1052/// this same thread can bank a fresh reference and make the cache usable again.
1053///
1054/// We intentionally use `Vec<Option<...>>` because class ids are dense enough
1055/// for direct indexing to be cheaper than hashing, but a thread may initialize
1056/// only a subset of live size classes. This keeps the TLS-hit path to a bounds
1057/// check and an initialized-entry check, with no synchronization.
1058struct TlsSizeClassCaches {
1059 bins: Vec<Option<TlsSizeClassCache>>,
1060}
1061
1062impl TlsSizeClassCaches {
1063 /// Creates an empty registry.
1064 const fn new() -> Self {
1065 Self { bins: Vec::new() }
1066 }
1067
1068 /// Returns the cache for the given class, creating it lazily on first use.
1069 ///
1070 /// The caller must provide a live token from a [`SizeClassHandle`] or
1071 /// [`SizeClassLease`]. A missing cache copies that token for identity only
1072 /// and starts with no banked references. The first local push or global
1073 /// refill is what banks references for entries in that cache.
1074 #[inline(always)]
1075 fn get_or_init(
1076 &mut self,
1077 class: SizeClassToken,
1078 class_id: usize,
1079 capacity: usize,
1080 ) -> &mut TlsSizeClassCache {
1081 if class_id < self.bins.len() && self.bins[class_id].is_some() {
1082 return self.bins[class_id]
1083 .as_mut()
1084 .expect("class cache was checked as initialized");
1085 }
1086
1087 self.init(class, class_id, capacity)
1088 }
1089
1090 /// Initializes and returns the cache for `class_id`.
1091 ///
1092 /// This is separate from [`Self::get_or_init`] so the steady-state TLS hit
1093 /// can inline only the existing-cache lookup. We annotate with
1094 /// `inline(never)` to keep the resize and allocation path out of pooled
1095 /// allocation and drop.
1096 #[inline(never)]
1097 fn init(
1098 &mut self,
1099 class: SizeClassToken,
1100 class_id: usize,
1101 capacity: usize,
1102 ) -> &mut TlsSizeClassCache {
1103 if class_id >= self.bins.len() {
1104 self.bins.resize_with(class_id + 1, || None);
1105 }
1106 self.bins[class_id].get_or_insert_with(|| TlsSizeClassCache::new(class, capacity))
1107 }
1108
1109 /// Returns an initialized cache without creating a missing one.
1110 #[inline(always)]
1111 fn get(&mut self, class_id: usize) -> Option<&mut TlsSizeClassCache> {
1112 self.bins.get_mut(class_id).and_then(Option::as_mut)
1113 }
1114}
1115
1116impl Drop for TlsSizeClassCaches {
1117 fn drop(&mut self) {
1118 let this = self as *mut Self;
1119 BufferPoolThreadCache::TLS_SIZE_CLASS_CACHES_FAST.with(|fast| {
1120 if fast.get() == this {
1121 fast.set(ptr::null_mut());
1122 }
1123 });
1124 }
1125}
1126
1127/// Access to the calling thread's local [`BufferPool`] caches.
1128///
1129/// This type hides the TLS layout used by pooled allocation and return. The
1130/// main TLS key owns the registry. It has a destructor, so thread exit drops
1131/// the registry and each `TlsSizeClassCache` flushes its remaining entries to
1132/// the class-global freelist.
1133///
1134/// Steady-state allocation and return first read `TLS_SIZE_CLASS_CACHES_FAST`.
1135/// If it points at this thread's registry and the requested class cache is
1136/// initialized, the caller touches only thread-local memory. Missing TLS state
1137/// routes through `cache_slow` or `push_slow`, which access the owning TLS key,
1138/// install the fast pointer, and lazily initialize the class cache.
1139///
1140/// Rust's access path for TLS values with destructors includes checks for
1141/// access during or after destruction. Those checks are correct, but they are
1142/// expensive on the hot pooled allocation/drop path. After first checked
1143/// access, we cache a raw pointer to the same registry in a destructor-free TLS
1144/// key and use that pointer for steady-state access.
1145///
1146/// If the checked key is unavailable during thread-local destruction, cache
1147/// access returns `None` and callers use the class-global freelist instead.
1148pub struct BufferPoolThreadCache;
1149
1150impl BufferPoolThreadCache {
1151 thread_local! {
1152 // Owns this thread's cache registry and drops it during thread exit.
1153 static TLS_SIZE_CLASS_CACHES: UnsafeCell<TlsSizeClassCaches> =
1154 const { UnsafeCell::new(TlsSizeClassCaches::new()) };
1155
1156 // Performance-only pointer to the same registry. This key has no
1157 // destructor, so the hot allocation/drop path avoids Rust's
1158 // destructor-aware access path for `TLS_SIZE_CLASS_CACHES`.
1159 static TLS_SIZE_CLASS_CACHES_FAST: Cell<*mut TlsSizeClassCaches> =
1160 const { Cell::new(ptr::null_mut()) };
1161 }
1162
1163 /// Flushes all local caches for the current thread into the global freelists.
1164 pub fn flush() {
1165 // If the owning TLS registry is unavailable during thread exit, this
1166 // is a no-op. The registry's own drop path will flush any remaining
1167 // entries.
1168 let _ = Self::TLS_SIZE_CLASS_CACHES.try_with(|caches| {
1169 // SAFETY: this TLS value is only ever accessed by the current thread.
1170 let caches = unsafe { &mut *caches.get() };
1171 for cache in caches.bins.iter_mut() {
1172 let _ = cache.take();
1173 }
1174 });
1175 }
1176
1177 /// Returns a buffer to the current thread's local cache for the given
1178 /// size class, spilling to the global freelist if the cache is full.
1179 ///
1180 /// The hot path uses only an already-initialized cache from the fast TLS
1181 /// pointer. If the fast pointer is missing, or this thread has not
1182 /// initialized the size class yet, [`Self::push_slow`] performs the checked
1183 /// TLS access and creates the local cache. The returned lease is live proof
1184 /// that initialization is safe. During thread-local teardown, checked TLS
1185 /// access can fail, in that case the buffer falls back to the global
1186 /// freelist.
1187 #[inline(always)]
1188 pub(super) fn push(lease: SizeClassLease, slot: u32, buffer: PooledBuffer) {
1189 let class = lease.class();
1190 if class.thread_cache_capacity == 0 {
1191 lease.return_global(slot, buffer);
1192 return;
1193 }
1194
1195 let caches = Self::TLS_SIZE_CLASS_CACHES_FAST.with(|fast| fast.get());
1196 if !caches.is_null() {
1197 // SAFETY: the fast pointer is set only from this thread's
1198 // `TLS_SIZE_CLASS_CACHES` value and cleared before that value
1199 // drops.
1200 if let Some(cache) = unsafe { (&mut *caches).get(class.class_id) } {
1201 cache.push(lease, slot, buffer);
1202 return;
1203 }
1204 }
1205
1206 Self::push_slow(lease, slot, buffer);
1207 }
1208
1209 /// Returns a buffer to the current thread's local cache after the fast
1210 /// lookup misses.
1211 ///
1212 /// This is called when the fast TLS pointer is not initialized, or when
1213 /// that pointer exists but this size class has no local cache yet. It
1214 /// installs the fast TLS pointer after successfully accessing the owning
1215 /// TLS key, then initializes the size-class cache if needed.
1216 ///
1217 /// This is separate from [`Self::push`] so the steady-state return hot path
1218 /// only contains the initialized-cache lookup and local push.
1219 #[inline(never)]
1220 fn push_slow(lease: SizeClassLease, slot: u32, buffer: PooledBuffer) {
1221 // Returning a pooled buffer can happen from arbitrary Drop code,
1222 // including during thread-local destruction. If the local cache is
1223 // unavailable, fall back to the global freelist instead of panicking.
1224 match Self::TLS_SIZE_CLASS_CACHES
1225 .try_with(|caches| {
1226 let class = lease.class();
1227 let caches = caches.get();
1228
1229 // Publish the checked owner TLS address to the fast key.
1230 Self::TLS_SIZE_CLASS_CACHES_FAST.with(|fast| fast.set(caches));
1231
1232 // SAFETY: this TLS value is only ever accessed by the current thread.
1233 ptr::NonNull::from(unsafe {
1234 (&mut *caches).get_or_init(
1235 lease.token,
1236 class.class_id,
1237 class.thread_cache_capacity,
1238 )
1239 })
1240 })
1241 .ok()
1242 {
1243 Some(mut cache) => {
1244 // SAFETY: `cache` points to this thread's initialized TLS cache.
1245 unsafe { cache.as_mut().push(lease, slot, buffer) };
1246 }
1247 None => lease.return_global(slot, buffer),
1248 }
1249 }
1250
1251 /// Takes a buffer from the current thread's local cache for the given
1252 /// size class, refilling from the global freelist if the cache is empty.
1253 ///
1254 /// The hot path uses only an already-initialized cache from the fast TLS
1255 /// pointer. On a local miss, the global freelist is queried once. The first
1256 /// claimed buffer is returned to the caller, and any additional claimed
1257 /// buffers are appended directly to the local cache.
1258 #[inline(always)]
1259 fn pop(class: &SizeClassHandle) -> Option<(PooledBuffer, SizeClassLease, u32)> {
1260 if class.thread_cache_capacity == 0 {
1261 return class
1262 .global
1263 .take()
1264 .map(|(slot, buffer)| (buffer, SizeClassLease::retain(class), slot));
1265 }
1266
1267 let caches = Self::TLS_SIZE_CLASS_CACHES_FAST.with(|fast| fast.get());
1268 if !caches.is_null() {
1269 // SAFETY: the fast pointer is set only from this thread's
1270 // `TLS_SIZE_CLASS_CACHES` value and cleared before that value
1271 // drops.
1272 if let Some(cache) = unsafe { (&mut *caches).get(class.class_id) } {
1273 return cache
1274 .pop(class)
1275 .map(|(entry, lease)| (entry.buffer, lease, entry.slot));
1276 }
1277 }
1278
1279 // Resolve the cache and fall back to the global freelist if
1280 // unavailable.
1281 let Some(mut cache) = Self::cache_slow(class) else {
1282 return class
1283 .global
1284 .take()
1285 .map(|(slot, buffer)| (buffer, SizeClassLease::retain(class), slot));
1286 };
1287
1288 // SAFETY: `cache` points to this thread's initialized TLS cache.
1289 unsafe { cache.as_mut() }
1290 .pop(class)
1291 .map(|(entry, lease)| (entry.buffer, lease, entry.slot))
1292 }
1293
1294 /// Resolves the local cache after the fast TLS or class-cache lookup
1295 /// misses.
1296 ///
1297 /// This is called when the fast TLS pointer is not initialized, or when
1298 /// that pointer exists but this size class has no local cache yet. It
1299 /// installs the fast TLS pointer after successfully accessing the owning
1300 /// TLS key, then initializes the size-class cache if needed.
1301 #[inline(never)]
1302 fn cache_slow(class: &SizeClassHandle) -> Option<ptr::NonNull<TlsSizeClassCache>> {
1303 // Allocation can happen from caller-owned TLS destructors during thread
1304 // teardown. Return `None` instead of panicking if the owning TLS key is
1305 // unavailable.
1306 Self::TLS_SIZE_CLASS_CACHES
1307 .try_with(|caches| {
1308 let caches = caches.get();
1309
1310 // Publish the checked owner TLS address to the fast key.
1311 Self::TLS_SIZE_CLASS_CACHES_FAST.with(|fast| fast.set(caches));
1312
1313 // SAFETY: this TLS value is only ever accessed by the current thread.
1314 ptr::NonNull::from(unsafe {
1315 (&mut *caches).get_or_init(
1316 class.token,
1317 class.class_id,
1318 class.thread_cache_capacity,
1319 )
1320 })
1321 })
1322 .ok()
1323 }
1324}
1325
1326/// Internal allocation result for pooled allocations.
1327struct Allocation {
1328 buffer: PooledBuffer,
1329 is_new: bool,
1330 lease: SizeClassLease,
1331 slot: u32,
1332}
1333
1334/// Internal state of the buffer pool.
1335pub(crate) struct BufferPoolInner {
1336 config: BufferPoolConfig,
1337 classes: Vec<SizeClassHandle>,
1338 metrics: PoolMetrics,
1339}
1340
1341impl Drop for BufferPoolInner {
1342 fn drop(&mut self) {
1343 // The public pool is going away. Drain globally parked buffers while
1344 // the pool-owned class handles are still live. Pooled views and live
1345 // TLS cache entries own their own size-class references, if they return
1346 // later, they will park their buffer and release the reference that kept
1347 // the class alive.
1348 for class in &self.classes {
1349 class.global.drain();
1350 }
1351 }
1352}
1353
1354impl BufferPoolInner {
1355 /// Try to allocate a buffer from the given size class.
1356 ///
1357 /// Uses a three-tier strategy:
1358 /// 1. **Thread-local cache** (fast path): no atomics, no contention.
1359 /// 2. **Global freelist**: atomic pop, then batch-refill the local cache
1360 /// when the local bin is large enough to amortize shared-queue traffic.
1361 /// 3. **New allocation**: reserve a slot in the global freelist, then
1362 /// allocate from the heap.
1363 ///
1364 /// If `zero_on_new` is true, newly-created buffers are allocated with
1365 /// `alloc_zeroed`. Reused buffers are never re-zeroed here.
1366 #[inline(always)]
1367 fn try_alloc(&self, class_index: usize, zero_on_new: bool) -> Option<Allocation> {
1368 let class = &self.classes[class_index];
1369
1370 // Reuse path: try the thread-local cache first, then the global
1371 // freelist with batch refill when the local cache is large enough.
1372 if let Some((buffer, lease, slot)) = BufferPoolThreadCache::pop(class) {
1373 return Some(Allocation {
1374 buffer,
1375 is_new: false,
1376 lease,
1377 slot,
1378 });
1379 }
1380
1381 // Slow path: create a new tracked buffer and update metrics.
1382 self.try_alloc_new(class, zero_on_new)
1383 }
1384
1385 /// Creates a new tracked buffer after the reuse path fails.
1386 ///
1387 /// This is separate from [`Self::try_alloc`] so the steady-state allocation
1388 /// path can inline the TLS hit without also carrying slot reservation,
1389 /// metrics, and heap-allocation code.
1390 #[inline(never)]
1391 fn try_alloc_new(&self, class: &SizeClassHandle, zeroed: bool) -> Option<Allocation> {
1392 let label = SizeClassLabel {
1393 size_class: class.size as u64,
1394 };
1395 let Some((slot, buffer, lease)) = class.try_create(zeroed) else {
1396 self.metrics.exhausted_total.get_or_create(&label).inc();
1397 return None;
1398 };
1399
1400 self.metrics.created.get_or_create(&label).inc();
1401 Some(Allocation {
1402 buffer,
1403 is_new: true,
1404 lease,
1405 slot,
1406 })
1407 }
1408}
1409
1410/// A pool of reusable, aligned buffers.
1411///
1412/// Buffers are organized into power-of-two size classes. When a buffer is
1413/// requested, the smallest size class that fits is used. Pooled buffers are
1414/// automatically returned when their final owning view is dropped.
1415///
1416/// # Alignment
1417///
1418/// Buffer alignment is guaranteed only at the base pointer (when `cursor == 0`).
1419/// After calling [`bytes::Buf::advance`], the pointer returned by `as_mut_ptr()` may
1420/// no longer be aligned. For direct I/O operations that require alignment,
1421/// do not advance the buffer before use.
1422#[derive(Clone)]
1423pub struct BufferPool {
1424 inner: Arc<BufferPoolInner>,
1425}
1426
1427impl std::fmt::Debug for BufferPool {
1428 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1429 f.debug_struct("BufferPool")
1430 .field("config", &self.inner.config)
1431 .field("num_classes", &self.inner.classes.len())
1432 .finish()
1433 }
1434}
1435
1436/// Global allocator for [`SizeClass::class_id`].
1437///
1438/// `class_id` is the key used by [`TlsSizeClassCaches`]. It must be global, not
1439/// pool-local, because the same thread-local registry serves every
1440/// [`BufferPool`] touched by the thread. Without a global id, two different
1441/// pools could share a class index and accidentally share one local cache.
1442///
1443/// Ids are monotonic and never reused. Reuse would make stale per-thread cache
1444/// state ambiguous after a pool is dropped and a later pool creates a new size
1445/// class with the same id. Avoiding reuse means the hot path can index directly
1446/// without generation checks, at the cost of possible holes in each thread's
1447/// sparse registry.
1448///
1449/// Relaxed ordering is sufficient: the atomic operation is only used to assign
1450/// unique ids, not to publish any associated size-class state.
1451static NEXT_SIZE_CLASS_ID: AtomicUsize = AtomicUsize::new(0);
1452
1453impl BufferPool {
1454 /// Creates a new buffer pool with the given configuration.
1455 ///
1456 /// # Panics
1457 ///
1458 /// Panics if the configuration is invalid.
1459 pub(crate) fn new(config: BufferPoolConfig, registry: &mut impl Register) -> Self {
1460 config.validate();
1461 let metrics = PoolMetrics::new(registry);
1462 let num_classes =
1463 BufferPoolConfig::num_classes(config.min_size.get(), config.max_size.get());
1464 let mut classes = Vec::with_capacity(num_classes);
1465 let thread_cache_capacity = config.resolve_thread_cache_capacity();
1466 for i in 0..num_classes {
1467 let size = BufferPoolConfig::class_size(config.min_size.get(), i);
1468 let class_id = NEXT_SIZE_CLASS_ID.fetch_add(1, Ordering::Relaxed);
1469 let class = SizeClassHandle::new(
1470 class_id,
1471 size,
1472 config.alignment.get(),
1473 config.max_per_class,
1474 config.parallelism,
1475 thread_cache_capacity,
1476 config.prefill,
1477 );
1478 classes.push(class);
1479 }
1480
1481 // Initialize created metrics after constructor prefill.
1482 if config.prefill {
1483 for class in &classes {
1484 let label = SizeClassLabel {
1485 size_class: class.size as u64,
1486 };
1487 metrics
1488 .created
1489 .get_or_create(&label)
1490 .set(config.max_per_class.get() as i64);
1491 }
1492 }
1493
1494 Self {
1495 inner: Arc::new(BufferPoolInner {
1496 config,
1497 classes,
1498 metrics,
1499 }),
1500 }
1501 }
1502
1503 /// Returns the size class index for a given size, or `None` if `size > max_size`.
1504 #[inline(always)]
1505 fn class_index(&self, size: usize) -> Option<usize> {
1506 let min_size = self.inner.config.min_size.get();
1507 let max_size = self.inner.config.max_size.get();
1508 if size > max_size {
1509 return None;
1510 }
1511 if size <= min_size {
1512 return Some(0);
1513 }
1514
1515 // Pool construction guarantees `min_size` and `max_size` are powers of
1516 // two. Since `min_size < size <= max_size`, `next_power_of_two()`
1517 // resolves to a valid class and its exponent must be greater than
1518 // `min_size`'s exponent. Use wrapping arithmetic to avoid a release
1519 // overflow-check branch in this hot helper.
1520 Some(
1521 size.next_power_of_two()
1522 .trailing_zeros()
1523 .wrapping_sub(min_size.trailing_zeros()) as usize,
1524 )
1525 }
1526
1527 /// Returns the size class index for `capacity`, recording oversized metrics on failure.
1528 #[inline]
1529 fn class_index_or_record_oversized(&self, capacity: usize) -> Option<usize> {
1530 let class_index = self.class_index(capacity);
1531 if class_index.is_none() {
1532 self.inner.metrics.oversized_total.inc();
1533 }
1534 class_index
1535 }
1536
1537 /// Attempts to allocate a buffer without falling back on pool miss.
1538 ///
1539 /// Unlike [`Self::alloc`], this method does not fall back to untracked
1540 /// allocation on exhaustion or oversized requests. Requests smaller than
1541 /// [`BufferPoolConfig::pool_min_size`] intentionally bypass pooling and
1542 /// return an untracked aligned allocation instead.
1543 ///
1544 /// The returned buffer has `len() == 0` and `capacity() >= capacity`.
1545 ///
1546 /// # Initialization
1547 ///
1548 /// The returned buffer contains **uninitialized memory**. Do not read from
1549 /// it until data has been written.
1550 ///
1551 /// # Errors
1552 ///
1553 /// - [`PoolError::Oversized`]: `capacity` exceeds `max_size`
1554 /// - [`PoolError::Exhausted`]: pool exhausted for the required size class
1555 #[inline(always)]
1556 pub fn try_alloc(&self, capacity: usize) -> Result<IoBufMut, PoolError> {
1557 if capacity < self.inner.config.pool_min_size {
1558 let size = capacity.max(1);
1559 return Ok(IoBufMut::with_alignment(size, self.inner.config.alignment));
1560 }
1561
1562 let class_index = self
1563 .class_index_or_record_oversized(capacity)
1564 .ok_or(PoolError::Oversized)?;
1565
1566 let buffer = self
1567 .inner
1568 .try_alloc(class_index, false)
1569 .map(|allocation| {
1570 PooledBufMut::new(allocation.buffer, allocation.lease, allocation.slot)
1571 })
1572 .ok_or(PoolError::Exhausted)?;
1573 Ok(IoBufMut::from_pooled(buffer))
1574 }
1575
1576 /// Allocates a buffer with capacity for at least `capacity` bytes.
1577 ///
1578 /// The returned buffer has `len() == 0` and `capacity() >= capacity`,
1579 /// matching the semantics of [`IoBufMut::with_capacity`] and
1580 /// [`bytes::BytesMut::with_capacity`]. Use [`bytes::BufMut::put_slice`] or
1581 /// other [`bytes::BufMut`] methods to write data to the buffer.
1582 ///
1583 /// If the pool can provide a buffer (capacity within limits and pool not
1584 /// exhausted), this returns a pooled buffer that will be returned to the
1585 /// pool when dropped. Requests smaller than [`BufferPoolConfig::pool_min_size`]
1586 /// bypass pooling and return an untracked aligned allocation. Oversized or
1587 /// exhausted requests also fall back to an untracked aligned heap allocation
1588 /// that is deallocated when dropped.
1589 ///
1590 /// Use [`Self::try_alloc`] if eligible requests must fail instead of falling
1591 /// back to direct allocation.
1592 ///
1593 /// # Initialization
1594 ///
1595 /// The returned buffer contains **uninitialized memory**. Do not read from
1596 /// it until data has been written.
1597 pub fn alloc(&self, capacity: usize) -> IoBufMut {
1598 self.try_alloc(capacity).unwrap_or_else(|_| {
1599 let size = capacity.max(self.inner.config.min_size.get());
1600 IoBufMut::with_alignment(size, self.inner.config.alignment)
1601 })
1602 }
1603
1604 /// Allocates a buffer and sets its readable length to `len` without
1605 /// initializing bytes.
1606 ///
1607 /// Equivalent to [`Self::alloc`] followed by [`IoBufMut::set_len`].
1608 ///
1609 /// # Safety
1610 ///
1611 /// Caller must ensure all bytes are initialized before any read operation.
1612 pub unsafe fn alloc_len(&self, len: usize) -> IoBufMut {
1613 let mut buf = self.alloc(len);
1614 // SAFETY: guaranteed by caller.
1615 unsafe { buf.set_len(len) };
1616 buf
1617 }
1618
1619 /// Attempts to allocate a zero-initialized buffer without falling back on
1620 /// pool miss.
1621 ///
1622 /// Unlike [`Self::alloc_zeroed`], this method does not fall back to
1623 /// untracked allocation on exhaustion or oversized requests. Requests
1624 /// smaller than [`BufferPoolConfig::pool_min_size`] intentionally bypass
1625 /// pooling and return an untracked aligned allocation instead.
1626 ///
1627 /// The returned buffer has `len() == len` and `capacity() >= len`.
1628 ///
1629 /// # Initialization
1630 ///
1631 /// Bytes in `0..len` are initialized to zero. Bytes in `len..capacity`
1632 /// may be uninitialized.
1633 ///
1634 /// # Errors
1635 ///
1636 /// - [`PoolError::Oversized`]: `len` exceeds `max_size`
1637 /// - [`PoolError::Exhausted`]: pool exhausted for the required size class
1638 pub fn try_alloc_zeroed(&self, len: usize) -> Result<IoBufMut, PoolError> {
1639 if len < self.inner.config.pool_min_size {
1640 let size = len.max(1);
1641 let mut buf = IoBufMut::zeroed_with_alignment(size, self.inner.config.alignment);
1642 buf.truncate(len);
1643 return Ok(buf);
1644 }
1645
1646 let class_index = self
1647 .class_index_or_record_oversized(len)
1648 .ok_or(PoolError::Oversized)?;
1649 let allocation = self
1650 .inner
1651 .try_alloc(class_index, true)
1652 .ok_or(PoolError::Exhausted)?;
1653
1654 let mut buf = IoBufMut::from_pooled(PooledBufMut::new(
1655 allocation.buffer,
1656 allocation.lease,
1657 allocation.slot,
1658 ));
1659 if allocation.is_new {
1660 // SAFETY: buffer was allocated with alloc_zeroed, so bytes in 0..len are initialized.
1661 unsafe { buf.set_len(len) };
1662 } else {
1663 // Reused buffers may contain old bytes, re-zero requested readable range.
1664 // SAFETY: `as_mut_ptr()` is valid for writes up to `capacity() >= len` bytes.
1665 unsafe {
1666 std::ptr::write_bytes(buf.as_mut_ptr(), 0, len);
1667 buf.set_len(len);
1668 }
1669 }
1670 Ok(buf)
1671 }
1672
1673 /// Allocates a zero-initialized buffer with readable length `len`.
1674 ///
1675 /// The returned buffer has `len() == len` and `capacity() >= len`.
1676 ///
1677 /// If the pool can provide a buffer (len within limits and pool not
1678 /// exhausted), this returns a pooled buffer that will be returned to the
1679 /// pool when dropped. Requests smaller than [`BufferPoolConfig::pool_min_size`]
1680 /// bypass pooling and return an untracked aligned allocation. Oversized or
1681 /// exhausted requests also fall back to an untracked aligned heap allocation
1682 /// that is deallocated when dropped.
1683 ///
1684 /// Use this for read APIs that require an initialized `&mut [u8]`. This
1685 /// avoids `unsafe set_len` at callsites.
1686 ///
1687 /// Use [`Self::try_alloc_zeroed`] if eligible requests must fail instead of
1688 /// falling back to direct allocation.
1689 ///
1690 /// # Initialization
1691 ///
1692 /// Bytes in `0..len` are initialized to zero. Bytes in `len..capacity`
1693 /// may be uninitialized.
1694 pub fn alloc_zeroed(&self, len: usize) -> IoBufMut {
1695 self.try_alloc_zeroed(len).unwrap_or_else(|_| {
1696 // Pool exhausted or oversized: allocate untracked zeroed memory.
1697 let size = len.max(self.inner.config.min_size.get());
1698 let mut buf = IoBufMut::zeroed_with_alignment(size, self.inner.config.alignment);
1699 buf.truncate(len);
1700 buf
1701 })
1702 }
1703
1704 /// Returns the pool configuration.
1705 pub fn config(&self) -> &BufferPoolConfig {
1706 &self.inner.config
1707 }
1708}
1709
1710#[cfg(test)]
1711mod tests {
1712 use super::*;
1713 use crate::{
1714 iobuf::{cache_line_size, freelist, IoBuf},
1715 telemetry::metrics::Registry,
1716 };
1717 use bytes::{Buf, BufMut};
1718 use commonware_utils::NZU32;
1719 use std::{
1720 sync::{mpsc, Arc},
1721 thread,
1722 };
1723
1724 fn test_size_class(size: usize, alignment: usize) -> SizeClassHandle {
1725 SizeClassHandle::new(
1726 NEXT_SIZE_CLASS_ID.fetch_add(1, Ordering::Relaxed),
1727 size,
1728 alignment,
1729 NZU32!(8),
1730 NZUsize!(4),
1731 4,
1732 false,
1733 )
1734 }
1735
1736 fn test_pool(config: BufferPoolConfig) -> BufferPool {
1737 let mut registry = Registry::default();
1738 BufferPool::new(config, &mut registry)
1739 }
1740
    /// Creates a page-aligned test config whose size classes span
    /// `min_size..=max_size`, with `max_per_class` buffers per class.
    ///
    /// Every request is eligible for pooling (`pool_min_size: 0`), expected
    /// parallelism is one, the thread cache is enabled without a fixed
    /// capacity, and prefill is off.
    fn test_config(min_size: usize, max_size: usize, max_per_class: u32) -> BufferPoolConfig {
        BufferPoolConfig {
            pool_min_size: 0,
            min_size: NZUsize!(min_size),
            max_size: NZUsize!(max_size),
            max_per_class: NZU32!(max_per_class),
            parallelism: NZUsize!(1),
            thread_cache_config: BufferPoolThreadCacheConfig::Enabled(None),
            prefill: false,
            alignment: NZUsize!(page_size()),
        }
    }
1754
    /// Returns the strong count of the `Arc` behind `class.token`, excluding the
    /// temporary reference this helper creates. The count is therefore unchanged
    /// once the helper returns.
    fn size_class_strong_count(class: &SizeClassHandle) -> usize {
        // SAFETY: the borrowed handle owns one strong reference for `class.token`
        // for the duration of this call.
        unsafe { class.token.retain() };
        // SAFETY: the increment above created the strong reference consumed by
        // this temporary Arc.
        let arc = unsafe { Arc::from_raw(class.token.ptr.as_ptr()) };
        // Discount the reference added by `retain`; dropping `arc` at the end of
        // the function releases it again.
        Arc::strong_count(&arc) - 1
    }
1766
1767 /// Helper to get the number of caller-owned tracked buffers for a size class.
1768 ///
1769 /// With TLS enabled, tracked buffers can be free in either the shared
1770 /// freelist or the current thread's local cache.
1771 fn get_allocated(pool: &BufferPool, size: usize) -> usize {
1772 let class_index = pool.class_index(size).unwrap();
1773 let class = &pool.inner.classes[class_index];
1774 get_global_created(class) - get_global_len(class) - get_local_len(class)
1775 }
1776
1777 /// Helper to get the number of free buffers visible to the current thread.
1778 fn get_available(pool: &BufferPool, size: usize) -> i64 {
1779 let class_index = pool.class_index(size).unwrap();
1780 let class = &pool.inner.classes[class_index];
1781 (get_global_len(class) + get_local_len(class)) as i64
1782 }
1783
1784 /// Helper to get the number of free buffers parked in the global freelist.
1785 fn get_global_len(class: &SizeClass) -> usize {
1786 freelist::tests::len(&class.global)
1787 }
1788
1789 /// Helper to get the number of buffers created by the global freelist.
1790 fn get_global_created(class: &SizeClass) -> usize {
1791 freelist::tests::created(&class.global)
1792 }
1793
    /// Helper to get the number of free buffers parked in the current thread's
    /// local cache for a size class.
    ///
    /// Returns 0 when the current thread has no bin for `class.class_id`.
    fn get_local_len(class: &SizeClass) -> usize {
        BufferPoolThreadCache::TLS_SIZE_CLASS_CACHES.with(|caches| {
            // SAFETY: this TLS value is only ever accessed by the current thread.
            let caches = unsafe { &*caches.get() };
            // Bins are indexed by class id. An out-of-range id or an empty slot
            // means nothing is cached locally for this class.
            caches
                .bins
                .get(class.class_id)
                .and_then(Option::as_ref)
                .map_or(0, |cache| cache.len)
        })
    }
1807
1808 #[test]
1809 fn test_page_size() {
1810 let size = page_size();
1811 assert!(size >= 4096);
1812 assert!(size.is_power_of_two());
1813 }
1814
1815 #[test]
1816 fn test_config_validation() {
1817 let page = page_size();
1818 let config = test_config(page, page * 4, 10);
1819 config.validate();
1820 }
1821
    /// A fixed thread-cache capacity larger than `max_per_class` is rejected by
    /// `validate` with a descriptive panic.
    #[test]
    #[should_panic(expected = "thread_cache_capacity (11) must be <= max_per_class (10)")]
    fn test_config_invalid_thread_cache_capacity() {
        let page = page_size();
        let config = test_config(page, page * 4, 10).with_thread_cache_capacity(NZUsize!(11));
        config.validate();
    }
1829
    /// A `min_size` that is not a power of two (3000) is rejected by `validate`.
    #[test]
    #[should_panic(expected = "min_size must be a power of two")]
    fn test_config_invalid_min_size() {
        let config = BufferPoolConfig {
            pool_min_size: 0,
            min_size: NZUsize!(3000),
            max_size: NZUsize!(8192),
            max_per_class: NZU32!(10),
            parallelism: NZUsize!(1),
            thread_cache_config: BufferPoolThreadCacheConfig::Enabled(None),
            prefill: false,
            alignment: NZUsize!(page_size()),
        };
        config.validate();
    }
1845
    /// Request sizes round up to the smallest class that fits. Sizes above
    /// `max_size` map to no class.
    #[test]
    fn test_pool_class_index() {
        let page = page_size();
        let pool = test_pool(test_config(page, page * 8, 10));

        // Classes: page, page*2, page*4, page*8
        assert_eq!(pool.inner.classes.len(), 4);

        // Exact class sizes map to themselves; anything in between rounds up.
        assert_eq!(pool.class_index(1), Some(0));
        assert_eq!(pool.class_index(page), Some(0));
        assert_eq!(pool.class_index(page + 1), Some(1));
        assert_eq!(pool.class_index(page * 2), Some(1));
        assert_eq!(pool.class_index(page * 4 + 1), Some(3));
        assert_eq!(pool.class_index(page * 8 - 1), Some(3));
        assert_eq!(pool.class_index(page * 8), Some(3));
        // One byte past `max_size` has no class.
        assert_eq!(pool.class_index(page * 8 + 1), None);
    }
1863
    /// `try_alloc` returns an empty buffer with at least the requested capacity.
    /// Allocating again after dropping it behaves the same.
    #[test]
    fn test_pool_alloc_and_return() {
        let page = page_size();
        let pool = test_pool(test_config(page, page * 4, 2));

        // Allocate a buffer - returns buffer with len=0, capacity >= requested
        let buf = pool.try_alloc(page).unwrap();
        assert!(buf.capacity() >= page);
        assert_eq!(buf.len(), 0);

        // Drop returns to pool
        drop(buf);

        // Can allocate again
        let buf2 = pool.try_alloc(page).unwrap();
        assert!(buf2.capacity() >= page);
        assert_eq!(buf2.len(), 0);
    }
1882
    /// `alloc_len` reports the requested length. Bytes written through
    /// `as_mut` are visible after `freeze`.
    #[test]
    fn test_alloc_len_sets_len() {
        let page = page_size();
        let pool = test_pool(test_config(page, page * 4, 2));

        // SAFETY: we immediately initialize all bytes before reading.
        let mut buf = unsafe { pool.alloc_len(100) };
        assert_eq!(buf.len(), 100);
        buf.as_mut().fill(0xAB);
        let frozen = buf.freeze();
        assert_eq!(frozen.as_ref(), &[0xAB; 100]);
    }
1895
    /// `alloc_zeroed` returns a buffer of the requested length whose readable
    /// bytes are all zero.
    #[test]
    fn test_alloc_zeroed_sets_len_and_zeros() {
        let page = page_size();
        let pool = test_pool(test_config(page, page * 4, 2));

        let buf = pool.alloc_zeroed(100);
        assert_eq!(buf.len(), 100);
        assert!(buf.as_ref().iter().all(|&b| b == 0));
    }
1905
    /// A page-sized `try_alloc_zeroed` request is served from the pool with the
    /// requested length, all zero.
    #[test]
    fn test_try_alloc_zeroed_sets_len_and_zeros() {
        let page = page_size();
        let pool = test_pool(test_config(page, page * 4, 2));

        let buf = pool.try_alloc_zeroed(page).unwrap();
        assert!(buf.is_pooled());
        assert_eq!(buf.len(), page);
        assert!(buf.as_ref().iter().all(|&b| b == 0));
    }
1916
    /// With the only pooled buffer checked out, `alloc_zeroed` falls back to an
    /// untracked buffer that is still zeroed and has the requested length.
    #[test]
    fn test_alloc_zeroed_fallback_uses_untracked_zeroed_buffer() {
        let page = page_size();
        let pool = test_pool(test_config(page, page, 1));

        // Exhaust pooled capacity for this class.
        let _pooled = pool.try_alloc(page).unwrap();

        let buf = pool.alloc_zeroed(100);
        assert!(!buf.is_pooled());
        assert_eq!(buf.len(), 100);
        assert!(buf.as_ref().iter().all(|&b| b == 0));
    }
1930
    /// A pooled buffer returned with non-zero contents is re-zeroed when
    /// `alloc_zeroed` hands it out again. The class holds a single buffer, so
    /// the second allocation must reuse it.
    #[test]
    fn test_alloc_zeroed_reuses_dirty_pooled_buffer() {
        let page = page_size();
        let pool = test_pool(test_config(page, page, 1));

        let mut first = pool.alloc_zeroed(page);
        assert!(first.is_pooled());
        assert!(first.as_ref().iter().all(|&b| b == 0));

        // Dirty the buffer before returning it to the pool.
        first.as_mut().fill(0xAB);
        drop(first);

        let second = pool.alloc_zeroed(page);
        assert!(second.is_pooled());
        assert_eq!(second.len(), page);
        assert!(second.as_ref().iter().all(|&b| b == 0));
    }
1949
    /// Requests below `pool_min_size` get exact-size, untracked buffers from
    /// both `try_alloc` and `try_alloc_zeroed`. A request at the threshold is
    /// pooled.
    #[test]
    fn test_requests_smaller_than_pool_min_size_bypass_pool() {
        let pool = test_pool(BufferPoolConfig {
            pool_min_size: 512,
            min_size: NZUsize!(512),
            max_size: NZUsize!(1024),
            max_per_class: NZU32!(2),
            parallelism: NZUsize!(1),
            thread_cache_config: BufferPoolThreadCacheConfig::Enabled(None),
            prefill: false,
            alignment: NZUsize!(128),
        });

        let buf = pool.try_alloc(200).unwrap();
        assert!(!buf.is_pooled());
        assert_eq!(buf.capacity(), 200);

        let zeroed = pool.try_alloc_zeroed(200).unwrap();
        assert!(!zeroed.is_pooled());
        assert_eq!(zeroed.len(), 200);
        assert!(zeroed.as_ref().iter().all(|&b| b == 0));

        // Exactly `pool_min_size` is no longer "smaller", so it goes through the pool.
        let pooled = pool.try_alloc(512).unwrap();
        assert!(pooled.is_pooled());
        assert_eq!(pooled.capacity(), 512);
    }
1976
    /// Allocated capacity equals the size of the class the request rounds up to.
    #[test]
    fn test_pool_size_classes() {
        let page = page_size();
        let pool = test_pool(test_config(page, page * 4, 10));

        // Small request gets smallest class
        let buf1 = pool.try_alloc(page).unwrap();
        assert_eq!(buf1.capacity(), page);

        // Larger request gets appropriate class
        let buf2 = pool.try_alloc(page + 1).unwrap();
        assert_eq!(buf2.capacity(), page * 2);

        let buf3 = pool.try_alloc(page * 3).unwrap();
        assert_eq!(buf3.capacity(), page * 4);
    }
1993
    /// With `prefill: true` and `max_per_class: 5`, five allocations succeed
    /// and the sixth reports an error.
    #[test]
    fn test_prefill() {
        let page = NZUsize!(page_size());
        let pool = test_pool(BufferPoolConfig {
            pool_min_size: 0,
            min_size: page,
            max_size: page,
            max_per_class: NZU32!(5),
            parallelism: NZUsize!(1),
            thread_cache_config: BufferPoolThreadCacheConfig::Enabled(None),
            prefill: true,
            alignment: page,
        });

        // Should be able to allocate max_per_class buffers immediately
        let mut bufs = Vec::new();
        for _ in 0..5 {
            bufs.push(pool.try_alloc(page.get()).expect("alloc should succeed"));
        }

        // Next allocation should fail
        assert!(pool.try_alloc(page.get()).is_err());
    }
2017
    /// Pins every field of the network preset and checks that it validates.
    #[test]
    fn test_config_for_network() {
        let config = BufferPoolConfig::for_network();
        config.validate();
        assert_eq!(config.pool_min_size, 0);
        assert_eq!(config.min_size.get(), 1024);
        assert_eq!(config.max_size.get(), 128 * 1024);
        assert_eq!(config.max_per_class.get(), 4096);
        assert_eq!(config.parallelism, NZUsize!(1));
        assert_eq!(
            config.thread_cache_config,
            BufferPoolThreadCacheConfig::Enabled(None)
        );
        assert!(!config.prefill);
        assert_eq!(config.alignment.get(), 1);
    }
2034
    /// Pins every field of the storage preset and checks that it validates.
    #[test]
    fn test_config_for_storage() {
        let config = BufferPoolConfig::for_storage();
        config.validate();
        assert_eq!(config.pool_min_size, 0);
        assert_eq!(config.min_size.get(), page_size());
        assert_eq!(config.max_size.get(), 8 * 1024 * 1024);
        assert_eq!(config.max_per_class.get(), 64);
        assert_eq!(config.parallelism, NZUsize!(1));
        assert_eq!(
            config.thread_cache_config,
            BufferPoolThreadCacheConfig::Enabled(None)
        );
        assert!(!config.prefill);
        assert_eq!(config.alignment.get(), 1);
    }
2051
2052 #[test]
2053 fn test_storage_config_supports_default_allocations() {
2054 // The storage preset's max_size (8 MB) should be allocatable out of the box.
2055 let pool = test_pool(BufferPoolConfig::for_storage());
2056
2057 let buf = pool.try_alloc(8 * 1024 * 1024).unwrap();
2058 assert_eq!(buf.capacity(), 8 * 1024 * 1024);
2059 }
2060
    /// Each `with_*` builder overrides exactly its own field. A preset with
    /// tuned alignment and a matching `min_size` still validates.
    #[test]
    fn test_config_builders() {
        let page = NZUsize!(page_size());
        let config = BufferPoolConfig::for_storage()
            .with_pool_min_size(1024)
            .with_max_per_class(NZU32!(64))
            .with_parallelism(NZUsize!(4))
            .with_thread_cache_capacity(NZUsize!(8))
            .with_prefill(true)
            .with_min_size(page)
            .with_max_size(NZUsize!(128 * 1024));

        config.validate();
        assert_eq!(config.pool_min_size, 1024);
        assert_eq!(config.min_size, page);
        assert_eq!(config.max_size.get(), 128 * 1024);
        assert_eq!(config.max_per_class.get(), 64);
        assert_eq!(config.parallelism, NZUsize!(4));
        assert_eq!(
            config.thread_cache_config,
            BufferPoolThreadCacheConfig::Enabled(Some(NZUsize!(8)))
        );
        assert!(config.prefill);
        assert_eq!(config.alignment.get(), 1);

        // Alignment can be tuned explicitly as long as min_size is also adjusted.
        let aligned = BufferPoolConfig::for_network()
            .with_pool_min_size(256)
            .with_parallelism(NZUsize!(4))
            .with_alignment(NZUsize!(256))
            .with_min_size(NZUsize!(256));
        aligned.validate();
        assert_eq!(aligned.parallelism, NZUsize!(4));
        assert_eq!(
            aligned.thread_cache_config,
            BufferPoolThreadCacheConfig::Enabled(None)
        );
        assert_eq!(aligned.alignment.get(), 256);
        assert_eq!(aligned.min_size.get(), 256);
    }
2101
    /// The auto thread-cache capacity is derived from `max_per_class` and
    /// `parallelism`. The asserted values are 64 -> 4 and 4096 -> 256 at
    /// parallelism 8.
    #[test]
    fn test_parallelism_policy_resolves_thread_cache_capacity() {
        let page = page_size();

        // Half the class budget is divided across expected threads.
        let pool = test_pool(test_config(page, page, 64).with_parallelism(NZUsize!(8)));
        let class_index = pool.class_index(page).unwrap();
        assert_eq!(pool.inner.classes[class_index].thread_cache_capacity, 4);

        // Large classes scale past the previous eight-slot cap.
        let pool = test_pool(test_config(page, page, 4096).with_parallelism(NZUsize!(8)));
        let class_index = pool.class_index(page).unwrap();
        assert_eq!(pool.inner.classes[class_index].thread_cache_capacity, 256);
    }
2116
    /// When the auto policy's per-thread share rounds to zero, thread caching is
    /// disabled. Buffers returned on another, still-running thread must then be
    /// reusable by this thread.
    #[test]
    fn test_auto_thread_cache_disables_when_parallelism_exceeds_budget() {
        let page = page_size();

        // With only two buffers and eight expected threads, the auto policy's
        // per-thread share is zero: 2 / (2 * min(8, 2)) == 0. In that case the
        // pool should disable TLS instead of forcing every thread to retain at
        // least one buffer.
        let pool = test_pool(test_config(page, page, 2).with_parallelism(NZUsize!(8)));
        let class_index = pool.class_index(page).unwrap();
        let class = &pool.inner.classes[class_index];
        assert_eq!(class.thread_cache_capacity, 0);

        // Exhaust the size class so the only way the main thread can allocate
        // again is if the worker's returned buffers are globally visible.
        let first = pool.try_alloc(page).expect("first tracked allocation");
        let second = pool.try_alloc(page).expect("second tracked allocation");

        // The worker owns its own pool handle; it is dropped only after the
        // main thread has finished reallocating.
        let pool_for_thread = pool.clone();
        let (returned_tx, returned_rx) = mpsc::channel();
        let (release_tx, release_rx) = mpsc::channel();
        let handle = thread::spawn(move || {
            // Return both buffers from another thread. The thread stays alive
            // after the drops, so any TLS entries it retained would remain
            // invisible to the main thread until `release_rx` fires.
            drop(first);
            drop(second);
            returned_tx.send(()).expect("signal returned buffers");
            release_rx.recv().expect("release worker");
            drop(pool_for_thread);
        });

        returned_rx.recv().expect("wait for returned buffers");

        // Both allocations must succeed while the worker thread is still
        // alive. Before auto capacity could resolve to zero, one returned
        // buffer could remain stranded in the worker's TLS cache and this
        // second allocation would report exhaustion.
        let _first = pool.try_alloc(page).expect("first global reuse");
        let _second = pool.try_alloc(page).expect("second global reuse");

        release_tx.send(()).expect("release worker");
        handle.join().expect("worker should not panic");
    }
2161
    /// The global freelist's word count follows `parallelism`, is capped so
    /// that every word holds at least one slot, and does not depend on whether
    /// thread caching is enabled.
    #[test]
    fn test_parallelism_policy_resolves_freelist_stripes() {
        let page = page_size();
        let pool = test_pool(test_config(page, page, 64).with_parallelism(NZUsize!(16)));

        let class_index = pool.class_index(page).unwrap();
        assert_eq!(
            freelist::tests::num_words(&pool.inner.classes[class_index].global),
            16
        );

        // When expected parallelism rounds above capacity, the freelist caps
        // stripes so every word can contain at least one slot.
        let pool = test_pool(test_config(page, page, 12).with_parallelism(NZUsize!(9)));

        let class_index = pool.class_index(page).unwrap();
        assert_eq!(
            freelist::tests::num_words(&pool.inner.classes[class_index].global),
            8
        );

        // Disabling thread-local caches should not change global striping.
        let pool = test_pool(
            test_config(page, page, 64)
                .with_parallelism(NZUsize!(16))
                .with_thread_cache_disabled(),
        );

        let class_index = pool.class_index(page).unwrap();
        assert_eq!(
            freelist::tests::num_words(&pool.inner.classes[class_index].global),
            16
        );
    }
2196
    /// An explicit thread-cache capacity is used verbatim instead of the
    /// parallelism-derived value. The asserted freelist word count is 8.
    #[test]
    fn test_fixed_thread_cache_capacity_overrides_auto_capacity() {
        let page = page_size();
        let pool = test_pool(
            test_config(page, page, 64)
                .with_parallelism(NZUsize!(8))
                .with_thread_cache_capacity(NZUsize!(7)),
        );
        let class_index = pool.class_index(page).unwrap();

        // Fixed capacity should bypass the derived parallelism heuristic.
        assert_eq!(pool.inner.classes[class_index].thread_cache_capacity, 7);
        assert_eq!(
            freelist::tests::num_words(&pool.inner.classes[class_index].global),
            8
        );
    }
2214
    /// With thread caching disabled, a dropped pooled buffer lands in the global
    /// freelist and nothing is kept in the current thread's bin.
    #[test]
    fn test_disabled_thread_cache_does_not_retain_buffers_locally() {
        let page = page_size();
        let pool = test_pool(test_config(page, page, 2).with_thread_cache_disabled());
        let class_index = pool.class_index(page).unwrap();
        let class = &pool.inner.classes[class_index];

        let tracked = pool.try_alloc(page).expect("tracked allocation");
        drop(tracked);

        // Disabled thread caching still routes returns through the global
        // freelist, but should never retain buffers in the current thread.
        assert_eq!(class.thread_cache_capacity, 0);
        assert_eq!(get_local_len(class), 0);
        assert_eq!(get_global_len(class), 1);
    }
2231
    /// `BufferPoolThreadCache::flush` empties this thread's bins for every size
    /// class and moves their buffers into the corresponding global freelists.
    #[test]
    fn test_thread_cache_flush_moves_local_entries_to_global() {
        let page = page_size();
        let pool =
            test_pool(test_config(page, page * 2, 8).with_thread_cache_capacity(NZUsize!(4)));

        // Use two distinct size classes so the test exercises the whole TLS
        // registry, not just a single per-class cache entry.
        let small_index = pool.class_index(page).unwrap();
        let large_index = pool.class_index(page + 1).unwrap();
        let small_class = &pool.inner.classes[small_index];
        let large_class = &pool.inner.classes[large_index];

        // Return one buffer from each class to the current thread. With local
        // caching enabled, both drops should stay in the thread-local bins.
        let small = pool.try_alloc(page).expect("tracked allocation");
        let large = pool.try_alloc(page + 1).expect("tracked allocation");
        drop(small);
        drop(large);

        // Before flushing, both buffers are only visible via the current
        // thread's local caches, nothing has been pushed to the global queues.
        assert_eq!(get_local_len(small_class), 1);
        assert_eq!(get_local_len(large_class), 1);
        assert_eq!(get_global_len(small_class), 0);
        assert_eq!(get_global_len(large_class), 0);

        // Flushing should walk the entire TLS registry, drop every local cache,
        // and let each cache's drop implementation return its buffers to the
        // shared global freelists.
        BufferPoolThreadCache::flush();

        // After flush, the current thread retains nothing locally and both
        // buffers are once again visible through their class-global queues.
        assert_eq!(get_local_len(small_class), 0);
        assert_eq!(get_local_len(large_class), 0);
        assert_eq!(get_global_len(small_class), 1);
        assert_eq!(get_global_len(large_class), 1);
    }
2271
    /// `with_budget_bytes` sets `max_per_class` from a total byte budget over all
    /// classes, and never produces fewer than one buffer per class.
    #[test]
    fn test_config_with_budget_bytes() {
        // Classes: 4, 8, 16 (sum = 28). Budget 280 => max_per_class = 10.
        let config = BufferPoolConfig {
            pool_min_size: 0,
            min_size: NZUsize!(4),
            max_size: NZUsize!(16),
            max_per_class: NZU32!(1),
            parallelism: NZUsize!(1),
            thread_cache_config: BufferPoolThreadCacheConfig::Enabled(None),
            prefill: false,
            alignment: NZUsize!(4),
        }
        .with_budget_bytes(NZUsize!(280));
        assert_eq!(config.max_per_class.get(), 10);

        // Budget 10 rounds up to one buffer per class.
        let small_budget = BufferPoolConfig {
            pool_min_size: 0,
            min_size: NZUsize!(4),
            max_size: NZUsize!(16),
            max_per_class: NZU32!(1),
            parallelism: NZUsize!(1),
            thread_cache_config: BufferPoolThreadCacheConfig::Enabled(None),
            prefill: false,
            alignment: NZUsize!(4),
        }
        .with_budget_bytes(NZUsize!(10));
        assert_eq!(small_budget.max_per_class.get(), 1);
    }
2302
2303 #[test]
2304 fn test_pool_error_display() {
2305 assert_eq!(
2306 PoolError::Oversized.to_string(),
2307 "requested capacity exceeds maximum buffer size"
2308 );
2309 assert_eq!(
2310 PoolError::Exhausted.to_string(),
2311 "pool exhausted for required size class"
2312 );
2313 }
2314
    /// The pool's `Debug` output names the type and its class count. `config()`
    /// exposes the configuration the pool was built from.
    #[test]
    fn test_pool_debug_and_config_accessor() {
        // Debug formatting and config accessor should be consistent.
        let page = page_size();
        let pool = test_pool(test_config(page, page, 2));

        let debug = format!("{pool:?}");
        assert!(debug.contains("BufferPool"));
        assert!(debug.contains("num_classes"));
        assert_eq!(pool.config().min_size.get(), page);
    }
2326
    /// With a one-entry local bin (max_per_class 2, parallelism 1), the first
    /// returned buffer stays local and the second goes to the global freelist.
    /// Both remain visible to this thread.
    #[test]
    fn test_return_buffer_local_overflow_spills_to_global() {
        let page = page_size();
        let pool = test_pool(test_config(page, page, 2));
        let class_index = pool
            .class_index(page)
            .expect("class exists for page-sized buffer");

        let tracked1 = pool.try_alloc(page).expect("first tracked allocation");
        let tracked2 = pool.try_alloc(page).expect("second tracked allocation");

        // The first return should stay entirely in the current thread's local cache.
        drop(tracked1);
        assert_eq!(get_global_len(&pool.inner.classes[class_index]), 0);
        assert_eq!(get_local_len(&pool.inner.classes[class_index]), 1);

        // Returning another tracked buffer should route overflow to the global
        // freelist and retain one in the current thread's local bin.
        drop(tracked2);
        assert_eq!(get_global_len(&pool.inner.classes[class_index]), 1);
        assert_eq!(get_local_len(&pool.inner.classes[class_index]), 1);
        assert_eq!(get_available(&pool, page), 2);
    }
2350
    /// When a one-entry local bin overflows, the buffer already cached locally
    /// stays there. The next allocation returns that same pointer, and the one
    /// after returns the overflowed buffer from the global freelist.
    #[test]
    fn test_small_local_cache_overflow_preserves_locality() {
        let page = page_size();
        let pool = test_pool(test_config(page, page, 2));

        // With `thread_cache_capacity == 1`, the first return stays local and the
        // second overflows directly to global instead of spilling the hot
        // local entry through the shared queue.
        let mut tracked1 = pool.try_alloc(page).expect("first tracked allocation");
        let ptr1 = tracked1.as_mut_ptr();
        let mut tracked2 = pool.try_alloc(page).expect("second tracked allocation");
        let ptr2 = tracked2.as_mut_ptr();

        drop(tracked1);
        drop(tracked2);

        let mut reused_local = pool.try_alloc(page).expect("reuse from local cache");
        assert_eq!(reused_local.as_mut_ptr(), ptr1);

        let mut reused_global = pool.try_alloc(page).expect("reuse from global freelist");
        assert_eq!(reused_global.as_mut_ptr(), ptr2);
    }
2373
    /// Large local bins spill half their entries to global on overflow. After
    /// the local half is drained, one global take refills the local bin in a
    /// batch.
    #[test]
    fn test_large_local_cache_batches_overflow_and_refill() {
        let page = page_size();
        // Scale the class budget with the host's parallelism so the auto
        // thread-cache capacity is large enough to use batching.
        let threads = std::thread::available_parallelism().map_or(1, NonZeroUsize::get);
        let max_per_class =
            u32::try_from(threads * 8).expect("test capacity must fit in u32 slot ids");
        let pool = test_pool(test_config(page, page, max_per_class));
        let class_index = pool
            .class_index(page)
            .expect("class exists for page-sized buffer");
        let class = &pool.inner.classes[class_index];

        assert!(class.thread_cache_capacity >= MIN_TLS_BATCH_CAPACITY);

        // Drop enough distinct pooled buffers to force an overflow from a
        // full local cache. Large bins should spill half the entries to global
        // and keep the remainder local for fast same-thread reuse.
        let mut bufs = Vec::new();
        for _ in 0..class.thread_cache_capacity + 1 {
            bufs.push(pool.try_alloc(page).expect("tracked allocation"));
        }
        for buf in bufs {
            drop(buf);
        }

        assert_eq!(get_local_len(class), class.thread_cache_capacity / 2 + 1);
        assert_eq!(get_global_len(class), class.thread_cache_capacity / 2);

        // Drain the local half, then hit global once. That global take should
        // batch-refill the local cache back up to the configured target.
        let mut reused = Vec::new();
        for _ in 0..class.thread_cache_capacity / 2 + 1 {
            reused.push(pool.try_alloc(page).expect("local reuse"));
        }
        assert_eq!(get_local_len(class), 0);
        assert_eq!(get_global_len(class), class.thread_cache_capacity / 2);

        // The refill takes everything from global: one entry is handed out and
        // the rest are banked locally.
        let _global = pool.try_alloc(page).expect("global reuse with refill");
        assert_eq!(get_local_len(class), class.thread_cache_capacity / 2 - 1);
        assert_eq!(get_global_len(class), 0);
    }
2415
    /// A cache pop that falls through to a global freelist with only one entry
    /// returns that entry and leaves both the local bin and the global freelist
    /// empty.
    #[test]
    fn test_global_batch_alloc_stops_when_global_runs_empty() {
        let class = test_size_class(64, 64);
        let (slot, buffer) = class.global.try_create(false).expect("slot reservation");

        // A short global freelist should return the allocation and stop
        // without filling the local cache to its batch target.
        class.global.put(slot, buffer);
        let (buffer, lease, slot) = BufferPoolThreadCache::pop(&class).expect("global allocation");

        assert_eq!(get_local_len(&class), 0);
        assert_eq!(get_global_len(&class), 0);

        // Return the manually popped entry so the freelist owns and deallocates
        // the buffer at test teardown.
        lease.return_global(slot, buffer);
    }
2433
    /// Tracks the size-class strong count through local push/pop, global return,
    /// global refill, and cache drop. A lease's reference is moved, never
    /// cloned, and each path releases exactly what it took.
    #[test]
    fn test_size_class_leases_use_raw_arc_tokens_across_cache_paths() {
        let class = test_size_class(64, 64);
        let mut cache = TlsSizeClassCache::new(class.token, MIN_TLS_BATCH_CAPACITY);
        assert_eq!(size_class_strong_count(&class), 1);

        let (slot, buffer) = class.global.try_create(false).expect("slot reservation");
        let lease = SizeClassLease::retain(&class);
        assert_eq!(size_class_strong_count(&class), 2);

        // Moving a pooled-buffer lease into the local cache banks the same strong
        // reference, it should not clone the class.
        cache.push(lease, slot, buffer);
        assert_eq!(size_class_strong_count(&class), 2);

        let (entry, lease) = cache.pop(&class).expect("local cache pop");
        assert_eq!(size_class_strong_count(&class), 2);
        lease.return_global(entry.slot, entry.buffer);
        assert_eq!(size_class_strong_count(&class), 1);

        // Park two buffers globally so the next pop misses locally and refills.
        for _ in 0..2 {
            let (slot, buffer) = class.global.try_create(false).expect("slot reservation");
            class.global.put(slot, buffer);
        }

        // After the refill, one reference is held by the returned lease and one
        // by the entry banked in the local cache.
        let (entry, lease) = cache.pop(&class).expect("global refill");
        assert_eq!(size_class_strong_count(&class), 3);

        lease.return_global(entry.slot, entry.buffer);
        assert_eq!(size_class_strong_count(&class), 2);

        // Dropping the cache returns the banked refill entry and releases its
        // size-class reference.
        drop(cache);
        assert_eq!(size_class_strong_count(&class), 1);
    }
2470
#[test]
fn test_tls_size_class_cache_push_tolerates_empty_spill() {
    // A zero-capacity local cache must forward each push straight to the
    // global freelist instead of batching (and then spilling) entries.
    let class = test_size_class(64, 64);
    let (reserved_slot, reserved_buffer) =
        class.global.try_create(false).expect("slot reservation");
    // This lease owns the reference that `push` takes over below.
    let owned_lease = SizeClassLease::retain(&class);
    let mut zero_capacity_cache = TlsSizeClassCache::new(class.token, 0);

    zero_capacity_cache.push(owned_lease, reserved_slot, reserved_buffer);
    // Nothing was kept locally.
    assert_eq!(zero_capacity_cache.len, 0);

    drop(zero_capacity_cache);
}
2485
#[test]
fn test_global_freelist_returns_each_slot_once() {
    // Two slots and a TLS capacity of one are small enough to drive the
    // class-global freelist directly, with no local-cache refill or spill.
    let class = SizeClassHandle::new(
        NEXT_SIZE_CLASS_ID.fetch_add(1, Ordering::Relaxed),
        64,
        64,
        NZU32!(2),
        NZUsize!(1),
        1,
        false,
    );

    // Reserve both slots. Record each buffer's address so we can later check
    // that the freelist returns the allocation parked for that exact slot.
    let (first_slot, first_buffer) = class.global.try_create(false).expect("first slot");
    let first_ptr = first_buffer.as_ptr();
    let (second_slot, second_buffer) = class.global.try_create(false).expect("second slot");
    let second_ptr = second_buffer.as_ptr();
    let expected = [(first_slot, first_ptr), (second_slot, second_ptr)];

    class.global.put(first_slot, first_buffer);
    class.global.put(second_slot, second_buffer);

    // Pop both entries. The freelist promises no ordering, so sort by slot id
    // before comparing. Each slot must come back exactly once, still paired
    // with its own buffer.
    let mut drained = [
        class.global.take().expect("first pop"),
        class.global.take().expect("second pop"),
    ];
    drained.sort_by_key(|(slot, _)| *slot);

    for ((slot, buffer), (want_slot, want_ptr)) in drained.iter().zip(expected) {
        assert_eq!(*slot, want_slot);
        assert_eq!(buffer.as_ptr(), want_ptr);
    }

    // Every slot has been claimed, so nothing is left to take.
    assert!(class.global.take().is_none());

    // Park the buffers again so the freelist frees them when the test class
    // is dropped.
    for (slot, buffer) in drained {
        class.global.put(slot, buffer);
    }
}
2534
#[test]
fn test_pooled_debug_and_empty_into_bytes_paths() {
    // Covers `Debug` output of the pooled mutable and immutable wrappers.
    // Also checks that converting an empty pooled buffer into bytes yields an
    // empty value that does not retain the pool allocation.
    let page = page_size();
    let class = test_size_class(page, page);
    let (debug_slot, debug_buffer, debug_class) = class.try_create(false).expect("first slot");
    let (empty_slot, empty_buffer, empty_class) = class.try_create(false).expect("second slot");
    let (frozen_slot, frozen_buffer, frozen_class) =
        class.try_create(false).expect("third slot");

    // Format the mutable wrapper and drop it inside this scope. Its debug
    // output should mention the cursor position.
    let mut_rendering = {
        let wrapper = PooledBufMut::new(debug_buffer, debug_class, debug_slot);
        format!("{wrapper:?}")
    };
    assert!(mut_rendering.contains("PooledBufMut"));
    assert!(mut_rendering.contains("cursor"));

    // An empty mutable wrapper turns into empty bytes.
    assert!(PooledBufMut::new(empty_buffer, empty_class, empty_slot)
        .into_bytes()
        .is_empty());

    // The frozen wrapper's debug output should mention capacity. With nothing
    // written, it also converts into empty bytes.
    let frozen = PooledBufMut::new(frozen_buffer, frozen_class, frozen_slot).into_pooled();
    let frozen_rendering = format!("{frozen:?}");
    assert!(frozen_rendering.contains("PooledBuf"));
    assert!(frozen_rendering.contains("capacity"));
    assert!(frozen.into_bytes().is_empty());

    BufferPoolThreadCache::flush();
}
2566
#[test]
fn test_freeze_returns_buffer_to_pool() {
    // A frozen pooled buffer stays checked out while the IoBuf is alive and
    // goes back to the pool when the IoBuf is dropped.
    let page = page_size();
    let pool = test_pool(test_config(page, page, 2));
    let allocated = || get_allocated(&pool, page);
    let available = || get_available(&pool, page);

    // A fresh pool has no outstanding and no free buffers.
    assert_eq!(allocated(), 0);
    assert_eq!(available(), 0);

    let writable = pool.try_alloc(page).unwrap();
    assert_eq!(allocated(), 1);
    assert_eq!(available(), 0);

    // Freezing keeps the allocation outstanding; the IoBuf now owns it.
    let frozen = writable.freeze();
    assert_eq!(allocated(), 1);

    // Releasing the frozen view hands the buffer back to the pool.
    drop(frozen);
    assert_eq!(allocated(), 0);
    assert_eq!(available(), 1);
}
2590
#[test]
fn test_refcount_and_copy_to_bytes_paths() {
    // Checks how long a pooled allocation stays checked out while views,
    // clones, and `copy_to_bytes` results derived from it are alive.
    let page = page_size();
    let pool = test_pool(test_config(page, page, 2));

    // Refcount behavior:
    // - clone/slice keep the pooled allocation alive
    // - empty slice does not keep ownership
    {
        let mut buf = pool.try_alloc(page).unwrap();
        buf.put_slice(&[0xAA; 100]);
        let iobuf = buf.freeze();
        let clone = iobuf.clone();
        let slice = iobuf.slice(10..40);
        let empty = iobuf.slice(10..10);
        assert!(empty.is_empty());
        drop(iobuf);
        assert_eq!(get_allocated(&pool, page), 1);
        drop(slice);
        assert_eq!(get_allocated(&pool, page), 1);
        // `empty` is still alive here, so reaching zero shows the empty slice
        // holds no reference to the pooled allocation.
        drop(clone);
        assert_eq!(get_allocated(&pool, page), 0);
    }

    // IoBuf::copy_to_bytes behavior:
    // - zero-length copy is empty and non-advancing
    // - partial copy advances while keeping ownership alive
    // - full drain transfers ownership out of source
    // - zero-length copy on already-empty source stays detached
    {
        let mut buf = pool.try_alloc(page).unwrap();
        buf.put_slice(&[0x42; 100]);
        let mut iobuf = buf.freeze();

        let zero = iobuf.copy_to_bytes(0);
        assert!(zero.is_empty());
        assert_eq!(iobuf.remaining(), 100);

        let partial = iobuf.copy_to_bytes(30);
        assert_eq!(&partial[..], &[0x42; 30]);
        assert_eq!(iobuf.remaining(), 70);

        let rest = iobuf.copy_to_bytes(70);
        assert_eq!(&rest[..], &[0x42; 70]);
        assert_eq!(iobuf.remaining(), 0);

        // Zero-length copy on empty should not transfer ownership.
        let empty = iobuf.copy_to_bytes(0);
        assert!(empty.is_empty());

        drop(iobuf);
        assert_eq!(get_allocated(&pool, page), 1);
        drop(zero);
        drop(partial);
        assert_eq!(get_allocated(&pool, page), 1);
        // `empty` outlives this drop, so reaching zero proves it is detached.
        drop(rest);
        assert_eq!(get_allocated(&pool, page), 0);
    }

    // IoBufMut::copy_to_bytes mirrors the immutable ownership semantics.
    {
        let mut iobufmut = pool.try_alloc(page).unwrap();
        iobufmut.put_slice(&[0x7E; 100]);

        let zero = iobufmut.copy_to_bytes(0);
        assert!(zero.is_empty());
        assert_eq!(iobufmut.remaining(), 100);

        let partial = iobufmut.copy_to_bytes(30);
        assert_eq!(&partial[..], &[0x7E; 30]);
        assert_eq!(iobufmut.remaining(), 70);

        let rest = iobufmut.copy_to_bytes(70);
        assert_eq!(&rest[..], &[0x7E; 70]);
        assert_eq!(iobufmut.remaining(), 0);

        drop(iobufmut);
        assert_eq!(get_allocated(&pool, page), 1);
        drop(zero);
        drop(partial);
        assert_eq!(get_allocated(&pool, page), 1);
        drop(rest);
        assert_eq!(get_allocated(&pool, page), 0);
    }
}
2677
#[test]
fn test_iobuf_to_iobufmut_conversion_reuses_pool_for_non_full_unique_view() {
    // Converting a uniquely owned, partially written IoBuf back into an
    // IoBufMut should take over the pooled allocation instead of copying it.
    let page = page_size();
    let pool = test_pool(test_config(page, page, 2));

    let writable = pool.try_alloc(page).unwrap();
    assert_eq!(get_allocated(&pool, page), 1);

    let frozen = writable.freeze();
    assert_eq!(get_allocated(&pool, page), 1);

    let reclaimed = IoBufMut::from(frozen);

    // No copy happened: the single pooled buffer is still checked out and
    // nothing was handed back to the pool.
    assert_eq!(
        get_allocated(&pool, page),
        1,
        "pooled buffer should remain allocated after zero-copy IoBuf->IoBufMut conversion"
    );
    assert_eq!(get_available(&pool, page), 0);

    // Releasing the reclaimed buffer returns it to the pool.
    drop(reclaimed);
    assert_eq!(get_allocated(&pool, page), 0);
    assert_eq!(get_available(&pool, page), 1);
}
2705
#[test]
fn test_iobuf_to_iobufmut_conversion_preserves_full_unique_view() {
    // A completely filled, uniquely owned IoBuf converted with `From` keeps
    // both its contents and its pooled backing storage.
    let page = page_size();
    let pool = test_pool(test_config(page, page, 2));

    let mut writable = pool.try_alloc(page).unwrap();
    writable.put_slice(&vec![0xEE; page]);
    let reclaimed = IoBufMut::from(writable.freeze());

    // The round-tripped contents are intact and the storage was reused.
    assert_eq!(reclaimed.len(), page);
    assert!(reclaimed.as_ref().iter().all(|&byte| byte == 0xEE));
    assert_eq!(get_allocated(&pool, page), 1);
    assert_eq!(get_available(&pool, page), 0);

    // Releasing it sends the buffer back to the pool.
    drop(reclaimed);
    assert_eq!(get_allocated(&pool, page), 0);
    assert_eq!(get_available(&pool, page), 1);
}
2730
#[test]
fn test_iobuf_try_into_mut_recycles_full_unique_view() {
    // A pooled IoBuf that is uniquely owned and spans the whole allocation
    // comes back as mutable without a copy, keeping its data and pool tracking.
    let page = page_size();
    let pool = test_pool(test_config(page, page, 2));

    let frozen = {
        let mut writable = pool.try_alloc(page).unwrap();
        writable.put_slice(&vec![0xAB; page]);
        writable.freeze()
    };
    assert_eq!(get_allocated(&pool, page), 1);

    let recycled = frozen
        .try_into_mut()
        .expect("unique full-view pooled buffer should recycle");
    assert_eq!(recycled.len(), page);
    assert!(recycled.as_ref().iter().all(|&byte| byte == 0xAB));
    assert_eq!(recycled.capacity(), page);
    // Still the same tracked allocation.
    assert_eq!(get_allocated(&pool, page), 1);

    drop(recycled);
    assert_eq!(get_allocated(&pool, page), 0);
    assert_eq!(get_available(&pool, page), 1);
}
2756
#[test]
fn test_iobuf_try_into_mut_succeeds_for_unique_slice_and_fails_for_shared() {
    // `try_into_mut` recovers a mutable pooled buffer from a uniquely owned
    // sliced view, and refuses when the allocation is shared with a clone.
    let page = page_size();
    let pool = test_pool(test_config(page, page, 2));

    // Unique sliced views can recover mutable ownership without copying.
    let mut buf = pool.try_alloc(page).unwrap();
    buf.put_slice(&vec![0xCD; page]);
    let iobuf = buf.freeze();
    let sliced = iobuf.slice(1..page);
    // Drop the full view so `sliced` is the only remaining reference.
    drop(iobuf);
    let recycled = sliced
        .try_into_mut()
        .expect("unique sliced pooled buffer should recycle");
    // The recovered buffer covers only the sliced range, one byte shorter
    // than the page in both length and capacity.
    assert_eq!(recycled.len(), page - 1);
    assert!(recycled.as_ref().iter().all(|&b| b == 0xCD));
    assert_eq!(recycled.capacity(), page - 1);
    assert_eq!(get_allocated(&pool, page), 1);
    // Releasing the recovered buffer returns it to the pool.
    drop(recycled);
    assert_eq!(get_allocated(&pool, page), 0);
    assert_eq!(get_available(&pool, page), 1);

    // Shared views still cannot recover mutable ownership.
    let mut buf = pool.try_alloc(page).unwrap();
    buf.put_slice(&vec![0xEF; page]);
    let iobuf = buf.freeze();
    let cloned = iobuf.clone();
    // The error value is rebound as `iobuf`. Presumably it is the original
    // view handed back on failure — confirm against `try_into_mut`.
    let iobuf = iobuf
        .try_into_mut()
        .expect_err("shared pooled buffer must not convert to mutable");

    drop(cloned);
    drop(iobuf);
    assert_eq!(get_allocated(&pool, page), 0);
    // NOTE(review): this is only a lower bound. Both rounds appear to reuse
    // the same buffer via this thread's cache, which would make the exact
    // count 1. Confirm before tightening.
    assert!(get_available(&pool, page) >= 1);
}
2793
#[test]
fn test_multithreaded_alloc_freeze_return() {
    let page = page_size();
    let pool = Arc::new(test_pool(test_config(page, page, 100)));

    // Keep the iteration count small under miri, where atomics are slow.
    cfg_if::cfg_if! {
        if #[cfg(miri)] {
            let iterations = 100;
        } else {
            let iterations = 1000;
        }
    }

    // Ten workers each repeatedly allocate, freeze, fan the view out into
    // clones, and then release every handle.
    let workers: Vec<_> = (0..10)
        .map(|_| {
            let pool = Arc::clone(&pool);
            thread::spawn(move || {
                for _ in 0..iterations {
                    let frozen = pool.try_alloc(page).unwrap().freeze();
                    let clones: Vec<_> = (0..5).map(|_| frozen.clone()).collect();
                    drop(frozen);
                    // Dropping the Vec releases the clones in order.
                    drop(clones);
                }
            })
        })
        .collect();

    for worker in workers {
        worker.join().unwrap();
    }

    // Free buffers may have been parked in worker-local caches, so the main
    // thread does not assert exact counts. It only checks that the pool still
    // serves allocations after the workers finish.
    let _buf = pool
        .try_alloc(page)
        .expect("pool should remain usable after multithreaded test");
}
2843
#[test]
fn test_cross_thread_buffer_return() {
    // Buffers are allocated and frozen on this thread, then sent to a worker
    // thread that drops them.
    let page = page_size();
    let pool = test_pool(test_config(page, page, 100));

    let (sender, receiver) = mpsc::channel();
    for _ in 0..50 {
        sender.send(pool.try_alloc(page).unwrap().freeze()).unwrap();
    }
    drop(sender);

    // Dropping on the worker initializes that thread's local cache. The
    // returned buffers stay local to it instead of bouncing through the
    // global freelist.
    let worker = thread::spawn(move || {
        for frozen in receiver {
            drop(frozen);
        }

        let class_index = pool
            .class_index(page)
            .expect("class exists for page-sized buffer");
        let class = &pool.inner.classes[class_index];
        assert_eq!(get_local_len(class), 50);
        assert_eq!(get_global_len(class), 0);

        // Every one of those locally cached buffers can be handed out again.
        for _ in 0..50 {
            let _buf = pool
                .try_alloc(page)
                .expect("dropping thread should be able to reuse locally returned buffers");
        }
    });

    worker.join().unwrap();
}
2883
#[test]
fn test_thread_exit_flushes_local_bin() {
    // When a thread exits, its TLS cache is flushed back to the global
    // freelist, so the buffers it returned become usable by other threads.
    let page = page_size();
    let pool = Arc::new(test_pool(test_config(page, page, 1)));

    // Allocate and release a single buffer on a short-lived worker.
    let worker = {
        let pool = Arc::clone(&pool);
        thread::spawn(move || {
            let buf = pool
                .try_alloc(page)
                .expect("worker should allocate tracked buffer");
            drop(buf);
        })
    };
    worker.join().expect("worker thread should exit cleanly");

    // With the worker gone, the buffer must sit in the global freelist and
    // not in a dead thread's local cache.
    let class_index = pool
        .class_index(page)
        .expect("class exists for page-sized buffer");
    let class = &pool.inner.classes[class_index];
    assert_eq!(get_global_len(class), 1);
    assert_eq!(get_local_len(class), 0);

    // The main thread can pick that buffer up again.
    let _buf = pool
        .try_alloc(page)
        .expect("thread-exited local buffer should be reusable");
}
2915
#[test]
fn test_pool_drop_drains_global_freelist() {
    // Dropping the pool should immediately reclaim globally-visible free
    // tracked buffers, while leaving TLS-cached buffers alone.
    let page = page_size();
    let pool = test_pool(test_config(page, page, 2));
    let class_index = pool
        .class_index(page)
        .expect("class exists for page-sized buffer");
    let class = &pool.inner.classes[class_index];
    // Keep a test-owned handle so the class remains inspectable after
    // dropping the public pool below.
    // SAFETY: `class` owns one strong reference for `class.token`.
    unsafe { class.token.retain() };
    // Shadows the borrowed `class` with an owned handle. The handle stands
    // for the extra strong reference retained just above.
    let class = SizeClassHandle { token: class.token };

    // Return one buffer to the current thread's local cache and overflow
    // the other into the shared global freelist.
    let buf1 = pool.try_alloc(page).unwrap();
    let buf2 = pool.try_alloc(page).unwrap();
    drop(buf1);
    drop(buf2);

    assert_eq!(get_global_len(&class), 1);
    assert_eq!(get_local_len(&class), 1);

    // Pool drop should drain only the global freelist. The thread-local
    // cache remains untouched until thread exit.
    drop(pool);

    assert_eq!(get_global_len(&class), 0);
    assert_eq!(get_local_len(&class), 1);
    // The class still reports both buffers as created; draining the global
    // freelist does not reset that count.
    assert_eq!(get_global_created(&class), 2);
}
2950
#[test]
fn test_pool_dropped_before_buffer() {
    // A tracked buffer may outlive the pool handle. Its size class stays
    // alive until that last buffer is released.
    let page = page_size();
    let pool = test_pool(test_config(page, page, 2));

    let frozen = {
        let mut writable = pool.try_alloc(page).unwrap();
        writable.put_slice(&[0u8; 100]);
        writable.freeze()
    };

    // Release the pool first; the frozen view must remain readable.
    drop(pool);
    assert_eq!(frozen.len(), 100);

    // Releasing the view afterwards returns it to the retained size class.
    // Reaching the end of the test without a panic is the check.
    drop(frozen);
}
2973
#[test]
fn test_pool_exhaustion_and_recovery() {
    // A three-buffer pool refuses a fourth allocation, recovers when a buffer
    // is returned, and reuses freed buffers afterwards.
    let page = page_size();
    let pool = test_pool(test_config(page, page, 3));

    let first = pool.try_alloc(page).expect("first alloc");
    let second = pool.try_alloc(page).expect("second alloc");
    let third = pool.try_alloc(page).expect("third alloc");
    assert!(pool.try_alloc(page).is_err(), "pool should be exhausted");

    // Handing one back frees exactly one slot.
    drop(first);
    let fourth = pool.try_alloc(page).expect("alloc after return");
    assert!(pool.try_alloc(page).is_err(), "pool exhausted again");

    // Return everything, in order; all three buffers should then be free.
    for held in [second, third, fourth] {
        drop(held);
    }
    assert_eq!(get_allocated(&pool, page), 0);
    assert_eq!(get_available(&pool, page), 3);

    // The next allocation is served from the freelist.
    let _reused = pool.try_alloc(page).expect("reuse from freelist");
    assert_eq!(get_available(&pool, page), 2);
}
3005
#[test]
fn test_try_alloc_errors() {
    // `try_alloc` reports `Oversized` for requests above the largest class,
    // and `Exhausted` once every buffer in the class is checked out.
    let page = page_size();
    let pool = test_pool(test_config(page, page, 2));

    assert_eq!(pool.try_alloc(page * 10).unwrap_err(), PoolError::Oversized);

    let _first = pool.try_alloc(page).unwrap();
    let _second = pool.try_alloc(page).unwrap();
    assert_eq!(pool.try_alloc(page).unwrap_err(), PoolError::Exhausted);
}
3022
#[test]
fn test_try_alloc_zeroed_errors() {
    // The zeroing variant must surface the same `Oversized` / `Exhausted`
    // errors as plain `try_alloc`.
    let page = page_size();
    let pool = test_pool(test_config(page, page, 2));

    assert_eq!(
        pool.try_alloc_zeroed(page * 10).unwrap_err(),
        PoolError::Oversized
    );

    let _first = pool.try_alloc_zeroed(page).unwrap();
    let _second = pool.try_alloc_zeroed(page).unwrap();
    assert_eq!(
        pool.try_alloc_zeroed(page).unwrap_err(),
        PoolError::Exhausted
    );
}
3039
#[test]
fn test_fallback_allocation() {
    // When the pool is exhausted or the request is oversized, `alloc` falls
    // back to untracked but still aligned heap buffers. Those fallbacks never
    // touch the pool counters.
    let page = page_size();
    let pool = test_pool(test_config(page, page, 2));

    let tracked_a = pool.try_alloc(page).unwrap();
    let tracked_b = pool.try_alloc(page).unwrap();
    assert!(tracked_a.is_pooled());
    assert!(tracked_b.is_pooled());

    // Pool exhausted: the fallback is aligned but untracked.
    let mut from_exhausted = pool.alloc(page);
    assert!(!from_exhausted.is_pooled());
    assert!((from_exhausted.as_mut_ptr() as usize).is_multiple_of(page));

    // Oversized request: same fallback behavior.
    let mut from_oversized = pool.alloc(page * 10);
    assert!(!from_oversized.is_pooled());
    assert!((from_oversized.as_mut_ptr() as usize).is_multiple_of(page));

    // Only the two tracked buffers are counted, both before and after the
    // fallbacks are released.
    assert_eq!(get_allocated(&pool, page), 2);
    drop(from_exhausted);
    drop(from_oversized);
    assert_eq!(get_allocated(&pool, page), 2);

    // Returning the tracked buffers brings the count to zero.
    drop(tracked_a);
    drop(tracked_b);
    assert_eq!(get_allocated(&pool, page), 0);
}
3075
#[test]
fn test_is_pooled() {
    // Buffers handed out by the pool report `is_pooled`; plain heap buffers
    // do not.
    let page = page_size();
    let pool = test_pool(test_config(page, page, 10));

    let from_pool = pool.try_alloc(page).unwrap();
    assert!(from_pool.is_pooled());

    let from_heap = IoBufMut::with_capacity(100);
    assert!(!from_heap.is_pooled());
}
3089
#[test]
fn test_iobuf_is_pooled() {
    // `IoBuf::is_pooled` is true only for frozen pool-tracked buffers. It is
    // false for frozen untracked fallbacks and for plain copied bytes.
    let page = page_size();
    let pool = test_pool(test_config(page, page, 2));

    let tracked = pool.try_alloc(page).unwrap().freeze();
    assert!(tracked.is_pooled());

    // An oversized `alloc` falls back to an untracked allocation.
    let untracked = pool.alloc(page * 10).freeze();
    assert!(!untracked.is_pooled());

    let copied = IoBuf::copy_from_slice(b"hello");
    assert!(!copied.is_pooled());
}
3105
#[test]
fn test_buffer_alignment() {
    // The storage preset yields page-aligned buffers and the network preset
    // yields cache-line-aligned ones.
    let page = page_size();
    let cache_line = cache_line_size();

    // Under miri atomics are slow, so cap each class at 32 buffers there.
    cfg_if::cfg_if! {
        if #[cfg(miri)] {
            let storage_config = BufferPoolConfig {
                max_per_class: NZU32!(32),
                ..BufferPoolConfig::for_storage().with_alignment(NZUsize!(page))
            };
            let network_config = BufferPoolConfig {
                max_per_class: NZU32!(32),
                ..BufferPoolConfig::for_network().with_alignment(NZUsize!(cache_line))
            };
        } else {
            let storage_config =
                BufferPoolConfig::for_storage().with_alignment(NZUsize!(page));
            let network_config =
                BufferPoolConfig::for_network().with_alignment(NZUsize!(cache_line));
        }
    }

    // Storage buffers must start on a page boundary.
    let storage_pool = test_pool(storage_config);
    let mut storage_buf = storage_pool.try_alloc(100).unwrap();
    let storage_addr = storage_buf.as_mut_ptr() as usize;
    assert_eq!(storage_addr % page, 0, "storage buffer not page-aligned");

    // Network buffers only need cache-line alignment.
    let network_pool = test_pool(network_config);
    let mut network_buf = network_pool.try_alloc(100).unwrap();
    let network_addr = network_buf.as_mut_ptr() as usize;
    assert_eq!(
        network_addr % cache_line,
        0,
        "network buffer not cache-line aligned"
    );
}
3148}