use super::{freelist::Freelist, page_size, IoBufMut};
use crate::{
iobuf::buffer::{PooledBufMut, PooledBuffer},
telemetry::metrics::{raw, Counter, CounterFamily, EncodeLabelSet, GaugeFamily, Register},
};
use commonware_utils::{NZUsize, NZU32};
use std::{
alloc::Layout,
cell::{Cell, UnsafeCell},
mem::MaybeUninit,
num::{NonZeroU32, NonZeroUsize},
ptr,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
/// Smallest thread-cache capacity for which buffers move to and from the global
/// freelist in batches of `capacity / 2`.
///
/// Caches with a smaller capacity transfer a single buffer at a time instead.
const MIN_TLS_BATCH_CAPACITY: usize = 4;
/// Reasons a pooled allocation request can fail.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PoolError {
    /// The requested capacity is larger than the pool's maximum buffer size.
    Oversized,
    /// The size class that would serve the request has no buffer available.
    Exhausted,
}

impl std::fmt::Display for PoolError {
    /// Writes the fixed, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::Oversized => "requested capacity exceeds maximum buffer size",
            Self::Exhausted => "pool exhausted for required size class",
        };
        f.write_str(message)
    }
}

impl std::error::Error for PoolError {}
/// Selects whether returned buffers are cached per thread before going back to the
/// global freelist.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum BufferPoolThreadCacheConfig {
/// Thread caching is on. `Some(n)` fixes the per-class cache capacity at `n`;
/// `None` derives the capacity from `max_per_class` and `parallelism`.
Enabled(Option<NonZeroUsize>),
/// Thread caching is off; the resolved per-class cache capacity is zero.
Disabled,
}
/// Configuration for a [`BufferPool`].
#[derive(Debug, Clone)]
pub struct BufferPoolConfig {
/// Requests with a capacity below this value bypass the pool and receive an
/// untracked allocation. Validation requires it to be `<= min_size`.
pub pool_min_size: usize,
/// Buffer size of the smallest size class. Must be a power of two and at least
/// `alignment`.
pub min_size: NonZeroUsize,
/// Buffer size of the largest size class. Must be a power of two and `>= min_size`;
/// larger requests are reported as oversized.
pub max_size: NonZeroUsize,
/// Per-class buffer limit handed to each size class's freelist.
pub max_per_class: NonZeroU32,
/// Forwarded to each freelist on construction; when set, the `created` metric is
/// reported as `max_per_class` for every class.
pub prefill: bool,
/// Alignment used for buffer layouts. Must be a power of two.
pub alignment: NonZeroUsize,
/// Expected thread parallelism: forwarded to each freelist and used to derive the
/// automatic thread-cache capacity.
pub parallelism: NonZeroUsize,
/// Thread-cache policy (automatic, fixed capacity, or disabled).
pub(crate) thread_cache_config: BufferPoolThreadCacheConfig,
}
impl BufferPoolConfig {
    /// Preset with size classes from 1 KiB to 128 KiB, up to 4096 buffers per class,
    /// no prefill, alignment 1, parallelism 1 and an automatically sized thread cache.
    pub const fn for_network() -> Self {
        Self {
            thread_cache_config: BufferPoolThreadCacheConfig::Enabled(None),
            parallelism: NZUsize!(1),
            alignment: NZUsize!(1),
            prefill: false,
            max_per_class: NZU32!(4096),
            max_size: NZUsize!(128 * 1024),
            min_size: NZUsize!(1024),
            pool_min_size: 0,
        }
    }

    /// Preset with size classes from one page to 8 MiB, up to 64 buffers per class,
    /// no prefill, alignment 1, parallelism 1 and an automatically sized thread cache.
    pub fn for_storage() -> Self {
        let smallest_class = NZUsize!(page_size());
        Self {
            thread_cache_config: BufferPoolThreadCacheConfig::Enabled(None),
            parallelism: NZUsize!(1),
            alignment: NZUsize!(1),
            prefill: false,
            max_per_class: NZU32!(64),
            max_size: NZUsize!(8 * 1024 * 1024),
            min_size: smallest_class,
            pool_min_size: 0,
        }
    }

    /// Sets the capacity threshold below which requests bypass the pool.
    pub const fn with_pool_min_size(mut self, pool_min_size: usize) -> Self {
        self.pool_min_size = pool_min_size;
        self
    }

    /// Sets the buffer size of the smallest size class.
    pub const fn with_min_size(mut self, min_size: NonZeroUsize) -> Self {
        self.min_size = min_size;
        self
    }

    /// Sets the buffer size of the largest size class.
    pub const fn with_max_size(mut self, max_size: NonZeroUsize) -> Self {
        self.max_size = max_size;
        self
    }

    /// Sets the per-class buffer limit.
    pub const fn with_max_per_class(mut self, max_per_class: NonZeroU32) -> Self {
        self.max_per_class = max_per_class;
        self
    }

    /// Sets the expected thread parallelism.
    pub const fn with_parallelism(mut self, parallelism: NonZeroUsize) -> Self {
        self.parallelism = parallelism;
        self
    }

    /// Enables the thread cache with a fixed per-class capacity.
    pub const fn with_thread_cache_capacity(mut self, thread_cache_capacity: NonZeroUsize) -> Self {
        let fixed = Some(thread_cache_capacity);
        self.thread_cache_config = BufferPoolThreadCacheConfig::Enabled(fixed);
        self
    }

    /// Disables the thread cache.
    pub const fn with_thread_cache_disabled(mut self) -> Self {
        self.thread_cache_config = BufferPoolThreadCacheConfig::Disabled;
        self
    }

    /// Sets the prefill flag.
    pub const fn with_prefill(mut self, prefill: bool) -> Self {
        self.prefill = prefill;
        self
    }

    /// Sets the buffer alignment.
    pub const fn with_alignment(mut self, alignment: NonZeroUsize) -> Self {
        self.alignment = alignment;
        self
    }

    /// Derives `max_per_class` from a total byte budget.
    ///
    /// The budget is divided (rounding up) by the combined size of one buffer from
    /// every size class.
    ///
    /// # Panics
    ///
    /// Panics if the size-class bounds are invalid or if the derived count does not
    /// fit in a `u32`.
    pub fn with_budget_bytes(mut self, budget_bytes: NonZeroUsize) -> Self {
        self.validate_size_class_bounds();
        let smallest = self.min_size.get();
        let class_count = Self::num_classes(smallest, self.max_size.get());
        // Bytes needed to hold exactly one buffer of every class.
        let bytes_per_round = (0..class_count).fold(0usize, |total, index| {
            total.saturating_add(Self::class_size(smallest, index))
        });
        if bytes_per_round == 0 {
            return self;
        }
        let per_class = budget_bytes.get().div_ceil(bytes_per_round);
        let per_class = u32::try_from(per_class).expect("max_per_class must fit in u32 slot ids");
        self.max_per_class = NonZeroU32::new(per_class).expect("max_per_class must be non-zero");
        self
    }

    /// Asserts that `min_size` and `max_size` are powers of two with
    /// `max_size >= min_size`.
    fn validate_size_class_bounds(&self) {
        let (smallest, largest) = (self.min_size.get(), self.max_size.get());
        assert!(
            smallest.is_power_of_two(),
            "min_size must be a power of two"
        );
        assert!(
            largest.is_power_of_two(),
            "max_size must be a power of two"
        );
        assert!(largest >= smallest, "max_size must be >= min_size");
    }

    /// Asserts every configuration invariant, panicking on the first violation.
    fn validate(&self) {
        self.validate_size_class_bounds();
        assert!(
            self.alignment.is_power_of_two(),
            "alignment must be a power of two"
        );
        assert!(
            self.min_size.get() >= self.alignment.get(),
            "min_size ({}) must be >= alignment ({})",
            self.min_size,
            self.alignment
        );
        assert!(
            self.pool_min_size <= self.min_size.get(),
            "pool_min_size ({}) must be <= min_size ({})",
            self.pool_min_size,
            self.min_size
        );
        match self.thread_cache_config {
            BufferPoolThreadCacheConfig::Enabled(Some(thread_cache_capacity)) => {
                assert!(
                    thread_cache_capacity.get() <= self.max_per_class.get() as usize,
                    "thread_cache_capacity ({}) must be <= max_per_class ({})",
                    thread_cache_capacity,
                    self.max_per_class
                );
            }
            BufferPoolThreadCacheConfig::Enabled(None) | BufferPoolThreadCacheConfig::Disabled => {}
        }
    }

    /// Number of power-of-two size classes between `min_size` and `max_size`, inclusive.
    #[inline]
    const fn num_classes(min_size: usize, max_size: usize) -> usize {
        let doublings = max_size.trailing_zeros() - min_size.trailing_zeros();
        (doublings + 1) as usize
    }

    /// Buffer size of the class at `index`, where class 0 holds `min_size` buffers.
    #[inline]
    const fn class_size(min_size: usize, index: usize) -> usize {
        min_size << index
    }

    /// Resolves the per-class thread-cache capacity; zero means caching is disabled.
    fn resolve_thread_cache_capacity(&self) -> usize {
        match self.thread_cache_config {
            BufferPoolThreadCacheConfig::Disabled => return 0,
            BufferPoolThreadCacheConfig::Enabled(Some(fixed)) => return fixed.get(),
            BufferPoolThreadCacheConfig::Enabled(None) => {}
        }
        // Automatic policy: max_per_class / (2 * threads), with threads capped at
        // max_per_class.
        let per_class = self.max_per_class.get() as usize;
        let threads = self.parallelism.get().min(per_class);
        per_class / (2 * threads)
    }
}
/// Metric label identifying a size class by its buffer size in bytes.
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
struct SizeClassLabel {
/// Buffer size of the class, in bytes.
size_class: u64,
}
/// Metrics recorded by a buffer pool.
struct PoolMetrics {
/// Per-class gauge of tracked buffers created for the pool.
created: GaugeFamily<SizeClassLabel>,
/// Per-class count of allocations that failed because the class was exhausted.
exhausted_total: CounterFamily<SizeClassLabel>,
/// Count of requests larger than the maximum buffer size.
oversized_total: Counter,
}
impl PoolMetrics {
fn new(registry: &mut impl Register) -> Self {
Self {
created: registry.register(
"buffer_pool_created",
"Number of tracked buffers created for the pool",
raw::Family::default(),
),
exhausted_total: registry.register(
"buffer_pool_exhausted_total",
"Total number of failed allocations due to pool exhaustion",
raw::Family::default(),
),
oversized_total: registry.register(
"buffer_pool_oversized_total",
"Total number of allocation requests exceeding max buffer size",
raw::Counter::default(),
),
}
}
}
/// State for one size class of the pool.
pub(super) struct SizeClass {
/// Identifier used to index this class's bin in each thread's cache table.
class_id: usize,
/// Buffer size of this class, in bytes.
size: usize,
/// The class's global freelist of buffers.
global: Freelist,
/// Per-thread cache capacity for this class; zero bypasses the thread cache.
thread_cache_capacity: usize,
}
// NOTE(review): these impls assert that `Freelist` may be shared and sent across
// threads; that property is defined outside this file, so confirm it there.
unsafe impl Send for SizeClass {}
unsafe impl Sync for SizeClass {}
/// Copyable raw pointer to an `Arc`-allocated [`SizeClass`].
///
/// Copying the token does not change the reference count; strong references are
/// adjusted explicitly through `retain` and `release`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct SizeClassToken {
/// Pointer obtained from `Arc::into_raw`.
ptr: ptr::NonNull<SizeClass>,
}
impl SizeClassToken {
/// Moves `class` into a new `Arc` and keeps its raw pointer. The strong reference
/// created here stays outstanding until a matching `release`.
fn new(class: SizeClass) -> Self {
let ptr = Arc::into_raw(Arc::new(class)).cast_mut();
// SAFETY: `Arc::into_raw` returns a pointer into a live allocation, never null.
let ptr = unsafe { ptr::NonNull::new_unchecked(ptr) };
Self { ptr }
}
/// Borrows the referenced size class.
///
/// # Safety
///
/// At least one strong reference to the class must remain outstanding for as long
/// as the returned borrow is used.
#[inline(always)]
const unsafe fn as_ref(&self) -> &SizeClass {
unsafe { self.ptr.as_ref() }
}
/// Adds one strong reference to the class.
///
/// # Safety
///
/// The class must still be alive (strong count of at least one).
#[inline(always)]
unsafe fn retain(self) {
unsafe { Arc::increment_strong_count(self.ptr.as_ptr()) };
}
/// Drops one strong reference to the class, freeing it when the count hits zero.
///
/// # Safety
///
/// The caller must own a strong reference that has not already been released.
#[inline(always)]
unsafe fn release(self) {
unsafe { Arc::decrement_strong_count(self.ptr.as_ptr()) };
}
}
/// Owning handle to a size class: holds one strong reference, released on drop.
struct SizeClassHandle {
/// Token whose initial strong reference this handle owns.
token: SizeClassToken,
}
// NOTE(review): sound only if `SizeClass` itself is safe to share across threads
// (it is declared `Send + Sync` via `unsafe impl` in this file).
unsafe impl Send for SizeClassHandle {}
unsafe impl Sync for SizeClassHandle {}
impl SizeClassHandle {
    /// Builds a size class with its own freelist and wraps it in an owning handle.
    ///
    /// # Panics
    ///
    /// Panics if `size` and `alignment` do not form a valid [`Layout`].
    fn new(
        class_id: usize,
        size: usize,
        alignment: usize,
        max: NonZeroU32,
        parallelism: NonZeroUsize,
        thread_cache_capacity: usize,
        prefill: bool,
    ) -> Self {
        let layout = Layout::from_size_align(size, alignment).expect("alignment is a power of two");
        let class = SizeClass {
            class_id,
            size,
            global: Freelist::new(max, parallelism, layout, prefill),
            thread_cache_capacity,
        };
        let token = SizeClassToken::new(class);
        Self { token }
    }

    /// Asks the global freelist to create a new buffer.
    ///
    /// On success, returns the slot, the buffer, and a lease holding a fresh strong
    /// reference to this class; returns `None` when the freelist declines.
    #[inline(always)]
    fn try_create(&self, zeroed: bool) -> Option<(u32, PooledBuffer, SizeClassLease)> {
        self.global
            .try_create(zeroed)
            .map(|(slot, buffer)| (slot, buffer, SizeClassLease::retain(self)))
    }
}
impl Drop for SizeClassHandle {
/// Releases the strong reference created when the handle was constructed.
fn drop(&mut self) {
// SAFETY: the handle owns the reference taken in `SizeClassToken::new`, and
// this is the only place it is released.
unsafe { self.token.release() };
}
}
impl std::ops::Deref for SizeClassHandle {
type Target = SizeClass;
/// Borrows the size class this handle keeps alive.
#[inline(always)]
fn deref(&self) -> &Self::Target {
// SAFETY: the handle holds a strong reference until it is dropped, so the
// class outlives the returned borrow.
unsafe { self.token.as_ref() }
}
}
/// One strong reference to a size class, carried alongside a checked-out buffer.
///
/// The type has no `Drop` impl: the reference is either released by
/// `return_global` or handed over ("banked") to a thread cache by `into_banked`.
#[must_use]
pub(crate) struct SizeClassLease {
/// Token for the leased class.
token: SizeClassToken,
}
// NOTE(review): sound only if `SizeClass` itself is safe to share across threads
// (it is declared `Send + Sync` via `unsafe impl` in this file).
unsafe impl Send for SizeClassLease {}
unsafe impl Sync for SizeClassLease {}
impl SizeClassLease {
/// Rebuilds a lease from a reference previously banked in a thread cache, without
/// touching the reference count.
///
/// # Safety
///
/// The caller must own one banked strong reference for `class` and must not
/// reuse it after this call.
#[inline(always)]
const unsafe fn from_banked(class: &SizeClassHandle) -> Self {
Self { token: class.token }
}
/// Creates a lease by taking a new strong reference to `class`.
#[inline(always)]
fn retain(class: &SizeClassHandle) -> Self {
let token = class.token;
// SAFETY: `class` is a live handle, so the strong count is at least one.
unsafe { token.retain() };
Self { token }
}
/// Consumes the lease without releasing its reference; the reference is now
/// accounted for by whoever stored the associated cache entry.
#[inline(always)]
const fn into_banked(self) {}
/// Borrows the leased size class.
#[inline(always)]
const fn class(&self) -> &SizeClass {
// SAFETY: the lease owns a strong reference for its whole lifetime.
unsafe { self.token.as_ref() }
}
/// Buffer size of the leased class, in bytes.
#[inline(always)]
pub(crate) const fn size(&self) -> usize {
self.class().size
}
/// Puts the buffer back on the class's global freelist, then releases the lease's
/// strong reference.
#[inline(always)]
fn return_global(self, slot: u32, buffer: PooledBuffer) {
self.class().global.put(slot, buffer);
// SAFETY: the lease owns exactly one strong reference and is consumed here.
unsafe { self.token.release() };
}
}
/// A buffer held in a thread-local cache, together with its freelist slot.
struct TlsSizeClassCacheEntry {
/// The cached buffer.
buffer: PooledBuffer,
/// Slot identifier passed back to the freelist when the buffer is returned.
slot: u32,
}
/// Per-thread, fixed-capacity stack of cached buffers for one size class.
struct TlsSizeClassCache {
/// Token for the cached class. The cache takes no reference of its own; each
/// stored entry carries one banked strong reference instead.
class: SizeClassToken,
/// Entry storage; only indices `0..len` are initialized.
entries: Box<[MaybeUninit<TlsSizeClassCacheEntry>]>,
/// Number of initialized entries.
len: usize,
/// Maximum number of entries (the length of `entries`).
capacity: usize,
}
impl TlsSizeClassCache {
/// Creates an empty cache with room for `capacity` entries.
fn new(class: SizeClassToken, capacity: usize) -> Self {
let entries = (0..capacity)
.map(|_| MaybeUninit::uninit())
.collect::<Vec<_>>()
.into_boxed_slice();
Self {
class,
entries,
len: 0,
capacity,
}
}
/// Pops a buffer, preferring the local stack and falling back to the global
/// freelist. Returns `None` when both are empty.
#[inline(always)]
fn pop(&mut self, class: &SizeClassHandle) -> Option<(TlsSizeClassCacheEntry, SizeClassLease)> {
if let Some(entry) = self.pop_local() {
// SAFETY: every local entry carries one banked reference; popping the entry
// transfers that reference to the new lease.
let lease = unsafe { SizeClassLease::from_banked(class) };
return Some((entry, lease));
}
self.pop_global(class)
}
/// Pops the most recently pushed local entry, if any.
#[inline(always)]
fn pop_local(&mut self) -> Option<TlsSizeClassCacheEntry> {
if self.len == 0 {
return None;
}
self.len -= 1;
// SAFETY: the index is below the previous `len`, so it is in bounds and was
// initialized by `push_local`; decrementing `len` first prevents a second read.
Some(unsafe { self.entries.get_unchecked(self.len).assume_init_read() })
}
/// Refills from the global freelist.
///
/// Caches smaller than `MIN_TLS_BATCH_CAPACITY` take a single buffer. Larger
/// caches request a batch of `capacity / 2`: the first buffer is returned to the
/// caller and the rest are stored locally. Every buffer taken gets its own strong
/// reference.
#[inline(never)]
fn pop_global(
&mut self,
class: &SizeClassHandle,
) -> Option<(TlsSizeClassCacheEntry, SizeClassLease)> {
if self.capacity < MIN_TLS_BATCH_CAPACITY {
return class.global.take().map(|(slot, buffer)| {
let lease = SizeClassLease::retain(class);
(TlsSizeClassCacheEntry { buffer, slot }, lease)
});
}
let mut entry = None;
let take = self.capacity / 2;
// NOTE(review): assumes `take_batch` yields at most `take` buffers. Since this
// runs only when the local stack is empty, the extras then fit in `entries`.
class.global.take_batch(take, |slot, buffer| {
let cache_entry = TlsSizeClassCacheEntry { buffer, slot };
let lease = SizeClassLease::retain(class);
if entry.is_none() {
entry = Some((cache_entry, lease));
} else {
self.push_local(cache_entry, lease);
}
});
entry
}
/// Stores a returned buffer locally when there is room, otherwise takes the
/// `push_full` path.
#[inline(always)]
fn push(&mut self, lease: SizeClassLease, slot: u32, buffer: PooledBuffer) {
let entry = TlsSizeClassCacheEntry { buffer, slot };
if self.len < self.capacity {
self.push_local(entry, lease);
return;
}
self.push_full(entry, lease);
}
/// Appends `entry` to the local stack and banks the lease's reference with it.
///
/// Callers must ensure `len < capacity`; the slot is written without a bounds
/// check.
#[inline(always)]
fn push_local(&mut self, entry: TlsSizeClassCacheEntry, lease: SizeClassLease) {
lease.into_banked();
unsafe {
self.entries.get_unchecked_mut(self.len).write(entry);
}
self.len += 1;
}
/// Handles a push into a full cache.
///
/// Caches smaller than `MIN_TLS_BATCH_CAPACITY` send the new buffer straight to
/// the global freelist. Larger caches spill the newest `capacity / 2` local entries
/// (at least one) to the global freelist, releasing one banked reference per
/// spilled entry, and then store the new entry locally.
#[inline(never)]
fn push_full(&mut self, entry: TlsSizeClassCacheEntry, lease: SizeClassLease) {
if self.capacity < MIN_TLS_BATCH_CAPACITY {
lease.return_global(entry.slot, entry.buffer);
return;
}
let spill = self.len.min(self.capacity / 2).max(1);
let end = self.len;
let start = end - spill;
// Shrink `len` before reading so the spilled slots count as uninitialized again.
self.len = start;
// NOTE(review): the release happens lazily as `put_batch` pulls items, so this
// relies on `put_batch` consuming the whole iterator — confirm in `Freelist`.
lease
.class()
.global
.put_batch((start..end).rev().map(|index| {
let entry = unsafe { self.entries.as_mut_ptr().add(index).read().assume_init() };
unsafe { self.class.release() };
(entry.slot, entry.buffer)
}));
self.push_local(entry, lease);
}
}
impl Drop for TlsSizeClassCache {
/// Returns every cached buffer to the global freelist and releases the banked
/// references that accompanied them.
fn drop(&mut self) {
// With no entries there are no banked references, so the class is not touched.
if self.len == 0 {
return;
}
let entries = self.entries.as_mut_ptr();
// SAFETY: `len > 0` means at least one banked reference keeps the class alive,
// and indices `0..len` hold initialized entries that are each read once.
// NOTE(review): assumes `put_batch` consumes the whole iterator.
unsafe { self.class.as_ref() }
.global
.put_batch((0..self.len).rev().map(move |index| {
let entry = unsafe { entries.add(index).read().assume_init() };
(entry.slot, entry.buffer)
}));
// One banked reference per entry that was stored.
for _ in 0..self.len {
unsafe { self.class.release() };
}
}
}
/// A thread's table of size-class caches, indexed by `class_id`.
struct TlsSizeClassCaches {
/// Bin `i` holds the cache for the class with `class_id == i`, once initialized.
bins: Vec<Option<TlsSizeClassCache>>,
}
impl TlsSizeClassCaches {
    /// Creates an empty table with no bins.
    const fn new() -> Self {
        Self { bins: Vec::new() }
    }

    /// Returns the cache for `class_id`, creating it via `init` when the bin is
    /// missing or empty.
    #[inline(always)]
    fn get_or_init(
        &mut self,
        class: SizeClassToken,
        class_id: usize,
        capacity: usize,
    ) -> &mut TlsSizeClassCache {
        let already_initialized = matches!(self.bins.get(class_id), Some(Some(_)));
        if already_initialized {
            return self.bins[class_id]
                .as_mut()
                .expect("class cache was checked as initialized");
        }
        self.init(class, class_id, capacity)
    }

    /// Grows the table to cover `class_id` if needed and fills its bin with a new
    /// cache unless one is already present.
    #[inline(never)]
    fn init(
        &mut self,
        class: SizeClassToken,
        class_id: usize,
        capacity: usize,
    ) -> &mut TlsSizeClassCache {
        if class_id >= self.bins.len() {
            self.bins.resize_with(class_id + 1, Default::default);
        }
        let bin = &mut self.bins[class_id];
        bin.get_or_insert_with(|| TlsSizeClassCache::new(class, capacity))
    }

    /// Returns the cache for `class_id` if its bin exists and is initialized.
    #[inline(always)]
    fn get(&mut self, class_id: usize) -> Option<&mut TlsSizeClassCache> {
        self.bins.get_mut(class_id)?.as_mut()
    }
}
impl Drop for TlsSizeClassCaches {
/// Clears the thread's fast-path pointer if it points at this table, so later
/// pushes and pops on this thread do not follow a dangling pointer.
fn drop(&mut self) {
let this = self as *mut Self;
BufferPoolThreadCache::TLS_SIZE_CLASS_CACHES_FAST.with(|fast| {
if fast.get() == this {
fast.set(ptr::null_mut());
}
});
}
}
/// Namespace for the calling thread's buffer cache; all operations are associated
/// functions acting on thread-local state.
pub struct BufferPoolThreadCache;
impl BufferPoolThreadCache {
thread_local! {
// This thread's table of size-class caches.
static TLS_SIZE_CLASS_CACHES: UnsafeCell<TlsSizeClassCaches> =
const { UnsafeCell::new(TlsSizeClassCaches::new()) };
// Raw pointer to the table above. It is null until a slow path publishes it and
// is reset to null when the table is dropped.
static TLS_SIZE_CLASS_CACHES_FAST: Cell<*mut TlsSizeClassCaches> =
const { Cell::new(ptr::null_mut()) };
}
/// Drops every size-class cache owned by the calling thread, which returns the
/// cached buffers to their global freelists. Does nothing if the thread-local
/// table is no longer accessible.
pub fn flush() {
let _ = Self::TLS_SIZE_CLASS_CACHES.try_with(|caches| {
// NOTE(review): assumes no other reference into the table is live on this
// thread while it is mutated here.
let caches = unsafe { &mut *caches.get() };
for cache in caches.bins.iter_mut() {
let _ = cache.take();
}
});
}
/// Returns a buffer to the calling thread's cache for its class, or directly to
/// the global freelist when the class has thread caching disabled.
#[inline(always)]
pub(super) fn push(lease: SizeClassLease, slot: u32, buffer: PooledBuffer) {
let class = lease.class();
if class.thread_cache_capacity == 0 {
lease.return_global(slot, buffer);
return;
}
// Fast path: the table pointer is published and the class's bin exists.
let caches = Self::TLS_SIZE_CLASS_CACHES_FAST.with(|fast| fast.get());
if !caches.is_null() {
// SAFETY: a non-null pointer refers to this thread's table; it is reset to
// null when that table is dropped.
if let Some(cache) = unsafe { (&mut *caches).get(class.class_id) } {
cache.push(lease, slot, buffer);
return;
}
}
Self::push_slow(lease, slot, buffer);
}
/// Slow path of `push`: publishes the table pointer, creates the class's cache if
/// needed, and pushes into it. Falls back to the global freelist when the
/// thread-local table can no longer be accessed.
#[inline(never)]
fn push_slow(lease: SizeClassLease, slot: u32, buffer: PooledBuffer) {
match Self::TLS_SIZE_CLASS_CACHES
.try_with(|caches| {
let class = lease.class();
let caches = caches.get();
Self::TLS_SIZE_CLASS_CACHES_FAST.with(|fast| fast.set(caches));
ptr::NonNull::from(unsafe {
(&mut *caches).get_or_init(
lease.token,
class.class_id,
class.thread_cache_capacity,
)
})
})
.ok()
{
Some(mut cache) => {
// SAFETY: the pointer was just derived from this thread's table.
unsafe { cache.as_mut().push(lease, slot, buffer) };
}
None => lease.return_global(slot, buffer),
}
}
/// Takes a buffer for `class`, going through the thread cache when enabled and
/// the global freelist otherwise. Returns `None` when no buffer is available.
#[inline(always)]
fn pop(class: &SizeClassHandle) -> Option<(PooledBuffer, SizeClassLease, u32)> {
if class.thread_cache_capacity == 0 {
return class
.global
.take()
.map(|(slot, buffer)| (buffer, SizeClassLease::retain(class), slot));
}
// Fast path: the table pointer is published and the class's bin exists.
let caches = Self::TLS_SIZE_CLASS_CACHES_FAST.with(|fast| fast.get());
if !caches.is_null() {
// SAFETY: a non-null pointer refers to this thread's table; it is reset to
// null when that table is dropped.
if let Some(cache) = unsafe { (&mut *caches).get(class.class_id) } {
return cache
.pop(class)
.map(|(entry, lease)| (entry.buffer, lease, entry.slot));
}
}
// Thread-local table unavailable: serve directly from the global freelist.
let Some(mut cache) = Self::cache_slow(class) else {
return class
.global
.take()
.map(|(slot, buffer)| (buffer, SizeClassLease::retain(class), slot));
};
// SAFETY: the pointer was just derived from this thread's table.
unsafe { cache.as_mut() }
.pop(class)
.map(|(entry, lease)| (entry.buffer, lease, entry.slot))
}
/// Slow path of `pop`: publishes the table pointer and returns the class's cache,
/// creating it if needed. Returns `None` when the thread-local table can no
/// longer be accessed.
#[inline(never)]
fn cache_slow(class: &SizeClassHandle) -> Option<ptr::NonNull<TlsSizeClassCache>> {
Self::TLS_SIZE_CLASS_CACHES
.try_with(|caches| {
let caches = caches.get();
Self::TLS_SIZE_CLASS_CACHES_FAST.with(|fast| fast.set(caches));
ptr::NonNull::from(unsafe {
(&mut *caches).get_or_init(
class.token,
class.class_id,
class.thread_cache_capacity,
)
})
})
.ok()
}
}
/// Result of a successful pooled allocation.
struct Allocation {
/// The buffer handed out.
buffer: PooledBuffer,
/// `true` when the buffer was newly created, `false` when it was reused from a
/// cache or freelist.
is_new: bool,
/// Strong reference to the buffer's size class.
lease: SizeClassLease,
/// Freelist slot associated with the buffer.
slot: u32,
}
/// Shared state behind a [`BufferPool`].
pub(crate) struct BufferPoolInner {
/// Configuration the pool was built with.
config: BufferPoolConfig,
/// One handle per size class, ordered from smallest to largest.
classes: Vec<SizeClassHandle>,
/// Metrics recorded by the pool.
metrics: PoolMetrics,
}
impl Drop for BufferPoolInner {
/// Drains the global freelist of every size class when the pool goes away.
// NOTE(review): buffers held in thread caches are not touched here; presumably
// they are returned later when those caches flush or drop.
fn drop(&mut self) {
for class in &self.classes {
class.global.drain();
}
}
}
impl BufferPoolInner {
    /// Allocates from the class at `class_index`, reusing a cached buffer when one
    /// is available and otherwise trying to create a new one.
    ///
    /// `zero_on_new` only affects newly created buffers.
    #[inline(always)]
    fn try_alloc(&self, class_index: usize, zero_on_new: bool) -> Option<Allocation> {
        let class = &self.classes[class_index];
        match BufferPoolThreadCache::pop(class) {
            Some((buffer, lease, slot)) => Some(Allocation {
                buffer,
                is_new: false,
                lease,
                slot,
            }),
            None => self.try_alloc_new(class, zero_on_new),
        }
    }

    /// Creates a new buffer for `class`, recording the outcome in the `created` or
    /// `exhausted_total` metric.
    #[inline(never)]
    fn try_alloc_new(&self, class: &SizeClassHandle, zeroed: bool) -> Option<Allocation> {
        let label = SizeClassLabel {
            size_class: class.size as u64,
        };
        match class.try_create(zeroed) {
            Some((slot, buffer, lease)) => {
                self.metrics.created.get_or_create(&label).inc();
                Some(Allocation {
                    buffer,
                    is_new: true,
                    lease,
                    slot,
                })
            }
            None => {
                self.metrics.exhausted_total.get_or_create(&label).inc();
                None
            }
        }
    }
}
/// Pool of reusable buffers grouped into power-of-two size classes.
///
/// Cloning is cheap: clones share the same underlying pool state.
#[derive(Clone)]
pub struct BufferPool {
/// Shared pool state.
inner: Arc<BufferPoolInner>,
}
impl std::fmt::Debug for BufferPool {
    /// Formats the pool as its configuration plus the number of size classes.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let inner = &self.inner;
        let mut out = f.debug_struct("BufferPool");
        out.field("config", &inner.config);
        out.field("num_classes", &inner.classes.len());
        out.finish()
    }
}
/// Process-wide counter handing out a distinct `class_id` to every size class
/// created, across all pools.
static NEXT_SIZE_CLASS_ID: AtomicUsize = AtomicUsize::new(0);
impl BufferPool {
pub(crate) fn new(config: BufferPoolConfig, registry: &mut impl Register) -> Self {
config.validate();
let metrics = PoolMetrics::new(registry);
let num_classes =
BufferPoolConfig::num_classes(config.min_size.get(), config.max_size.get());
let mut classes = Vec::with_capacity(num_classes);
let thread_cache_capacity = config.resolve_thread_cache_capacity();
for i in 0..num_classes {
let size = BufferPoolConfig::class_size(config.min_size.get(), i);
let class_id = NEXT_SIZE_CLASS_ID.fetch_add(1, Ordering::Relaxed);
let class = SizeClassHandle::new(
class_id,
size,
config.alignment.get(),
config.max_per_class,
config.parallelism,
thread_cache_capacity,
config.prefill,
);
classes.push(class);
}
if config.prefill {
for class in &classes {
let label = SizeClassLabel {
size_class: class.size as u64,
};
metrics
.created
.get_or_create(&label)
.set(config.max_per_class.get() as i64);
}
}
Self {
inner: Arc::new(BufferPoolInner {
config,
classes,
metrics,
}),
}
}
#[inline(always)]
fn class_index(&self, size: usize) -> Option<usize> {
let min_size = self.inner.config.min_size.get();
let max_size = self.inner.config.max_size.get();
if size > max_size {
return None;
}
if size <= min_size {
return Some(0);
}
Some(
size.next_power_of_two()
.trailing_zeros()
.wrapping_sub(min_size.trailing_zeros()) as usize,
)
}
#[inline]
fn class_index_or_record_oversized(&self, capacity: usize) -> Option<usize> {
let class_index = self.class_index(capacity);
if class_index.is_none() {
self.inner.metrics.oversized_total.inc();
}
class_index
}
#[inline(always)]
pub fn try_alloc(&self, capacity: usize) -> Result<IoBufMut, PoolError> {
if capacity < self.inner.config.pool_min_size {
let size = capacity.max(1);
return Ok(IoBufMut::with_alignment(size, self.inner.config.alignment));
}
let class_index = self
.class_index_or_record_oversized(capacity)
.ok_or(PoolError::Oversized)?;
let buffer = self
.inner
.try_alloc(class_index, false)
.map(|allocation| {
PooledBufMut::new(allocation.buffer, allocation.lease, allocation.slot)
})
.ok_or(PoolError::Exhausted)?;
Ok(IoBufMut::from_pooled(buffer))
}
pub fn alloc(&self, capacity: usize) -> IoBufMut {
self.try_alloc(capacity).unwrap_or_else(|_| {
let size = capacity.max(self.inner.config.min_size.get());
IoBufMut::with_alignment(size, self.inner.config.alignment)
})
}
pub unsafe fn alloc_len(&self, len: usize) -> IoBufMut {
let mut buf = self.alloc(len);
unsafe { buf.set_len(len) };
buf
}
pub fn try_alloc_zeroed(&self, len: usize) -> Result<IoBufMut, PoolError> {
if len < self.inner.config.pool_min_size {
let size = len.max(1);
let mut buf = IoBufMut::zeroed_with_alignment(size, self.inner.config.alignment);
buf.truncate(len);
return Ok(buf);
}
let class_index = self
.class_index_or_record_oversized(len)
.ok_or(PoolError::Oversized)?;
let allocation = self
.inner
.try_alloc(class_index, true)
.ok_or(PoolError::Exhausted)?;
let mut buf = IoBufMut::from_pooled(PooledBufMut::new(
allocation.buffer,
allocation.lease,
allocation.slot,
));
if allocation.is_new {
unsafe { buf.set_len(len) };
} else {
unsafe {
std::ptr::write_bytes(buf.as_mut_ptr(), 0, len);
buf.set_len(len);
}
}
Ok(buf)
}
pub fn alloc_zeroed(&self, len: usize) -> IoBufMut {
self.try_alloc_zeroed(len).unwrap_or_else(|_| {
let size = len.max(self.inner.config.min_size.get());
let mut buf = IoBufMut::zeroed_with_alignment(size, self.inner.config.alignment);
buf.truncate(len);
buf
})
}
pub fn config(&self) -> &BufferPoolConfig {
&self.inner.config
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
iobuf::{cache_line_size, freelist, IoBuf},
telemetry::metrics::Registry,
};
use bytes::{Buf, BufMut};
use commonware_utils::NZU32;
use std::{
sync::{mpsc, Arc},
thread,
};
/// Builds a standalone size class (max 8 buffers, parallelism 4, thread cache
/// capacity 4, no prefill) with a fresh class id.
fn test_size_class(size: usize, alignment: usize) -> SizeClassHandle {
    let class_id = NEXT_SIZE_CLASS_ID.fetch_add(1, Ordering::Relaxed);
    let max = NZU32!(8);
    let parallelism = NZUsize!(4);
    let thread_cache_capacity = 4;
    let prefill = false;
    SizeClassHandle::new(
        class_id,
        size,
        alignment,
        max,
        parallelism,
        thread_cache_capacity,
        prefill,
    )
}
/// Builds a pool from `config` backed by a throwaway metrics registry.
fn test_pool(config: BufferPoolConfig) -> BufferPool {
    BufferPool::new(config, &mut Registry::default())
}
/// Page-aligned test configuration with the given class bounds and per-class
/// limit, parallelism 1, automatic thread cache and no prefill.
fn test_config(min_size: usize, max_size: usize, max_per_class: u32) -> BufferPoolConfig {
    let min_size = NZUsize!(min_size);
    let max_size = NZUsize!(max_size);
    let max_per_class = NZU32!(max_per_class);
    let parallelism = NZUsize!(1);
    let alignment = NZUsize!(page_size());
    BufferPoolConfig {
        pool_min_size: 0,
        min_size,
        max_size,
        max_per_class,
        prefill: false,
        alignment,
        parallelism,
        thread_cache_config: BufferPoolThreadCacheConfig::Enabled(None),
    }
}
/// Reads the current strong count of `class` without changing it.
///
/// A temporary reference is taken so an `Arc` can be materialized; that extra
/// reference is subtracted from the result and dropped on return.
fn size_class_strong_count(class: &SizeClassHandle) -> usize {
    let token = class.token;
    unsafe { token.retain() };
    let probe = unsafe { Arc::from_raw(token.ptr.as_ptr()) };
    let with_probe = Arc::strong_count(&probe);
    with_probe - 1
}
/// Number of buffers of the class serving `size` that are currently checked out:
/// created minus those sitting in the global freelist or this thread's cache.
fn get_allocated(pool: &BufferPool, size: usize) -> usize {
    let index = pool.class_index(size).unwrap();
    let class = &pool.inner.classes[index];
    let created = get_global_created(class);
    created - get_global_len(class) - get_local_len(class)
}
/// Number of buffers of the class serving `size` that are idle in the global
/// freelist or in this thread's cache.
fn get_available(pool: &BufferPool, size: usize) -> i64 {
    let index = pool.class_index(size).unwrap();
    let class = &pool.inner.classes[index];
    let idle = get_global_len(class) + get_local_len(class);
    idle as i64
}
/// Number of buffers currently in the class's global freelist, as reported by the
/// freelist's test helper.
fn get_global_len(class: &SizeClass) -> usize {
freelist::tests::len(&class.global)
}
/// Number of buffers the class's global freelist has created, as reported by the
/// freelist's test helper.
fn get_global_created(class: &SizeClass) -> usize {
freelist::tests::created(&class.global)
}
/// Number of buffers of `class` held in the calling thread's cache; zero when the
/// class has no initialized bin on this thread.
fn get_local_len(class: &SizeClass) -> usize {
    BufferPoolThreadCache::TLS_SIZE_CLASS_CACHES.with(|cell| {
        let table = unsafe { &*cell.get() };
        match table.bins.get(class.class_id) {
            Some(Some(cache)) => cache.len,
            _ => 0,
        }
    })
}
/// The reported page size is at least 4096 bytes and a power of two.
#[test]
fn test_page_size() {
let size = page_size();
assert!(size >= 4096);
assert!(size.is_power_of_two());
}
/// A page-sized, power-of-two configuration passes validation.
#[test]
fn test_config_validation() {
    let page = page_size();
    test_config(page, page * 4, 10).validate();
}
/// A fixed thread-cache capacity above `max_per_class` is rejected.
#[test]
#[should_panic(expected = "thread_cache_capacity (11) must be <= max_per_class (10)")]
fn test_config_invalid_thread_cache_capacity() {
    let page = page_size();
    let oversized_cache = NZUsize!(11);
    test_config(page, page * 4, 10)
        .with_thread_cache_capacity(oversized_cache)
        .validate();
}
/// A `min_size` that is not a power of two is rejected.
#[test]
#[should_panic(expected = "min_size must be a power of two")]
fn test_config_invalid_min_size() {
    let not_power_of_two = NZUsize!(3000);
    let max_size = NZUsize!(8192);
    let max_per_class = NZU32!(10);
    let parallelism = NZUsize!(1);
    let alignment = NZUsize!(page_size());
    BufferPoolConfig {
        pool_min_size: 0,
        min_size: not_power_of_two,
        max_size,
        max_per_class,
        prefill: false,
        alignment,
        parallelism,
        thread_cache_config: BufferPoolThreadCacheConfig::Enabled(None),
    }
    .validate();
}
/// Requested sizes map to the smallest class that fits, and sizes above
/// `max_size` map to no class.
#[test]
fn test_pool_class_index() {
    let page = page_size();
    let pool = test_pool(test_config(page, page * 8, 10));
    assert_eq!(pool.inner.classes.len(), 4);
    let expectations = [
        (1, Some(0)),
        (page, Some(0)),
        (page + 1, Some(1)),
        (page * 2, Some(1)),
        (page * 4 + 1, Some(3)),
        (page * 8 - 1, Some(3)),
        (page * 8, Some(3)),
        (page * 8 + 1, None),
    ];
    for (size, expected) in expectations {
        assert_eq!(pool.class_index(size), expected);
    }
}
/// A buffer can be allocated, dropped, and allocated again; each time it comes
/// back empty with at least the requested capacity.
#[test]
fn test_pool_alloc_and_return() {
    let page = page_size();
    let pool = test_pool(test_config(page, page * 4, 2));
    for _round in 0..2 {
        let buf = pool.try_alloc(page).unwrap();
        assert!(buf.capacity() >= page);
        assert_eq!(buf.len(), 0);
        // `buf` is dropped here, returning it before the next round.
    }
}
/// `alloc_len` returns a buffer whose length equals the request and whose bytes
/// can be written and read back.
#[test]
fn test_alloc_len_sets_len() {
    const LEN: usize = 100;
    let page = page_size();
    let pool = test_pool(test_config(page, page * 4, 2));
    let mut writable = unsafe { pool.alloc_len(LEN) };
    assert_eq!(writable.len(), LEN);
    writable.as_mut().fill(0xAB);
    let frozen = writable.freeze();
    assert_eq!(frozen.as_ref(), &[0xAB; LEN]);
}
/// `alloc_zeroed` returns a buffer of the requested length filled with zeros.
#[test]
fn test_alloc_zeroed_sets_len_and_zeros() {
    let page = page_size();
    let pool = test_pool(test_config(page, page * 4, 2));
    let zeroed = pool.alloc_zeroed(100);
    assert_eq!(zeroed.len(), 100);
    assert!(zeroed.as_ref().iter().all(|byte| *byte == 0));
}
/// `try_alloc_zeroed` returns a pooled, zero-filled buffer of the requested length.
#[test]
fn test_try_alloc_zeroed_sets_len_and_zeros() {
    let page = page_size();
    let pool = test_pool(test_config(page, page * 4, 2));
    let zeroed = pool.try_alloc_zeroed(page).unwrap();
    assert!(zeroed.is_pooled());
    assert_eq!(zeroed.len(), page);
    assert!(zeroed.as_ref().iter().all(|byte| *byte == 0));
}
/// When the only pooled buffer is in use, `alloc_zeroed` falls back to an
/// untracked buffer that is still zero-filled and correctly sized.
#[test]
fn test_alloc_zeroed_fallback_uses_untracked_zeroed_buffer() {
    let page = page_size();
    let pool = test_pool(test_config(page, page, 1));
    // Hold the single pooled buffer so the class is exhausted.
    let _held = pool.try_alloc(page).unwrap();
    let fallback = pool.alloc_zeroed(100);
    assert!(!fallback.is_pooled());
    assert_eq!(fallback.len(), 100);
    assert!(fallback.as_ref().iter().all(|byte| *byte == 0));
}
/// A pooled buffer that was dirtied and returned is zeroed again when it is
/// handed out by `alloc_zeroed`.
#[test]
fn test_alloc_zeroed_reuses_dirty_pooled_buffer() {
    let page = page_size();
    let pool = test_pool(test_config(page, page, 1));
    {
        let mut dirty = pool.alloc_zeroed(page);
        assert!(dirty.is_pooled());
        assert!(dirty.as_ref().iter().all(|byte| *byte == 0));
        dirty.as_mut().fill(0xAB);
        // Dropped at the end of this scope, returning the dirtied buffer.
    }
    let reused = pool.alloc_zeroed(page);
    assert!(reused.is_pooled());
    assert_eq!(reused.len(), page);
    assert!(reused.as_ref().iter().all(|byte| *byte == 0));
}
/// Requests below `pool_min_size` get untracked buffers of exactly the requested
/// size, while requests at the threshold are pooled.
#[test]
fn test_requests_smaller_than_pool_min_size_bypass_pool() {
    let config = BufferPoolConfig {
        pool_min_size: 512,
        min_size: NZUsize!(512),
        max_size: NZUsize!(1024),
        max_per_class: NZU32!(2),
        parallelism: NZUsize!(1),
        thread_cache_config: BufferPoolThreadCacheConfig::Enabled(None),
        prefill: false,
        alignment: NZUsize!(128),
    };
    let pool = test_pool(config);

    let small = pool.try_alloc(200).unwrap();
    assert!(!small.is_pooled());
    assert_eq!(small.capacity(), 200);

    let small_zeroed = pool.try_alloc_zeroed(200).unwrap();
    assert!(!small_zeroed.is_pooled());
    assert_eq!(small_zeroed.len(), 200);
    assert!(small_zeroed.as_ref().iter().all(|byte| *byte == 0));

    let at_threshold = pool.try_alloc(512).unwrap();
    assert!(at_threshold.is_pooled());
    assert_eq!(at_threshold.capacity(), 512);
}
/// Allocations are rounded up to the capacity of the smallest fitting class.
#[test]
fn test_pool_size_classes() {
    let page = page_size();
    let pool = test_pool(test_config(page, page * 4, 10));
    // Keep every buffer alive until the end, as separate bindings would.
    let mut held = Vec::new();
    for (requested, expected_capacity) in [(page, page), (page + 1, page * 2), (page * 3, page * 4)]
    {
        let buf = pool.try_alloc(requested).unwrap();
        assert_eq!(buf.capacity(), expected_capacity);
        held.push(buf);
    }
}
/// A prefilled class serves exactly `max_per_class` buffers and then reports
/// failure.
#[test]
fn test_prefill() {
    let page = NZUsize!(page_size());
    let config = BufferPoolConfig {
        pool_min_size: 0,
        min_size: page,
        max_size: page,
        max_per_class: NZU32!(5),
        parallelism: NZUsize!(1),
        thread_cache_config: BufferPoolThreadCacheConfig::Enabled(None),
        prefill: true,
        alignment: page,
    };
    let pool = test_pool(config);
    let _held: Vec<_> = (0..5)
        .map(|_| pool.try_alloc(page.get()).expect("alloc should succeed"))
        .collect();
    assert!(pool.try_alloc(page.get()).is_err());
}
/// The network preset validates and carries the expected default values.
#[test]
fn test_config_for_network() {
    let preset = BufferPoolConfig::for_network();
    preset.validate();
    let BufferPoolConfig {
        pool_min_size,
        min_size,
        max_size,
        max_per_class,
        prefill,
        alignment,
        parallelism,
        thread_cache_config,
    } = preset;
    assert_eq!(pool_min_size, 0);
    assert_eq!(min_size.get(), 1024);
    assert_eq!(max_size.get(), 128 * 1024);
    assert_eq!(max_per_class.get(), 4096);
    assert_eq!(parallelism, NZUsize!(1));
    assert_eq!(
        thread_cache_config,
        BufferPoolThreadCacheConfig::Enabled(None)
    );
    assert!(!prefill);
    assert_eq!(alignment.get(), 1);
}
/// The storage preset validates and carries the expected default values.
#[test]
fn test_config_for_storage() {
    let preset = BufferPoolConfig::for_storage();
    preset.validate();
    let BufferPoolConfig {
        pool_min_size,
        min_size,
        max_size,
        max_per_class,
        prefill,
        alignment,
        parallelism,
        thread_cache_config,
    } = preset;
    assert_eq!(pool_min_size, 0);
    assert_eq!(min_size.get(), page_size());
    assert_eq!(max_size.get(), 8 * 1024 * 1024);
    assert_eq!(max_per_class.get(), 64);
    assert_eq!(parallelism, NZUsize!(1));
    assert_eq!(
        thread_cache_config,
        BufferPoolThreadCacheConfig::Enabled(None)
    );
    assert!(!prefill);
    assert_eq!(alignment.get(), 1);
}
/// The storage preset can serve an allocation of its maximum class size.
#[test]
fn test_storage_config_supports_default_allocations() {
    const LARGEST: usize = 8 * 1024 * 1024;
    let pool = test_pool(BufferPoolConfig::for_storage());
    let largest = pool.try_alloc(LARGEST).unwrap();
    assert_eq!(largest.capacity(), LARGEST);
}
/// Builder methods set exactly the fields they name and leave the others at
/// their preset values.
#[test]
fn test_config_builders() {
    let page = NZUsize!(page_size());
    let storage = BufferPoolConfig::for_storage()
        .with_pool_min_size(1024)
        .with_max_per_class(NZU32!(64))
        .with_parallelism(NZUsize!(4))
        .with_thread_cache_capacity(NZUsize!(8))
        .with_prefill(true)
        .with_min_size(page)
        .with_max_size(NZUsize!(128 * 1024));
    storage.validate();
    assert_eq!(storage.pool_min_size, 1024);
    assert_eq!(storage.min_size, page);
    assert_eq!(storage.max_size.get(), 128 * 1024);
    assert_eq!(storage.max_per_class.get(), 64);
    assert_eq!(storage.parallelism, NZUsize!(4));
    assert_eq!(
        storage.thread_cache_config,
        BufferPoolThreadCacheConfig::Enabled(Some(NZUsize!(8)))
    );
    assert!(storage.prefill);
    assert_eq!(storage.alignment.get(), 1);

    let network = BufferPoolConfig::for_network()
        .with_pool_min_size(256)
        .with_parallelism(NZUsize!(4))
        .with_alignment(NZUsize!(256))
        .with_min_size(NZUsize!(256));
    network.validate();
    assert_eq!(network.parallelism, NZUsize!(4));
    assert_eq!(
        network.thread_cache_config,
        BufferPoolThreadCacheConfig::Enabled(None)
    );
    assert_eq!(network.alignment.get(), 256);
    assert_eq!(network.min_size.get(), 256);
}
/// With the automatic policy and parallelism 8, the thread-cache capacity is
/// `max_per_class / 16`.
#[test]
fn test_parallelism_policy_resolves_thread_cache_capacity() {
    let page = page_size();
    for (max_per_class, expected_capacity) in [(64, 4), (4096, 256)] {
        let config = test_config(page, page, max_per_class).with_parallelism(NZUsize!(8));
        let pool = test_pool(config);
        let index = pool.class_index(page).unwrap();
        assert_eq!(
            pool.inner.classes[index].thread_cache_capacity,
            expected_capacity
        );
    }
}
/// With `max_per_class` 2 and parallelism 8 the automatic thread-cache capacity
/// resolves to zero, so buffers dropped on another thread are immediately
/// reusable from this thread.
#[test]
fn test_auto_thread_cache_disables_when_parallelism_exceeds_budget() {
let page = page_size();
let pool = test_pool(test_config(page, page, 2).with_parallelism(NZUsize!(8)));
let class_index = pool.class_index(page).unwrap();
let class = &pool.inner.classes[class_index];
assert_eq!(class.thread_cache_capacity, 0);
// Take both buffers of the class on the main thread.
let first = pool.try_alloc(page).expect("first tracked allocation");
let second = pool.try_alloc(page).expect("second tracked allocation");
let pool_for_thread = pool.clone();
let (returned_tx, returned_rx) = mpsc::channel();
let (release_tx, release_rx) = mpsc::channel();
// The worker drops both buffers, signals, then stays alive until released so its
// thread-local state is not torn down while the main thread re-allocates.
let handle = thread::spawn(move || {
drop(first);
drop(second);
returned_tx.send(()).expect("signal returned buffers");
release_rx.recv().expect("release worker");
drop(pool_for_thread);
});
returned_rx.recv().expect("wait for returned buffers");
// Both buffers must be available again even though the worker is still running.
let _first = pool.try_alloc(page).expect("first global reuse");
let _second = pool.try_alloc(page).expect("second global reuse");
release_tx.send(()).expect("release worker");
handle.join().expect("worker should not panic");
}
/// The freelist's word count follows the configured parallelism, including when
/// the thread cache is disabled.
#[test]
fn test_parallelism_policy_resolves_freelist_stripes() {
    let page = page_size();

    let wide = test_pool(test_config(page, page, 64).with_parallelism(NZUsize!(16)));
    let wide_index = wide.class_index(page).unwrap();
    let wide_words = freelist::tests::num_words(&wide.inner.classes[wide_index].global);
    assert_eq!(wide_words, 16);

    let narrow = test_pool(test_config(page, page, 12).with_parallelism(NZUsize!(9)));
    let narrow_index = narrow.class_index(page).unwrap();
    let narrow_words = freelist::tests::num_words(&narrow.inner.classes[narrow_index].global);
    assert_eq!(narrow_words, 8);

    let uncached_config = test_config(page, page, 64)
        .with_parallelism(NZUsize!(16))
        .with_thread_cache_disabled();
    let uncached = test_pool(uncached_config);
    let uncached_index = uncached.class_index(page).unwrap();
    let uncached_words = freelist::tests::num_words(&uncached.inner.classes[uncached_index].global);
    assert_eq!(uncached_words, 16);
}
/// An explicit thread-cache capacity is used as-is instead of the value the
/// automatic policy would pick.
#[test]
fn test_fixed_thread_cache_capacity_overrides_auto_capacity() {
    let page = page_size();
    let config = test_config(page, page, 64)
        .with_parallelism(NZUsize!(8))
        .with_thread_cache_capacity(NZUsize!(7));
    let pool = test_pool(config);
    let index = pool.class_index(page).unwrap();
    let class = &pool.inner.classes[index];
    assert_eq!(class.thread_cache_capacity, 7);
    assert_eq!(freelist::tests::num_words(&class.global), 8);
}
/// Checks that with the thread cache disabled a returned buffer is counted
/// in the global freelist and not in the local cache.
#[test]
fn test_disabled_thread_cache_does_not_retain_buffers_locally() {
    let page = page_size();
    let config = test_config(page, page, 2).with_thread_cache_disabled();
    let pool = test_pool(config);
    let class = &pool.inner.classes[pool.class_index(page).unwrap()];

    // Allocate one tracked buffer and return it immediately.
    drop(pool.try_alloc(page).expect("tracked allocation"));

    assert_eq!(class.thread_cache_capacity, 0);
    assert_eq!(get_local_len(class), 0);
    assert_eq!(get_global_len(class), 1);
}
/// Checks that `BufferPoolThreadCache::flush` takes the local-cache count of
/// two different size classes to zero and raises their global-freelist count
/// by the same amount.
#[test]
fn test_thread_cache_flush_moves_local_entries_to_global() {
let page = page_size();
// Explicit thread-cache capacity of 4; the size arguments are presumably the
// minimum and maximum buffer sizes (TODO confirm against `test_config`).
let pool =
test_pool(test_config(page, page * 2, 8).with_thread_cache_capacity(NZUsize!(4)));
// `page` and `page + 1` are expected to land in different classes; the
// per-class counts of 1 asserted below depend on that.
let small_index = pool.class_index(page).unwrap();
let large_index = pool.class_index(page + 1).unwrap();
let small_class = &pool.inner.classes[small_index];
let large_class = &pool.inner.classes[large_index];
let small = pool.try_alloc(page).expect("tracked allocation");
let large = pool.try_alloc(page + 1).expect("tracked allocation");
drop(small);
drop(large);
// Before the flush each returned buffer is counted locally, none globally.
assert_eq!(get_local_len(small_class), 1);
assert_eq!(get_local_len(large_class), 1);
assert_eq!(get_global_len(small_class), 0);
assert_eq!(get_global_len(large_class), 0);
BufferPoolThreadCache::flush();
// After the flush the counts have swapped for both classes.
assert_eq!(get_local_len(small_class), 0);
assert_eq!(get_local_len(large_class), 0);
assert_eq!(get_global_len(small_class), 1);
assert_eq!(get_global_len(large_class), 1);
}
/// Checks the `max_per_class` value produced by `with_budget_bytes` for a
/// comfortable budget and for one that is too small.
#[test]
fn test_config_with_budget_bytes() {
    // Both cases start from the same configuration and differ only in the budget.
    let base_config = || BufferPoolConfig {
        pool_min_size: 0,
        min_size: NZUsize!(4),
        max_size: NZUsize!(16),
        max_per_class: NZU32!(1),
        parallelism: NZUsize!(1),
        thread_cache_config: BufferPoolThreadCacheConfig::Enabled(None),
        prefill: false,
        alignment: NZUsize!(4),
    };

    // NOTE(review): 280 / (4 + 8 + 16) = 10, which is consistent with the
    // budget being split over power-of-two classes between `min_size` and
    // `max_size` -- confirm against `with_budget_bytes`.
    let config = base_config().with_budget_bytes(NZUsize!(280));
    assert_eq!(config.max_per_class.get(), 10);

    // A budget of 10 bytes still yields one buffer per class.
    let small_budget = base_config().with_budget_bytes(NZUsize!(10));
    assert_eq!(small_budget.max_per_class.get(), 1);
}
/// Pins the `Display` text of every `PoolError` variant.
#[test]
fn test_pool_error_display() {
    let cases = [
        (
            PoolError::Oversized,
            "requested capacity exceeds maximum buffer size",
        ),
        (
            PoolError::Exhausted,
            "pool exhausted for required size class",
        ),
    ];
    for (error, expected) in cases {
        assert_eq!(error.to_string(), expected);
    }
}
/// Checks that the pool's `Debug` output mentions `BufferPool` and
/// `num_classes`, and that `config()` exposes the configured minimum size.
#[test]
fn test_pool_debug_and_config_accessor() {
    let page = page_size();
    let config = test_config(page, page, 2);
    let pool = test_pool(config);

    // Config accessor.
    let min_size = pool.config().min_size;
    assert_eq!(min_size.get(), page);

    // Debug rendering.
    let debug = format!("{pool:?}");
    assert!(debug.contains("BufferPool"));
    assert!(debug.contains("num_classes"));
}
/// Checks where two returned buffers are counted for a class with
/// `max_per_class = 2`: the first return is counted locally, the second
/// globally, and both are reported as available.
#[test]
fn test_return_buffer_local_overflow_spills_to_global() {
let page = page_size();
let pool = test_pool(test_config(page, page, 2));
let class_index = pool
.class_index(page)
.expect("class exists for page-sized buffer");
let tracked1 = pool.try_alloc(page).expect("first tracked allocation");
let tracked2 = pool.try_alloc(page).expect("second tracked allocation");
// First return: counted in the local cache only.
drop(tracked1);
assert_eq!(get_global_len(&pool.inner.classes[class_index]), 0);
assert_eq!(get_local_len(&pool.inner.classes[class_index]), 1);
// Second return: the local count stays at 1 and the global count becomes 1
// (presumably the local capacity for this configuration is 1 -- TODO confirm).
drop(tracked2);
assert_eq!(get_global_len(&pool.inner.classes[class_index]), 1);
assert_eq!(get_local_len(&pool.inner.classes[class_index]), 1);
assert_eq!(get_available(&pool, page), 2);
}
/// Checks, by pointer identity, the order in which two returned buffers are
/// handed out again: the buffer returned first is reused first, the buffer
/// returned second is reused next.
#[test]
fn test_small_local_cache_overflow_preserves_locality() {
let page = page_size();
let pool = test_pool(test_config(page, page, 2));
// Record each buffer's address so reuse can be identified later.
let mut tracked1 = pool.try_alloc(page).expect("first tracked allocation");
let ptr1 = tracked1.as_mut_ptr();
let mut tracked2 = pool.try_alloc(page).expect("second tracked allocation");
let ptr2 = tracked2.as_mut_ptr();
drop(tracked1);
drop(tracked2);
// Per the expect messages, the first reuse is served from the local cache
// and the second from the global freelist.
let mut reused_local = pool.try_alloc(page).expect("reuse from local cache");
assert_eq!(reused_local.as_mut_ptr(), ptr1);
let mut reused_global = pool.try_alloc(page).expect("reuse from global freelist");
assert_eq!(reused_global.as_mut_ptr(), ptr2);
}
/// Checks the local/global counts around an overflowing return and a
/// refilling allocation for a class whose thread-cache capacity is at least
/// `MIN_TLS_BATCH_CAPACITY`.
///
/// NOTE(review): the arithmetic asserted below is consistent with an overflow
/// spilling half of the capacity to the global freelist and a local miss
/// pulling up to half of the capacity back -- confirm against the cache
/// implementation.
#[test]
fn test_large_local_cache_batches_overflow_and_refill() {
let page = page_size();
// Size the class from the machine's parallelism (8 buffers per thread).
let threads = std::thread::available_parallelism().map_or(1, NonZeroUsize::get);
let max_per_class =
u32::try_from(threads * 8).expect("test capacity must fit in u32 slot ids");
let pool = test_pool(test_config(page, page, max_per_class));
let class_index = pool
.class_index(page)
.expect("class exists for page-sized buffer");
let class = &pool.inner.classes[class_index];
assert!(class.thread_cache_capacity >= MIN_TLS_BATCH_CAPACITY);
// Allocate one more buffer than the local cache can hold, then return them all.
let mut bufs = Vec::new();
for _ in 0..class.thread_cache_capacity + 1 {
bufs.push(pool.try_alloc(page).expect("tracked allocation"));
}
for buf in bufs {
drop(buf);
}
// After the overflow: capacity / 2 + 1 entries locally, capacity / 2 globally.
assert_eq!(get_local_len(class), class.thread_cache_capacity / 2 + 1);
assert_eq!(get_global_len(class), class.thread_cache_capacity / 2);
// Drain exactly the local entries; the global count must not move.
let mut reused = Vec::new();
for _ in 0..class.thread_cache_capacity / 2 + 1 {
reused.push(pool.try_alloc(page).expect("local reuse"));
}
assert_eq!(get_local_len(class), 0);
assert_eq!(get_global_len(class), class.thread_cache_capacity / 2);
// One more allocation empties the global freelist: one buffer is handed
// out and the remaining capacity / 2 - 1 are counted locally.
let _global = pool.try_alloc(page).expect("global reuse with refill");
assert_eq!(get_local_len(class), class.thread_cache_capacity / 2 - 1);
assert_eq!(get_global_len(class), 0);
}
/// Checks that `BufferPoolThreadCache::pop` succeeds when the global freelist
/// holds a single buffer, and leaves both the local and the global count at
/// zero afterwards.
#[test]
fn test_global_batch_alloc_stops_when_global_runs_empty() {
let class = test_size_class(64, 64);
// Create one slot and park its buffer in the global freelist.
let (slot, buffer) = class.global.try_create(false).expect("slot reservation");
class.global.put(slot, buffer);
let (buffer, lease, slot) = BufferPoolThreadCache::pop(&class).expect("global allocation");
assert_eq!(get_local_len(&class), 0);
assert_eq!(get_global_len(&class), 0);
// Hand the buffer back through the lease so it is not left outstanding.
lease.return_global(slot, buffer);
}
/// Tracks the size class's strong count (via `size_class_strong_count`)
/// through lease creation, a local cache push/pop, a pop served by the global
/// freelist, and finally dropping the cache.
#[test]
fn test_size_class_leases_use_raw_arc_tokens_across_cache_paths() {
let class = test_size_class(64, 64);
let mut cache = TlsSizeClassCache::new(class.token, MIN_TLS_BATCH_CAPACITY);
// Creating the cache from the class token does not change the count.
assert_eq!(size_class_strong_count(&class), 1);
let (slot, buffer) = class.global.try_create(false).expect("slot reservation");
// Retaining a lease adds one strong reference.
let lease = SizeClassLease::retain(&class);
assert_eq!(size_class_strong_count(&class), 2);
// Pushing into and popping from the local cache keeps the count at 2.
cache.push(lease, slot, buffer);
assert_eq!(size_class_strong_count(&class), 2);
let (entry, lease) = cache.pop(&class).expect("local cache pop");
assert_eq!(size_class_strong_count(&class), 2);
// Returning through the lease releases that reference again.
lease.return_global(entry.slot, entry.buffer);
assert_eq!(size_class_strong_count(&class), 1);
// Put two more buffers into the global freelist (the one returned above is
// there as well) and pop with an empty local cache.
for _ in 0..2 {
let (slot, buffer) = class.global.try_create(false).expect("slot reservation");
class.global.put(slot, buffer);
}
let (entry, lease) = cache.pop(&class).expect("global refill");
// Count 3: one for `class`, one for the returned lease, and presumably one
// held by the cache for what the refill kept locally -- TODO confirm.
assert_eq!(size_class_strong_count(&class), 3);
lease.return_global(entry.slot, entry.buffer);
assert_eq!(size_class_strong_count(&class), 2);
// Dropping the cache releases the remaining extra reference.
drop(cache);
assert_eq!(size_class_strong_count(&class), 1);
}
/// Checks that pushing into a `TlsSizeClassCache` constructed with capacity 0
/// completes and leaves `cache.len` at 0.
#[test]
fn test_tls_size_class_cache_push_tolerates_empty_spill() {
let class = test_size_class(64, 64);
let (slot, buffer) = class.global.try_create(false).expect("slot reservation");
let lease = SizeClassLease::retain(&class);
// Capacity 0: the cache has no room to keep the pushed entry.
let mut cache = TlsSizeClassCache::new(class.token, 0);
cache.push(lease, slot, buffer);
assert_eq!(cache.len, 0);
drop(cache);
}
/// Checks that two buffers put into the global freelist come back exactly
/// once each, with their original slot ids and addresses, and that a third
/// `take` returns `None`.
#[test]
fn test_global_freelist_returns_each_slot_once() {
// NOTE(review): the meaning of the positional arguments is not visible
// here; `NZU32!(2)` presumably limits the class to the two slots created
// below -- confirm against `SizeClassHandle::new`.
let class = SizeClassHandle::new(
NEXT_SIZE_CLASS_ID.fetch_add(1, Ordering::Relaxed),
64,
64,
NZU32!(2),
NZUsize!(1),
1,
false,
);
let (slot0, buffer0) = class.global.try_create(false).expect("first slot");
let ptr0 = buffer0.as_ptr();
let (slot1, buffer1) = class.global.try_create(false).expect("second slot");
let ptr1 = buffer1.as_ptr();
class.global.put(slot0, buffer0);
class.global.put(slot1, buffer1);
let mut popped = [
class.global.take().expect("first pop"),
class.global.take().expect("second pop"),
];
// The test does not depend on the order `take` yields entries in: sort by
// slot id before comparing.
popped.sort_by_key(|(slot, _)| *slot);
assert_eq!(popped[0].0, slot0);
assert_eq!(popped[0].1.as_ptr(), ptr0);
assert_eq!(popped[1].0, slot1);
assert_eq!(popped[1].1.as_ptr(), ptr1);
assert!(class.global.take().is_none());
// Return both buffers to the freelist before the class is dropped.
for (slot, buffer) in popped {
class.global.put(slot, buffer);
}
}
/// Exercises the `Debug` output of `PooledBufMut` and of the value returned by
/// `into_pooled`, and checks that `into_bytes` on buffers that were never
/// written to yields empty bytes.
#[test]
fn test_pooled_debug_and_empty_into_bytes_paths() {
let page = page_size();
let class = test_size_class(page, page);
// Three independent slots, one per scenario below.
let (slot0, buffer0, class0) = class.try_create(false).expect("first slot");
let (slot1, buffer1, class1) = class.try_create(false).expect("second slot");
let (slot2, buffer2, class2) = class.try_create(false).expect("third slot");
// Scenario 1: Debug output of a mutable pooled buffer; the buffer is
// dropped at the end of this inner scope.
let pooled_mut_debug = {
let pooled_mut = PooledBufMut::new(buffer0, class0, slot0);
format!("{pooled_mut:?}")
};
assert!(pooled_mut_debug.contains("PooledBufMut"));
assert!(pooled_mut_debug.contains("cursor"));
// Scenario 2: converting an untouched mutable buffer straight into bytes.
let empty_from_mut = PooledBufMut::new(buffer1, class1, slot1);
assert!(empty_from_mut.into_bytes().is_empty());
// Scenario 3: Debug output and `into_bytes` after `into_pooled`.
let pooled = PooledBufMut::new(buffer2, class2, slot2).into_pooled();
let pooled_debug = format!("{pooled:?}");
assert!(pooled_debug.contains("PooledBuf"));
assert!(pooled_debug.contains("capacity"));
assert!(pooled.into_bytes().is_empty());
// Flush this thread's cache before `class` goes out of scope.
BufferPoolThreadCache::flush();
}
/// Checks the allocated/available counters while a buffer is allocated,
/// frozen, and finally dropped.
#[test]
fn test_freeze_returns_buffer_to_pool() {
    let page = page_size();
    let pool = test_pool(test_config(page, page, 2));
    let allocated = || get_allocated(&pool, page);
    let available = || get_available(&pool, page);

    // Fresh pool: nothing handed out, nothing parked.
    assert_eq!(allocated(), 0);
    assert_eq!(available(), 0);

    let frozen = {
        let buf = pool.try_alloc(page).unwrap();
        assert_eq!(allocated(), 1);
        assert_eq!(available(), 0);
        buf.freeze()
    };
    // Freezing does not release the buffer.
    assert_eq!(allocated(), 1);

    // Dropping the frozen view does.
    drop(frozen);
    assert_eq!(allocated(), 0);
    assert_eq!(available(), 1);
}
/// Checks, through the allocated counter, that a pooled buffer stays
/// allocated until the last non-empty view of it is dropped. Three scenarios
/// are covered: clones and slices of a frozen buffer, `copy_to_bytes` on a
/// frozen buffer, and `copy_to_bytes` on a mutable buffer.
#[test]
fn test_refcount_and_copy_to_bytes_paths() {
let page = page_size();
let pool = test_pool(test_config(page, page, 2));
// Scenario 1: clone and slice of a frozen buffer.
{
let mut buf = pool.try_alloc(page).unwrap();
buf.put_slice(&[0xAA; 100]);
let iobuf = buf.freeze();
let clone = iobuf.clone();
let slice = iobuf.slice(10..40);
let empty = iobuf.slice(10..10);
assert!(empty.is_empty());
drop(iobuf);
assert_eq!(get_allocated(&pool, page), 1);
drop(slice);
assert_eq!(get_allocated(&pool, page), 1);
drop(clone);
// `empty` is still alive here, so the empty slice evidently does not
// keep the pooled buffer allocated.
assert_eq!(get_allocated(&pool, page), 0);
}
// Scenario 2: `copy_to_bytes` on a frozen buffer.
{
let mut buf = pool.try_alloc(page).unwrap();
buf.put_slice(&[0x42; 100]);
let mut iobuf = buf.freeze();
// A zero-length copy consumes nothing.
let zero = iobuf.copy_to_bytes(0);
assert!(zero.is_empty());
assert_eq!(iobuf.remaining(), 100);
let partial = iobuf.copy_to_bytes(30);
assert_eq!(&partial[..], &[0x42; 30]);
assert_eq!(iobuf.remaining(), 70);
let rest = iobuf.copy_to_bytes(70);
assert_eq!(&rest[..], &[0x42; 70]);
assert_eq!(iobuf.remaining(), 0);
let empty = iobuf.copy_to_bytes(0);
assert!(empty.is_empty());
// The buffer stays allocated until the last of the extracted,
// non-empty pieces is dropped.
drop(iobuf);
assert_eq!(get_allocated(&pool, page), 1);
drop(zero);
drop(partial);
assert_eq!(get_allocated(&pool, page), 1);
drop(rest);
assert_eq!(get_allocated(&pool, page), 0);
}
// Scenario 3: the same sequence on the mutable buffer, without freezing.
{
let buf = pool.try_alloc(page).unwrap();
let mut iobufmut = buf;
iobufmut.put_slice(&[0x7E; 100]);
let zero = iobufmut.copy_to_bytes(0);
assert!(zero.is_empty());
assert_eq!(iobufmut.remaining(), 100);
let partial = iobufmut.copy_to_bytes(30);
assert_eq!(&partial[..], &[0x7E; 30]);
assert_eq!(iobufmut.remaining(), 70);
let rest = iobufmut.copy_to_bytes(70);
assert_eq!(&rest[..], &[0x7E; 70]);
assert_eq!(iobufmut.remaining(), 0);
drop(iobufmut);
assert_eq!(get_allocated(&pool, page), 1);
drop(zero);
drop(partial);
assert_eq!(get_allocated(&pool, page), 1);
drop(rest);
assert_eq!(get_allocated(&pool, page), 0);
}
}
/// Checks that converting a uniquely held frozen buffer (with nothing written
/// to it) back into an `IoBufMut` keeps the pooled buffer allocated, and that
/// dropping the result returns it to the pool.
#[test]
fn test_iobuf_to_iobufmut_conversion_reuses_pool_for_non_full_unique_view() {
    let page = page_size();
    let pool = test_pool(test_config(page, page, 2));

    let pooled = pool.try_alloc(page).unwrap();
    assert_eq!(get_allocated(&pool, page), 1);

    let frozen = pooled.freeze();
    assert_eq!(get_allocated(&pool, page), 1);

    let thawed: IoBufMut = frozen.into();
    assert_eq!(
        get_allocated(&pool, page),
        1,
        "pooled buffer should remain allocated after zero-copy IoBuf->IoBufMut conversion"
    );
    assert_eq!(get_available(&pool, page), 0);

    // Releasing the converted buffer makes it available again.
    drop(thawed);
    assert_eq!(get_allocated(&pool, page), 0);
    assert_eq!(get_available(&pool, page), 1);
}
/// Checks that converting a completely filled, uniquely held frozen buffer
/// into an `IoBufMut` keeps its length and contents and keeps the pooled
/// buffer allocated until the result is dropped.
#[test]
fn test_iobuf_to_iobufmut_conversion_preserves_full_unique_view() {
    let page = page_size();
    let pool = test_pool(test_config(page, page, 2));

    // Fill the whole buffer with a marker byte and freeze it.
    let frozen = {
        let mut buf = pool.try_alloc(page).unwrap();
        buf.put_slice(&vec![0xEE; page]);
        buf.freeze()
    };

    let iobufmut: IoBufMut = frozen.into();
    assert_eq!(iobufmut.len(), page);
    assert!(iobufmut.as_ref().iter().all(|&b| b == 0xEE));
    assert_eq!(get_allocated(&pool, page), 1);
    assert_eq!(get_available(&pool, page), 0);

    drop(iobufmut);
    assert_eq!(get_allocated(&pool, page), 0);
    assert_eq!(get_available(&pool, page), 1);
}
/// Checks that `try_into_mut` succeeds for a uniquely held frozen buffer that
/// covers the whole allocation, preserving length, contents and capacity.
#[test]
fn test_iobuf_try_into_mut_recycles_full_unique_view() {
    let page = page_size();
    let pool = test_pool(test_config(page, page, 2));

    let frozen = {
        let mut buf = pool.try_alloc(page).unwrap();
        buf.put_slice(&vec![0xAB; page]);
        buf.freeze()
    };
    assert_eq!(get_allocated(&pool, page), 1);

    let recycled = frozen
        .try_into_mut()
        .expect("unique full-view pooled buffer should recycle");
    assert_eq!(recycled.len(), page);
    assert!(recycled.as_ref().iter().all(|&b| b == 0xAB));
    assert_eq!(recycled.capacity(), page);
    assert_eq!(get_allocated(&pool, page), 1);

    // The recycled buffer still belongs to the pool.
    drop(recycled);
    assert_eq!(get_allocated(&pool, page), 0);
    assert_eq!(get_available(&pool, page), 1);
}
/// Checks both outcomes of `try_into_mut`: it succeeds for a slice that is
/// the only remaining view of a pooled buffer, and fails while a clone of the
/// buffer is still alive.
#[test]
fn test_iobuf_try_into_mut_succeeds_for_unique_slice_and_fails_for_shared() {
    let page = page_size();
    let pool = test_pool(test_config(page, page, 2));

    // Unique slice: drop the parent view so only the slice remains.
    let sliced = {
        let mut buf = pool.try_alloc(page).unwrap();
        buf.put_slice(&vec![0xCD; page]);
        let parent = buf.freeze();
        let view = parent.slice(1..page);
        drop(parent);
        view
    };
    let recycled = sliced
        .try_into_mut()
        .expect("unique sliced pooled buffer should recycle");
    assert_eq!(recycled.len(), page - 1);
    assert!(recycled.as_ref().iter().all(|&b| b == 0xCD));
    assert_eq!(recycled.capacity(), page - 1);
    assert_eq!(get_allocated(&pool, page), 1);
    drop(recycled);
    assert_eq!(get_allocated(&pool, page), 0);
    assert_eq!(get_available(&pool, page), 1);

    // Shared buffer: a live clone must make the conversion fail.
    let mut refill = pool.try_alloc(page).unwrap();
    refill.put_slice(&vec![0xEF; page]);
    let shared = refill.freeze();
    let cloned = shared.clone();
    let rejected = shared
        .try_into_mut()
        .expect_err("shared pooled buffer must not convert to mutable");
    drop(cloned);
    drop(rejected);
    assert_eq!(get_allocated(&pool, page), 0);
    assert!(get_available(&pool, page) >= 1);
}
/// Stress test: ten threads repeatedly allocate a buffer, freeze it, clone it
/// five times and drop every view. The pool must still serve an allocation
/// once all threads have finished.
#[test]
fn test_multithreaded_alloc_freeze_return() {
    let page = page_size();
    let pool = Arc::new(test_pool(test_config(page, page, 100)));
    // Fewer iterations when the test runs under Miri.
    cfg_if::cfg_if! {
        if #[cfg(miri)] {
            let iterations = 100;
        } else {
            let iterations = 1000;
        }
    }

    let workers: Vec<_> = (0..10)
        .map(|_| {
            let pool = pool.clone();
            thread::spawn(move || {
                for _ in 0..iterations {
                    let frozen = pool.try_alloc(page).unwrap().freeze();
                    let clones: Vec<_> = (0..5).map(|_| frozen.clone()).collect();
                    drop(frozen);
                    clones.into_iter().for_each(drop);
                }
            })
        })
        .collect();
    for worker in workers {
        worker.join().unwrap();
    }

    let _buf = pool
        .try_alloc(page)
        .expect("pool should remain usable after multithreaded test");
}
/// Checks that buffers allocated on one thread and dropped on another are
/// counted in the local cache as seen from the dropping thread, and can be
/// allocated again from that thread.
#[test]
fn test_cross_thread_buffer_return() {
let page = page_size();
let pool = test_pool(test_config(page, page, 100));
let (tx, rx) = mpsc::channel();
// Allocate and freeze 50 buffers on this thread and queue them for the worker.
for _ in 0..50 {
let buf = pool.try_alloc(page).unwrap();
let iobuf = buf.freeze();
tx.send(iobuf).unwrap();
}
// Close the sending side so the worker's `recv` loop ends once the queue is drained.
drop(tx);
let handle = thread::spawn(move || {
while let Ok(iobuf) = rx.recv() {
drop(iobuf);
}
let class_index = pool
.class_index(page)
.expect("class exists for page-sized buffer");
// All 50 returns are counted locally on this thread, none globally.
assert_eq!(get_local_len(&pool.inner.classes[class_index]), 50);
assert_eq!(get_global_len(&pool.inner.classes[class_index]), 0);
// `_buf` is dropped at the end of every iteration, so each pass
// allocates and immediately returns one buffer.
for _ in 0..50 {
let _buf = pool
.try_alloc(page)
.expect("dropping thread should be able to reuse locally returned buffers");
}
});
handle.join().unwrap();
}
/// Checks that a buffer allocated and dropped on a worker thread is counted
/// in the global freelist once that thread has been joined, and can be
/// allocated from the main thread even though the class allows a single buffer.
#[test]
fn test_thread_exit_flushes_local_bin() {
let page = page_size();
// `max_per_class` is 1, so the allocation at the end can only succeed by
// reusing the worker's buffer.
let pool = Arc::new(test_pool(test_config(page, page, 1)));
let worker_pool = pool.clone();
thread::spawn(move || {
let buf = worker_pool
.try_alloc(page)
.expect("worker should allocate tracked buffer");
drop(buf);
})
.join()
.expect("worker thread should exit cleanly");
let class_index = pool
.class_index(page)
.expect("class exists for page-sized buffer");
// After the worker has exited, its buffer is counted globally.
// NOTE(review): `get_local_len` is called from the main thread here and
// presumably reports the calling thread's cache -- confirm.
assert_eq!(get_global_len(&pool.inner.classes[class_index]), 1);
assert_eq!(get_local_len(&pool.inner.classes[class_index]), 0);
let _buf = pool
.try_alloc(page)
.expect("thread-exited local buffer should be reusable");
}
/// Checks that dropping the pool takes the global freelist count to zero
/// while the local count and the created count are unchanged.
#[test]
fn test_pool_drop_drains_global_freelist() {
let page = page_size();
let pool = test_pool(test_config(page, page, 2));
let class_index = pool
.class_index(page)
.expect("class exists for page-sized buffer");
let class = &pool.inner.classes[class_index];
// Keep an independent handle to the size class so it can still be
// inspected after `pool` is dropped.
// SAFETY(review): the extra reference taken by `retain` is presumably the
// one owned by the `SizeClassHandle` built from the same token on the next
// line -- confirm against `retain`'s contract.
unsafe { class.token.retain() };
let class = SizeClassHandle { token: class.token };
let buf1 = pool.try_alloc(page).unwrap();
let buf2 = pool.try_alloc(page).unwrap();
drop(buf1);
drop(buf2);
// One returned buffer is counted globally, the other locally.
assert_eq!(get_global_len(&class), 1);
assert_eq!(get_local_len(&class), 1);
drop(pool);
// Only the global freelist is emptied by the pool drop.
assert_eq!(get_global_len(&class), 0);
assert_eq!(get_local_len(&class), 1);
assert_eq!(get_global_created(&class), 2);
}
/// Checks that a frozen buffer remains readable after its pool has been
/// dropped, and can itself be dropped afterwards.
#[test]
fn test_pool_dropped_before_buffer() {
    let page = page_size();
    let pool = test_pool(test_config(page, page, 2));

    let iobuf = {
        let mut buf = pool.try_alloc(page).unwrap();
        buf.put_slice(&[0u8; 100]);
        buf.freeze()
    };

    // The pool goes away first; the buffer outlives it.
    drop(pool);
    assert_eq!(iobuf.len(), 100);
    drop(iobuf);
}
/// Checks that a class with three buffers rejects a fourth allocation, that
/// returning one buffer allows exactly one more, and that all three are
/// available again once everything is returned.
#[test]
fn test_pool_exhaustion_and_recovery() {
    let page = page_size();
    let pool = test_pool(test_config(page, page, 3));

    // Take everything the class has, in order.
    let [first, second, third] = ["first alloc", "second alloc", "third alloc"]
        .map(|label| pool.try_alloc(page).expect(label));
    assert!(pool.try_alloc(page).is_err(), "pool should be exhausted");

    // Returning one buffer frees room for exactly one allocation.
    drop(first);
    let fourth = pool.try_alloc(page).expect("alloc after return");
    assert!(pool.try_alloc(page).is_err(), "pool exhausted again");

    drop(second);
    drop(third);
    drop(fourth);
    assert_eq!(get_allocated(&pool, page), 0);
    assert_eq!(get_available(&pool, page), 3);

    let _reused = pool.try_alloc(page).expect("reuse from freelist");
    assert_eq!(get_available(&pool, page), 2);
}
/// Checks the two error variants of `try_alloc`: `Oversized` for a request
/// larger than the biggest class, `Exhausted` once the class is fully in use.
#[test]
fn test_try_alloc_errors() {
    let page = page_size();
    let pool = test_pool(test_config(page, page, 2));

    assert_eq!(
        pool.try_alloc(page * 10).err(),
        Some(PoolError::Oversized)
    );

    // Hold both buffers of the class so the next request has nothing left.
    let _held_first = pool.try_alloc(page).unwrap();
    let _held_second = pool.try_alloc(page).unwrap();
    assert_eq!(pool.try_alloc(page).err(), Some(PoolError::Exhausted));
}
/// Checks the two error variants of `try_alloc_zeroed`: `Oversized` for a
/// request larger than the biggest class, `Exhausted` once the class is fully
/// in use.
#[test]
fn test_try_alloc_zeroed_errors() {
    let page = page_size();
    let pool = test_pool(test_config(page, page, 2));

    assert_eq!(
        pool.try_alloc_zeroed(page * 10).err(),
        Some(PoolError::Oversized)
    );

    // Hold both buffers of the class so the next request has nothing left.
    let _held_first = pool.try_alloc_zeroed(page).unwrap();
    let _held_second = pool.try_alloc_zeroed(page).unwrap();
    assert_eq!(
        pool.try_alloc_zeroed(page).err(),
        Some(PoolError::Exhausted)
    );
}
/// Checks that `alloc` falls back to non-pooled, page-aligned buffers when
/// the class is exhausted or the request is oversized, and that those
/// fallback buffers never affect the allocated counter.
#[test]
fn test_fallback_allocation() {
    let page = page_size();
    let pool = test_pool(test_config(page, page, 2));

    // Use up the class with two tracked buffers.
    let pooled_a = pool.try_alloc(page).unwrap();
    let pooled_b = pool.try_alloc(page).unwrap();
    assert!(pooled_a.is_pooled());
    assert!(pooled_b.is_pooled());

    // Exhausted class: `alloc` still returns a buffer, but not a pooled one.
    let mut untracked_exhausted = pool.alloc(page);
    assert!(!untracked_exhausted.is_pooled());
    assert!((untracked_exhausted.as_mut_ptr() as usize).is_multiple_of(page));

    // Oversized request: same fallback behaviour.
    let mut untracked_oversized = pool.alloc(page * 10);
    assert!(!untracked_oversized.is_pooled());
    assert!((untracked_oversized.as_mut_ptr() as usize).is_multiple_of(page));

    // Fallback buffers are invisible to the allocated counter, both while
    // alive and after being dropped.
    assert_eq!(get_allocated(&pool, page), 2);
    drop(untracked_exhausted);
    drop(untracked_oversized);
    assert_eq!(get_allocated(&pool, page), 2);

    drop(pooled_a);
    drop(pooled_b);
    assert_eq!(get_allocated(&pool, page), 0);
}
/// Checks that `is_pooled` is true for a buffer obtained from the pool and
/// false for one created with `IoBufMut::with_capacity`.
#[test]
fn test_is_pooled() {
    let page = page_size();
    let pool = test_pool(test_config(page, page, 10));

    let from_pool = pool.try_alloc(page).unwrap();
    assert!(from_pool.is_pooled());

    let standalone = IoBufMut::with_capacity(100);
    assert!(!standalone.is_pooled());
}
/// Checks `is_pooled` on frozen buffers: true for a pooled allocation, false
/// for an oversized fallback allocation and for bytes copied from a slice.
#[test]
fn test_iobuf_is_pooled() {
    let page = page_size();
    let pool = test_pool(test_config(page, page, 2));

    let from_pool = pool.try_alloc(page).unwrap().freeze();
    assert!(from_pool.is_pooled());

    // Ten pages exceeds the largest class, so `alloc` has to fall back.
    let oversized = pool.alloc(page * 10).freeze();
    assert!(!oversized.is_pooled());

    let copied = IoBuf::copy_from_slice(b"hello");
    assert!(!copied.is_pooled());
}
/// Checks that buffers honour the configured alignment: page alignment for
/// the storage preset and cache-line alignment for the network preset.
#[test]
fn test_buffer_alignment() {
    let page = page_size();
    let cache_line = cache_line_size();

    let storage_base = BufferPoolConfig::for_storage().with_alignment(NZUsize!(page));
    let network_base = BufferPoolConfig::for_network().with_alignment(NZUsize!(cache_line));
    // Under Miri both presets are capped at 32 buffers per class (presumably
    // to keep the run cheap); otherwise they are used as is.
    cfg_if::cfg_if! {
        if #[cfg(miri)] {
            let storage_config = BufferPoolConfig {
                max_per_class: NZU32!(32),
                ..storage_base
            };
            let network_config = BufferPoolConfig {
                max_per_class: NZU32!(32),
                ..network_base
            };
        } else {
            let storage_config = storage_base;
            let network_config = network_base;
        }
    }

    let storage_buffer_pool = test_pool(storage_config);
    let mut storage_buf = storage_buffer_pool.try_alloc(100).unwrap();
    let storage_addr = storage_buf.as_mut_ptr() as usize;
    assert_eq!(storage_addr % page, 0, "storage buffer not page-aligned");

    let network_buffer_pool = test_pool(network_config);
    let mut network_buf = network_buffer_pool.try_alloc(100).unwrap();
    let network_addr = network_buf.as_mut_ptr() as usize;
    assert_eq!(
        network_addr % cache_line,
        0,
        "network buffer not cache-line aligned"
    );
}
}