use super::IoBufMut;
use crate::iobuf::aligned::{AlignedBuffer, PooledBufMut};
use commonware_utils::NZUsize;
use crossbeam_queue::ArrayQueue;
use prometheus_client::{
encoding::EncodeLabelSet,
metrics::{counter::Counter, family::Family, gauge::Gauge},
registry::Registry,
};
use std::{
cell::UnsafeCell,
num::NonZeroUsize,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
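/// Thread caches smaller than this spill overflow buffers to the global
/// freelist one at a time instead of in half-capacity batches.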
const MIN_THREAD_CACHE_BATCHING_CAPACITY: usize = 4;
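/// Reasons a fallible pool allocation can fail.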
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PoolError {
/// The requested capacity exceeds the largest size class.
Oversized,
/// The size class has reached `max_per_class` live buffers.
Exhausted,
}
impl std::fmt::Display for PoolError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Oversized => write!(f, "requested capacity exceeds maximum buffer size"),
Self::Exhausted => write!(f, "pool exhausted for required size class"),
}
}
}
impl std::error::Error for PoolError {}
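/// Returns the OS page size, falling back to 4096 when `sysconf` fails or
/// reports a non-positive value.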
#[cfg(unix)]
fn page_size() -> usize {
let size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) };
if size <= 0 {
4096
} else {
size as usize
}
}
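/// Non-Unix fallback: assume 4 KiB pages.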
#[cfg(not(unix))]
#[allow(clippy::missing_const_for_fn)]
fn page_size() -> usize {
4096
}
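/// Conservative cache-line size for alignment: 128 bytes on x86_64 (whose
/// prefetcher pulls adjacent line pairs) and aarch64 (where some cores use
/// 128-byte lines), 64 bytes elsewhere.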
const fn cache_line_size() -> usize {
cfg_if::cfg_if! {
if #[cfg(any(target_arch = "x86_64", target_arch = "aarch64"))] {
128
} else {
64
}
}
}
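/// Policy for per-thread buffer caching.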
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum BufferPoolThreadCacheConfig {
/// No per-thread caching; every return goes to the global freelist.
Disabled,
/// A fixed per-class thread-cache capacity.
Fixed(NonZeroUsize),
/// A capacity derived from the expected number of worker threads.
ForParallelism(NonZeroUsize),
}
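/// Configuration for a [`BufferPool`].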
#[derive(Debug, Clone)]
pub struct BufferPoolConfig {
/// Requests smaller than this bypass the pool and are served by plain
/// aligned buffers.
pub pool_min_size: usize,
/// Size of the smallest class; must be a power of two and >= `alignment`.
pub min_size: NonZeroUsize,
/// Size of the largest class; must be a power of two and >= `min_size`.
pub max_size: NonZeroUsize,
/// Maximum number of tracked buffers per size class.
pub max_per_class: NonZeroUsize,
/// Whether to allocate every tracked buffer up front.
pub prefill: bool,
/// Alignment of pooled buffers; must be a power of two.
pub alignment: NonZeroUsize,
pub(crate) thread_cache_config: BufferPoolThreadCacheConfig,
}
impl BufferPoolConfig {
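/// Preset for network I/O: cache-line-aligned classes from 1 KiB to 64 KiB,
/// a deep 4096-buffer cap per class, no prefill, no thread caching.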
pub const fn for_network() -> Self {
let cache_line = NZUsize!(cache_line_size());
Self {
pool_min_size: 1024,
min_size: NZUsize!(1024),
max_size: NZUsize!(64 * 1024),
max_per_class: NZUsize!(4096),
prefill: false,
alignment: cache_line,
thread_cache_config: BufferPoolThreadCacheConfig::Disabled,
}
}
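/// Preset for storage I/O: page-aligned classes from one page up to 8 MiB,
/// at most 64 buffers per class, no prefill, no thread caching.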
pub fn for_storage() -> Self {
let page = NZUsize!(page_size());
Self {
pool_min_size: 1024,
min_size: page,
max_size: NZUsize!(8 * 1024 * 1024),
max_per_class: NZUsize!(64),
prefill: false,
alignment: page,
thread_cache_config: BufferPoolThreadCacheConfig::Disabled,
}
}
pub const fn with_pool_min_size(mut self, pool_min_size: usize) -> Self {
self.pool_min_size = pool_min_size;
self
}
pub const fn with_min_size(mut self, min_size: NonZeroUsize) -> Self {
self.min_size = min_size;
self
}
pub const fn with_max_size(mut self, max_size: NonZeroUsize) -> Self {
self.max_size = max_size;
self
}
pub const fn with_max_per_class(mut self, max_per_class: NonZeroUsize) -> Self {
self.max_per_class = max_per_class;
self
}
pub const fn with_thread_cache_capacity(mut self, thread_cache_capacity: NonZeroUsize) -> Self {
self.thread_cache_config = BufferPoolThreadCacheConfig::Fixed(thread_cache_capacity);
self
}
pub const fn with_thread_cache_for_parallelism(mut self, parallelism: NonZeroUsize) -> Self {
self.thread_cache_config = BufferPoolThreadCacheConfig::ForParallelism(parallelism);
self
}
pub const fn with_thread_cache_disabled(mut self) -> Self {
self.thread_cache_config = BufferPoolThreadCacheConfig::Disabled;
self
}
pub const fn with_prefill(mut self, prefill: bool) -> Self {
self.prefill = prefill;
self
}
pub const fn with_alignment(mut self, alignment: NonZeroUsize) -> Self {
self.alignment = alignment;
self
}
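/// Derives `max_per_class` from a total byte budget: the budget is divided
/// (rounding up) by the cost of holding one buffer in every class. E.g.
/// classes of 4, 8, and 16 bytes cost 28 per "row", so a 280-byte budget
/// yields `max_per_class = 10`. A no-op when the size range defines no
/// classes.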
pub fn with_budget_bytes(mut self, budget_bytes: NonZeroUsize) -> Self {
let mut class_bytes = 0usize;
for i in 0..self.num_classes() {
class_bytes = class_bytes.saturating_add(self.class_size(i));
}
if class_bytes == 0 {
return self;
}
self.max_per_class = NZUsize!(budget_bytes.get().div_ceil(class_bytes));
self
}
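/// Panics unless sizes and alignment are powers of two, properly ordered,
/// and any fixed thread-cache capacity fits within `max_per_class`.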
fn validate(&self) {
assert!(
self.alignment.is_power_of_two(),
"alignment must be a power of two"
);
assert!(
self.min_size.is_power_of_two(),
"min_size must be a power of two"
);
assert!(
self.max_size.is_power_of_two(),
"max_size must be a power of two"
);
assert!(
self.min_size >= self.alignment,
"min_size ({}) must be >= alignment ({})",
self.min_size,
self.alignment
);
assert!(
self.max_size >= self.min_size,
"max_size must be >= min_size"
);
assert!(
self.pool_min_size <= self.min_size.get(),
"pool_min_size ({}) must be <= min_size ({})",
self.pool_min_size,
self.min_size
);
if let BufferPoolThreadCacheConfig::Fixed(thread_cache_capacity) = self.thread_cache_config
{
assert!(
thread_cache_capacity <= self.max_per_class,
"thread_cache_capacity ({}) must be <= max_per_class ({})",
thread_cache_capacity,
self.max_per_class
);
}
}
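/// Number of power-of-two size classes from `min_size` through `max_size`,
/// inclusive; e.g. 4K..=32K yields four classes (4K, 8K, 16K, 32K).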
#[inline]
fn num_classes(&self) -> usize {
if self.max_size < self.min_size {
return 0;
}
(self.max_size.get() / self.min_size.get()).trailing_zeros() as usize + 1
}
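/// Maps a requested capacity to the smallest class that can hold it, or
/// `None` if it exceeds `max_size`; e.g. with 4K as the smallest class,
/// 4097 bytes maps to the 8K class (index 1).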
#[inline]
fn class_index(&self, size: usize) -> Option<usize> {
if size > self.max_size.get() {
return None;
}
if size <= self.min_size.get() {
return Some(0);
}
let size_class = size.next_power_of_two();
let index = (size_class / self.min_size.get()).trailing_zeros() as usize;
if index < self.num_classes() {
Some(index)
} else {
None
}
}
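/// Byte size of class `index`: `min_size << index`.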
const fn class_size(&self, index: usize) -> usize {
self.min_size.get() << index
}
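/// Resolves the per-class thread-cache capacity: zero when disabled, the
/// fixed value when configured, or (for `ForParallelism`) half of each
/// thread's fair share of `max_per_class`, clamped to `1..=8`.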
fn resolve_thread_cache_capacity(&self) -> usize {
match self.thread_cache_config {
BufferPoolThreadCacheConfig::Disabled => 0,
BufferPoolThreadCacheConfig::Fixed(thread_cache_capacity) => {
thread_cache_capacity.get()
}
BufferPoolThreadCacheConfig::ForParallelism(parallelism) => {
let max_per_class = self.max_per_class.get();
let effective_threads = parallelism.get().min(max_per_class);
(max_per_class / (2 * effective_threads)).clamp(1, 8)
}
}
}
}
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
struct SizeClassLabel {
size_class: u64,
}
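/// Prometheus metrics: live tracked buffers per class, exhaustion failures
/// per class, and oversized requests.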
struct PoolMetrics {
created: Family<SizeClassLabel, Gauge>,
exhausted_total: Family<SizeClassLabel, Counter>,
oversized_total: Counter,
}
impl PoolMetrics {
fn new(registry: &mut Registry) -> Self {
let metrics = Self {
created: Family::default(),
exhausted_total: Family::default(),
oversized_total: Counter::default(),
};
registry.register(
"buffer_pool_created",
"Number of tracked buffers currently created for the pool",
metrics.created.clone(),
);
registry.register(
"buffer_pool_exhausted_total",
"Total number of failed allocations due to pool exhaustion",
metrics.exhausted_total.clone(),
);
registry.register(
"buffer_pool_oversized_total",
"Total number of allocation requests exceeding max buffer size",
metrics.oversized_total.clone(),
);
metrics
}
}
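/// One power-of-two size class: a lock-free global freelist plus a counter
/// capping the number of tracked buffers at `max`.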
pub(super) struct SizeClass {
class_id: usize,
size: usize,
alignment: usize,
max: usize,
global: ArrayQueue<AlignedBuffer>,
/// Count of tracked buffers in existence for this class; capped at `max`
/// and only decremented when the pool drains its freelist on drop.
created: AtomicUsize,
thread_cache_capacity: usize,
}
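// SAFETY: `AlignedBuffer` owns its allocation outright, and the lock-free
// `ArrayQueue` hands each buffer to at most one consumer at a time, so no
// allocation is ever accessed from two threads concurrently.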
unsafe impl Send for SizeClass {}
unsafe impl Sync for SizeClass {}
impl SizeClass {
fn new(
class_id: usize,
size: usize,
alignment: usize,
max: usize,
thread_cache_capacity: usize,
prefill: bool,
) -> Self {
let freelist = ArrayQueue::new(max);
let mut created = 0;
if prefill {
for _ in 0..max {
let _ = freelist.push(AlignedBuffer::new(size, alignment));
}
created = max;
}
Self {
class_id,
size,
alignment,
max,
global: freelist,
created: AtomicUsize::new(created),
thread_cache_capacity,
}
}
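/// Returns a buffer to the global freelist. The queue is sized to `max` and
/// `created` never exceeds `max`, so a tracked buffer always fits.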
#[inline]
fn push_global(&self, buffer: AlignedBuffer) {
self.global.push(buffer).unwrap_or_else(|_| {
unreachable!("tracked buffer should always fit in the global pool")
});
}
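/// Atomically claims the right to create one more buffer; fails once `max`
/// buffers are live.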
fn try_reserve(&self) -> bool {
self.created
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |created| {
(created < self.max).then_some(created + 1)
})
.is_ok()
}
}
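/// A cached buffer plus the `Arc` that keeps its size class alive.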
struct TlsSizeClassCacheEntry {
buffer: AlignedBuffer,
class: Arc<SizeClass>,
}
struct TlsSizeClassCache {
entries: Vec<TlsSizeClassCacheEntry>,
capacity: usize,
}
impl TlsSizeClassCache {
fn new(capacity: usize) -> Self {
Self {
entries: Vec::with_capacity(capacity),
capacity,
}
}
#[inline]
const fn len(&self) -> usize {
self.entries.len()
}
#[inline]
fn pop(&mut self) -> Option<TlsSizeClassCacheEntry> {
self.entries.pop()
}
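/// Caches `entry`, spilling to the global freelist once `capacity` is
/// reached.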
fn push(&mut self, entry: TlsSizeClassCacheEntry) {
if self.entries.len() < self.capacity {
self.entries.push(entry);
return;
}
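// Cache full. Below the batching threshold, hand the incoming buffer
// straight to the global freelist rather than churn a tiny cache.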
if self.capacity < MIN_THREAD_CACHE_BATCHING_CAPACITY {
entry.class.push_global(entry.buffer);
return;
}
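// Spill half the cache in one batch to amortize contention on the global
// queue, then cache the incoming entry.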
let spill = self.entries.len().min(self.capacity / 2).max(1);
for _ in 0..spill {
let spilled = self
.entries
.pop()
.expect("spill count must not exceed cached entries");
spilled.class.push_global(spilled.buffer);
}
self.entries.push(entry);
}
}
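// On thread exit (or explicit flush), every cached buffer goes back to its
// class's global freelist.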
impl Drop for TlsSizeClassCache {
fn drop(&mut self) {
for entry in self.entries.drain(..) {
entry.class.push_global(entry.buffer);
}
}
}
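// Per-thread caches indexed by global size-class id. `UnsafeCell` is sound
// here because the data is thread-local and the closures passed to
// `with_cache` never re-enter the TLS.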
thread_local! {
static TLS_SIZE_CLASS_CACHES: UnsafeCell<Vec<Option<TlsSizeClassCache>>> =
const { UnsafeCell::new(Vec::new()) };
}
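// Process-wide id source so every size class, across all pools, gets a
// unique slot in the thread-local cache vector.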
static NEXT_SIZE_CLASS_ID: AtomicUsize = AtomicUsize::new(0);
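/// Namespace for operations on the calling thread's buffer caches.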
pub struct BufferPoolThreadCache;
impl BufferPoolThreadCache {
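/// Drops all of the calling thread's caches, returning their buffers to the
/// global freelists.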
pub fn flush() {
TLS_SIZE_CLASS_CACHES.with(|bins| {
let bins = unsafe { &mut *bins.get() };
for cache in bins.iter_mut() {
let _ = cache.take();
}
});
}
#[inline]
fn pop(class: &Arc<SizeClass>) -> Option<TlsSizeClassCacheEntry> {
Self::with_cache(class.class_id, class.thread_cache_capacity, |cache| {
cache.pop()
})
}
#[inline]
pub(super) fn push(class: Arc<SizeClass>, buffer: AlignedBuffer) {
let class_id = class.class_id;
let thread_cache_capacity = class.thread_cache_capacity;
Self::with_cache(class_id, thread_cache_capacity, |cache| {
cache.push(TlsSizeClassCacheEntry { buffer, class });
});
}
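/// Moves buffers from the global freelist into the thread cache until it
/// holds `target - 1` entries or the freelist runs dry.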
#[inline]
fn refill(class: &Arc<SizeClass>, target: usize) {
Self::with_cache(class.class_id, class.thread_cache_capacity, |cache| {
while cache.len() + 1 < target {
let Some(buffer) = class.global.pop() else {
break;
};
cache.push(TlsSizeClassCacheEntry {
buffer,
class: class.clone(),
});
}
});
}
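/// Runs `f` on the calling thread's cache for `class_id`, growing the cache
/// vector and creating the cache (with `capacity`) on first use.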
#[inline]
fn with_cache<R>(
class_id: usize,
capacity: usize,
f: impl FnOnce(&mut TlsSizeClassCache) -> R,
) -> R {
TLS_SIZE_CLASS_CACHES.with(|bins| {
let bins = unsafe { &mut *bins.get() };
if class_id >= bins.len() {
bins.resize_with(class_id + 1, || None);
}
let cache = bins[class_id].get_or_insert_with(|| TlsSizeClassCache::new(capacity));
f(cache)
})
}
}
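/// Result of a class-level allocation: the buffer, whether it was freshly
/// created, and the class it must be returned to.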
struct Allocation {
buffer: AlignedBuffer,
is_new: bool,
class: Arc<SizeClass>,
}
pub(crate) struct BufferPoolInner {
config: BufferPoolConfig,
classes: Vec<Arc<SizeClass>>,
metrics: PoolMetrics,
}
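// Release pooled memory together with the pool. Buffers still held by
// callers or by thread-local caches keep their `SizeClass` alive via `Arc`
// and are freed when they are finally dropped.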
impl Drop for BufferPoolInner {
fn drop(&mut self) {
for class in &self.classes {
while let Some(buffer) = class.global.pop() {
class.created.fetch_sub(1, Ordering::Relaxed);
drop(buffer);
}
}
}
}
impl BufferPoolInner {
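/// Allocates from `class_index` in three tiers: the thread-local cache, the
/// global freelist, then a fresh allocation while the class is under its
/// cap. Returns `None` when the class is exhausted.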
fn try_alloc(&self, class_index: usize, zero_on_new: bool) -> Option<Allocation> {
let class = &self.classes[class_index];
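// Fast path: reuse a buffer cached by this thread.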
if let Some(entry) = BufferPoolThreadCache::pop(class) {
return Some(Allocation {
buffer: entry.buffer,
is_new: false,
class: entry.class,
});
}
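// Miss: take a buffer from the shared freelist and opportunistically pull
// a few more into the thread cache (targeting half its capacity) to keep
// subsequent allocations off the global queue.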
let target = (class.thread_cache_capacity / 2).max(1);
if let Some(buffer) = class.global.pop() {
BufferPoolThreadCache::refill(class, target);
return Some(Allocation {
buffer,
is_new: false,
class: class.clone(),
});
}
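// Slow path: mint a new buffer if the class is under its cap; otherwise
// report exhaustion.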
let label = SizeClassLabel {
size_class: class.size as u64,
};
if !class.try_reserve() {
self.metrics.exhausted_total.get_or_create(&label).inc();
return None;
}
self.metrics.created.get_or_create(&label).inc();
let buffer = if zero_on_new {
AlignedBuffer::new_zeroed(class.size, class.alignment)
} else {
AlignedBuffer::new(class.size, class.alignment)
};
Some(Allocation {
buffer,
is_new: true,
class: class.clone(),
})
}
}
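/// A cloneable, thread-safe handle to a pool of aligned, size-classed
/// buffers.
///
/// An illustrative sketch of the allocate/freeze/recycle lifecycle (not a
/// doctest, since `BufferPool::new` is crate-private and needs a metrics
/// registry):
///
/// ```ignore
/// let mut registry = Registry::default();
/// let pool = BufferPool::new(BufferPoolConfig::for_storage(), &mut registry);
/// let mut buf = pool.try_alloc(4096)?;   // pooled, page-aligned
/// buf.put_slice(b"hello");
/// let frozen = buf.freeze();             // immutable, refcounted view
/// drop(frozen);                          // last reference recycles the buffer
/// ```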
#[derive(Clone)]
pub struct BufferPool {
inner: Arc<BufferPoolInner>,
}
impl std::fmt::Debug for BufferPool {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BufferPool")
.field("config", &self.inner.config)
.field("num_classes", &self.inner.classes.len())
.finish()
}
}
impl BufferPool {
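/// Builds a pool from a validated config, registering its metrics with
/// `registry`. Size classes double from `min_size` up to `max_size`.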
pub(crate) fn new(config: BufferPoolConfig, registry: &mut Registry) -> Self {
config.validate();
let metrics = PoolMetrics::new(registry);
let mut classes = Vec::with_capacity(config.num_classes());
let thread_cache_capacity = config.resolve_thread_cache_capacity();
for i in 0..config.num_classes() {
let size = config.class_size(i);
let class_id = NEXT_SIZE_CLASS_ID.fetch_add(1, Ordering::Relaxed);
let class = Arc::new(SizeClass::new(
class_id,
size,
config.alignment.get(),
config.max_per_class.get(),
thread_cache_capacity,
config.prefill,
));
classes.push(class);
}
if config.prefill {
for class in &classes {
let label = SizeClassLabel {
size_class: class.size as u64,
};
let created = class.global.len() as i64;
metrics.created.get_or_create(&label).set(created);
}
}
Self {
inner: Arc::new(BufferPoolInner {
config,
classes,
metrics,
}),
}
}
#[inline]
fn class_index_or_record_oversized(&self, capacity: usize) -> Option<usize> {
let class_index = self.inner.config.class_index(capacity);
if class_index.is_none() {
self.inner.metrics.oversized_total.inc();
}
class_index
}
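/// Attempts a pooled allocation of at least `capacity` bytes.
///
/// Requests below `pool_min_size` are served by a plain aligned buffer and
/// always succeed; larger requests fail with [`PoolError::Oversized`] above
/// `max_size` or [`PoolError::Exhausted`] when the size class is at
/// capacity.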
pub fn try_alloc(&self, capacity: usize) -> Result<IoBufMut, PoolError> {
if capacity < self.inner.config.pool_min_size {
let size = capacity.max(1);
return Ok(IoBufMut::with_alignment(size, self.inner.config.alignment));
}
let class_index = self
.class_index_or_record_oversized(capacity)
.ok_or(PoolError::Oversized)?;
let buffer = self
.inner
.try_alloc(class_index, false)
.map(|allocation| PooledBufMut::new(allocation.buffer, allocation.class))
.ok_or(PoolError::Exhausted)?;
Ok(IoBufMut::from_pooled(buffer))
}
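/// Infallible variant of [`Self::try_alloc`]: on pool failure, falls back
/// to an untracked buffer with the pool's alignment (at least `min_size`
/// bytes).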
pub fn alloc(&self, capacity: usize) -> IoBufMut {
self.try_alloc(capacity).unwrap_or_else(|_| {
let size = capacity.max(self.inner.config.min_size.get());
IoBufMut::with_alignment(size, self.inner.config.alignment)
})
}
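/// Allocates a buffer and sets its length to `len` without initializing it.
///
/// # Safety
///
/// The first `len` bytes are uninitialized; the caller must write them
/// before reading.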
pub unsafe fn alloc_len(&self, len: usize) -> IoBufMut {
let mut buf = self.alloc(len);
unsafe { buf.set_len(len) };
buf
}
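/// Like [`Self::try_alloc`], but returns a buffer of length `len` whose
/// contents are zeroed.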
pub fn try_alloc_zeroed(&self, len: usize) -> Result<IoBufMut, PoolError> {
if len < self.inner.config.pool_min_size {
let size = len.max(1);
let mut buf = IoBufMut::zeroed_with_alignment(size, self.inner.config.alignment);
buf.truncate(len);
return Ok(buf);
}
let class_index = self
.class_index_or_record_oversized(len)
.ok_or(PoolError::Oversized)?;
let allocation = self
.inner
.try_alloc(class_index, true)
.ok_or(PoolError::Exhausted)?;
let mut buf = IoBufMut::from_pooled(PooledBufMut::new(allocation.buffer, allocation.class));
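// Freshly created buffers are already zeroed; recycled ones may carry
// stale data and must be re-zeroed before exposing `len` bytes.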
if allocation.is_new {
unsafe { buf.set_len(len) };
} else {
unsafe {
std::ptr::write_bytes(buf.as_mut_ptr(), 0, len);
buf.set_len(len);
}
}
Ok(buf)
}
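/// Infallible variant of [`Self::try_alloc_zeroed`]; falls back to an
/// untracked zeroed buffer when the pool cannot serve the request.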
pub fn alloc_zeroed(&self, len: usize) -> IoBufMut {
self.try_alloc_zeroed(len).unwrap_or_else(|_| {
let size = len.max(self.inner.config.min_size.get());
let mut buf = IoBufMut::zeroed_with_alignment(size, self.inner.config.alignment);
buf.truncate(len);
buf
})
}
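/// Returns the configuration this pool was built with.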
pub fn config(&self) -> &BufferPoolConfig {
&self.inner.config
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::iobuf::IoBuf;
use bytes::{Buf, BufMut};
use std::{
sync::{mpsc, Arc},
thread,
};
fn test_size_class(size: usize, alignment: usize) -> Arc<SizeClass> {
Arc::new(SizeClass::new(
NEXT_SIZE_CLASS_ID.fetch_add(1, Ordering::Relaxed),
size,
alignment,
8,
4,
false,
))
}
fn test_registry() -> Registry {
Registry::default()
}
fn test_config(min_size: usize, max_size: usize, max_per_class: usize) -> BufferPoolConfig {
BufferPoolConfig {
pool_min_size: 0,
min_size: NZUsize!(min_size),
max_size: NZUsize!(max_size),
max_per_class: NZUsize!(max_per_class),
thread_cache_config: BufferPoolThreadCacheConfig::ForParallelism(NZUsize!(1)),
prefill: false,
alignment: NZUsize!(page_size()),
}
}
fn get_allocated(pool: &BufferPool, size: usize) -> usize {
let class_index = pool.inner.config.class_index(size).unwrap();
let class = &pool.inner.classes[class_index];
class.created.load(Ordering::Relaxed) - class.global.len() - get_local_len(class)
}
fn get_available(pool: &BufferPool, size: usize) -> i64 {
let class_index = pool.inner.config.class_index(size).unwrap();
let class = &pool.inner.classes[class_index];
(class.global.len() + get_local_len(class)) as i64
}
fn get_local_len(class: &SizeClass) -> usize {
TLS_SIZE_CLASS_CACHES.with(|bins| {
let bins = unsafe { &*bins.get() };
bins.get(class.class_id)
.and_then(Option::as_ref)
.map_or(0, TlsSizeClassCache::len)
})
}
#[test]
fn test_page_size() {
let size = page_size();
assert!(size >= 4096);
assert!(size.is_power_of_two());
}
#[test]
fn test_config_validation() {
let page = page_size();
let config = test_config(page, page * 4, 10);
config.validate();
}
#[test]
#[should_panic(expected = "thread_cache_capacity (11) must be <= max_per_class (10)")]
fn test_config_invalid_thread_cache_capacity() {
let page = page_size();
let config = test_config(page, page * 4, 10).with_thread_cache_capacity(NZUsize!(11));
config.validate();
}
#[test]
#[should_panic(expected = "min_size must be a power of two")]
fn test_config_invalid_min_size() {
let config = BufferPoolConfig {
pool_min_size: 0,
min_size: NZUsize!(3000),
max_size: NZUsize!(8192),
max_per_class: NZUsize!(10),
thread_cache_config: BufferPoolThreadCacheConfig::ForParallelism(NZUsize!(1)),
prefill: false,
alignment: NZUsize!(page_size()),
};
config.validate();
}
#[test]
fn test_config_class_index() {
let page = page_size();
let config = test_config(page, page * 8, 10);
assert_eq!(config.num_classes(), 4);
assert_eq!(config.class_index(1), Some(0));
assert_eq!(config.class_index(page), Some(0));
assert_eq!(config.class_index(page + 1), Some(1));
assert_eq!(config.class_index(page * 2), Some(1));
assert_eq!(config.class_index(page * 8), Some(3));
assert_eq!(config.class_index(page * 8 + 1), None);
}
#[test]
fn test_pool_alloc_and_return() {
let page = page_size();
let mut registry = test_registry();
let pool = BufferPool::new(test_config(page, page * 4, 2), &mut registry);
let buf = pool.try_alloc(page).unwrap();
assert!(buf.capacity() >= page);
assert_eq!(buf.len(), 0);
drop(buf);
let buf2 = pool.try_alloc(page).unwrap();
assert!(buf2.capacity() >= page);
assert_eq!(buf2.len(), 0);
}
#[test]
fn test_alloc_len_sets_len() {
let page = page_size();
let mut registry = test_registry();
let pool = BufferPool::new(test_config(page, page * 4, 2), &mut registry);
let mut buf = unsafe { pool.alloc_len(100) };
assert_eq!(buf.len(), 100);
buf.as_mut().fill(0xAB);
let frozen = buf.freeze();
assert_eq!(frozen.as_ref(), &[0xAB; 100]);
}
#[test]
fn test_alloc_zeroed_sets_len_and_zeros() {
let page = page_size();
let mut registry = test_registry();
let pool = BufferPool::new(test_config(page, page * 4, 2), &mut registry);
let buf = pool.alloc_zeroed(100);
assert_eq!(buf.len(), 100);
assert!(buf.as_ref().iter().all(|&b| b == 0));
}
#[test]
fn test_try_alloc_zeroed_sets_len_and_zeros() {
let page = page_size();
let mut registry = test_registry();
let pool = BufferPool::new(test_config(page, page * 4, 2), &mut registry);
let buf = pool.try_alloc_zeroed(page).unwrap();
assert!(buf.is_pooled());
assert_eq!(buf.len(), page);
assert!(buf.as_ref().iter().all(|&b| b == 0));
}
#[test]
fn test_alloc_zeroed_fallback_uses_untracked_zeroed_buffer() {
let page = page_size();
let mut registry = test_registry();
let pool = BufferPool::new(test_config(page, page, 1), &mut registry);
let _pooled = pool.try_alloc(page).unwrap();
let buf = pool.alloc_zeroed(100);
assert!(!buf.is_pooled());
assert_eq!(buf.len(), 100);
assert!(buf.as_ref().iter().all(|&b| b == 0));
}
#[test]
fn test_alloc_zeroed_reuses_dirty_pooled_buffer() {
let page = page_size();
let mut registry = test_registry();
let pool = BufferPool::new(test_config(page, page, 1), &mut registry);
let mut first = pool.alloc_zeroed(page);
assert!(first.is_pooled());
assert!(first.as_ref().iter().all(|&b| b == 0));
first.as_mut().fill(0xAB);
drop(first);
let second = pool.alloc_zeroed(page);
assert!(second.is_pooled());
assert_eq!(second.len(), page);
assert!(second.as_ref().iter().all(|&b| b == 0));
}
#[test]
fn test_requests_smaller_than_pool_min_size_bypass_pool() {
let mut registry = test_registry();
let pool = BufferPool::new(
BufferPoolConfig {
pool_min_size: 512,
min_size: NZUsize!(512),
max_size: NZUsize!(1024),
max_per_class: NZUsize!(2),
thread_cache_config: BufferPoolThreadCacheConfig::ForParallelism(NZUsize!(1)),
prefill: false,
alignment: NZUsize!(128),
},
&mut registry,
);
let buf = pool.try_alloc(200).unwrap();
assert!(!buf.is_pooled());
assert_eq!(buf.capacity(), 200);
let zeroed = pool.try_alloc_zeroed(200).unwrap();
assert!(!zeroed.is_pooled());
assert_eq!(zeroed.len(), 200);
assert!(zeroed.as_ref().iter().all(|&b| b == 0));
let pooled = pool.try_alloc(512).unwrap();
assert!(pooled.is_pooled());
assert_eq!(pooled.capacity(), 512);
}
#[test]
fn test_pool_size_classes() {
let page = page_size();
let mut registry = test_registry();
let pool = BufferPool::new(test_config(page, page * 4, 10), &mut registry);
let buf1 = pool.try_alloc(page).unwrap();
assert_eq!(buf1.capacity(), page);
let buf2 = pool.try_alloc(page + 1).unwrap();
assert_eq!(buf2.capacity(), page * 2);
let buf3 = pool.try_alloc(page * 3).unwrap();
assert_eq!(buf3.capacity(), page * 4);
}
#[test]
fn test_prefill() {
let page = NZUsize!(page_size());
let mut registry = test_registry();
let pool = BufferPool::new(
BufferPoolConfig {
pool_min_size: 0,
min_size: page,
max_size: page,
max_per_class: NZUsize!(5),
thread_cache_config: BufferPoolThreadCacheConfig::ForParallelism(NZUsize!(1)),
prefill: true,
alignment: page,
},
&mut registry,
);
let mut bufs = Vec::new();
for _ in 0..5 {
bufs.push(pool.try_alloc(page.get()).expect("alloc should succeed"));
}
assert!(pool.try_alloc(page.get()).is_err());
}
#[test]
fn test_config_for_network() {
let config = BufferPoolConfig::for_network();
config.validate();
assert_eq!(config.pool_min_size, 1024);
assert_eq!(config.min_size.get(), 1024);
assert_eq!(config.max_size.get(), 64 * 1024);
assert_eq!(config.max_per_class.get(), 4096);
assert_eq!(
config.thread_cache_config,
BufferPoolThreadCacheConfig::Disabled
);
assert!(!config.prefill);
assert_eq!(config.alignment.get(), cache_line_size());
}
#[test]
fn test_config_for_storage() {
let config = BufferPoolConfig::for_storage();
config.validate();
assert_eq!(config.pool_min_size, 1024);
assert_eq!(config.min_size.get(), page_size());
assert_eq!(config.max_size.get(), 8 * 1024 * 1024);
assert_eq!(config.max_per_class.get(), 64);
assert_eq!(
config.thread_cache_config,
BufferPoolThreadCacheConfig::Disabled
);
assert!(!config.prefill);
assert_eq!(config.alignment.get(), page_size());
}
#[test]
fn test_storage_config_supports_default_allocations() {
let mut registry = test_registry();
let pool = BufferPool::new(BufferPoolConfig::for_storage(), &mut registry);
let buf = pool.try_alloc(8 * 1024 * 1024).unwrap();
assert_eq!(buf.capacity(), 8 * 1024 * 1024);
}
#[test]
fn test_config_builders() {
let page = NZUsize!(page_size());
let config = BufferPoolConfig::for_storage()
.with_pool_min_size(1024)
.with_max_per_class(NZUsize!(64))
.with_thread_cache_capacity(NZUsize!(8))
.with_prefill(true)
.with_min_size(page)
.with_max_size(NZUsize!(128 * 1024));
config.validate();
assert_eq!(config.pool_min_size, 1024);
assert_eq!(config.min_size, page);
assert_eq!(config.max_size.get(), 128 * 1024);
assert_eq!(config.max_per_class.get(), 64);
assert_eq!(
config.thread_cache_config,
BufferPoolThreadCacheConfig::Fixed(NZUsize!(8))
);
assert!(config.prefill);
assert_eq!(config.alignment.get(), page_size());
let aligned = BufferPoolConfig::for_network()
.with_pool_min_size(256)
.with_thread_cache_for_parallelism(NZUsize!(4))
.with_alignment(NZUsize!(256))
.with_min_size(NZUsize!(256));
aligned.validate();
assert_eq!(
aligned.thread_cache_config,
BufferPoolThreadCacheConfig::ForParallelism(NZUsize!(4))
);
assert_eq!(aligned.alignment.get(), 256);
assert_eq!(aligned.min_size.get(), 256);
}
#[test]
fn test_parallelism_policy_resolves_thread_cache_capacity() {
let page = page_size();
let mut registry = test_registry();
let pool = BufferPool::new(
test_config(page, page, 64).with_thread_cache_for_parallelism(NZUsize!(8)),
&mut registry,
);
let class_index = pool.inner.config.class_index(page).unwrap();
assert_eq!(pool.inner.classes[class_index].thread_cache_capacity, 4);
}
#[test]
fn test_fixed_thread_cache_capacity_overrides_runtime_parallelism() {
let page = page_size();
let mut registry = test_registry();
let pool = BufferPool::new(
test_config(page, page, 64).with_thread_cache_capacity(NZUsize!(7)),
&mut registry,
);
let class_index = pool.inner.config.class_index(page).unwrap();
assert_eq!(pool.inner.classes[class_index].thread_cache_capacity, 7);
}
#[test]
fn test_disabled_thread_cache_does_not_retain_buffers_locally() {
let page = page_size();
let mut registry = test_registry();
let pool = BufferPool::new(
test_config(page, page, 2).with_thread_cache_disabled(),
&mut registry,
);
let class_index = pool.inner.config.class_index(page).unwrap();
let class = &pool.inner.classes[class_index];
let tracked = pool.try_alloc(page).expect("tracked allocation");
drop(tracked);
assert_eq!(class.thread_cache_capacity, 0);
assert_eq!(get_local_len(class), 0);
assert_eq!(class.global.len(), 1);
}
#[test]
fn test_thread_cache_flush_moves_local_entries_to_global() {
let page = page_size();
let mut registry = test_registry();
let pool = BufferPool::new(
test_config(page, page * 2, 8).with_thread_cache_capacity(NZUsize!(4)),
&mut registry,
);
let small_index = pool.inner.config.class_index(page).unwrap();
let large_index = pool.inner.config.class_index(page + 1).unwrap();
let small_class = &pool.inner.classes[small_index];
let large_class = &pool.inner.classes[large_index];
let small = pool.try_alloc(page).expect("tracked allocation");
let large = pool.try_alloc(page + 1).expect("tracked allocation");
drop(small);
drop(large);
assert_eq!(get_local_len(small_class), 1);
assert_eq!(get_local_len(large_class), 1);
assert_eq!(small_class.global.len(), 0);
assert_eq!(large_class.global.len(), 0);
BufferPoolThreadCache::flush();
assert_eq!(get_local_len(small_class), 0);
assert_eq!(get_local_len(large_class), 0);
assert_eq!(small_class.global.len(), 1);
assert_eq!(large_class.global.len(), 1);
}
#[test]
fn test_config_with_budget_bytes() {
let config = BufferPoolConfig {
pool_min_size: 0,
min_size: NZUsize!(4),
max_size: NZUsize!(16),
max_per_class: NZUsize!(1),
thread_cache_config: BufferPoolThreadCacheConfig::ForParallelism(NZUsize!(1)),
prefill: false,
alignment: NZUsize!(4),
}
.with_budget_bytes(NZUsize!(280));
assert_eq!(config.max_per_class.get(), 10);
let small_budget = BufferPoolConfig {
pool_min_size: 0,
min_size: NZUsize!(4),
max_size: NZUsize!(16),
max_per_class: NZUsize!(1),
thread_cache_config: BufferPoolThreadCacheConfig::ForParallelism(NZUsize!(1)),
prefill: false,
alignment: NZUsize!(4),
}
.with_budget_bytes(NZUsize!(10));
assert_eq!(small_budget.max_per_class.get(), 1);
}
#[test]
fn test_pool_error_display() {
assert_eq!(
PoolError::Oversized.to_string(),
"requested capacity exceeds maximum buffer size"
);
assert_eq!(
PoolError::Exhausted.to_string(),
"pool exhausted for required size class"
);
}
#[test]
fn test_config_invalid_range_edge_paths() {
let invalid_order = BufferPoolConfig {
pool_min_size: 0,
min_size: NZUsize!(8),
max_size: NZUsize!(4),
max_per_class: NZUsize!(1),
thread_cache_config: BufferPoolThreadCacheConfig::ForParallelism(NZUsize!(1)),
prefill: false,
alignment: NZUsize!(4),
};
assert_eq!(invalid_order.num_classes(), 0);
let unchanged = invalid_order.clone().with_budget_bytes(NZUsize!(128));
assert_eq!(unchanged.max_per_class, invalid_order.max_per_class);
let non_power_two_max = BufferPoolConfig {
pool_min_size: 0,
min_size: NZUsize!(8),
max_size: NZUsize!(12),
max_per_class: NZUsize!(1),
thread_cache_config: BufferPoolThreadCacheConfig::ForParallelism(NZUsize!(1)),
prefill: false,
alignment: NZUsize!(4),
};
assert_eq!(non_power_two_max.class_index(12), None);
}
#[test]
fn test_pool_debug_and_config_accessor() {
let page = page_size();
let mut registry = test_registry();
let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
let debug = format!("{pool:?}");
assert!(debug.contains("BufferPool"));
assert!(debug.contains("num_classes"));
assert_eq!(pool.config().min_size.get(), page);
}
#[test]
fn test_return_buffer_local_overflow_spills_to_global() {
let page = page_size();
let mut registry = test_registry();
let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
let class_index = pool
.inner
.config
.class_index(page)
.expect("class exists for page-sized buffer");
let tracked1 = pool.try_alloc(page).expect("first tracked allocation");
let tracked2 = pool.try_alloc(page).expect("second tracked allocation");
drop(tracked1);
assert_eq!(pool.inner.classes[class_index].global.len(), 0);
assert_eq!(get_local_len(&pool.inner.classes[class_index]), 1);
drop(tracked2);
assert_eq!(pool.inner.classes[class_index].global.len(), 1);
assert_eq!(get_local_len(&pool.inner.classes[class_index]), 1);
assert_eq!(get_available(&pool, page), 2);
}
#[test]
fn test_small_local_cache_overflow_preserves_locality() {
let page = page_size();
let mut registry = test_registry();
let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
let mut tracked1 = pool.try_alloc(page).expect("first tracked allocation");
let ptr1 = tracked1.as_mut_ptr();
let mut tracked2 = pool.try_alloc(page).expect("second tracked allocation");
let ptr2 = tracked2.as_mut_ptr();
drop(tracked1);
drop(tracked2);
let mut reused_local = pool.try_alloc(page).expect("reuse from local cache");
assert_eq!(reused_local.as_mut_ptr(), ptr1);
let mut reused_global = pool.try_alloc(page).expect("reuse from global freelist");
assert_eq!(reused_global.as_mut_ptr(), ptr2);
}
#[test]
fn test_large_local_cache_batches_overflow_and_refill() {
let page = page_size();
let mut registry = test_registry();
let threads = std::thread::available_parallelism().map_or(1, NonZeroUsize::get);
let max_per_class = threads * 8;
let pool = BufferPool::new(test_config(page, page, max_per_class), &mut registry);
let class_index = pool
.inner
.config
.class_index(page)
.expect("class exists for page-sized buffer");
let class = &pool.inner.classes[class_index];
assert!(class.thread_cache_capacity >= MIN_THREAD_CACHE_BATCHING_CAPACITY);
let mut bufs = Vec::new();
for _ in 0..class.thread_cache_capacity + 1 {
bufs.push(pool.try_alloc(page).expect("tracked allocation"));
}
for buf in bufs {
drop(buf);
}
assert_eq!(get_local_len(class), class.thread_cache_capacity / 2 + 1);
assert_eq!(class.global.len(), class.thread_cache_capacity / 2);
let mut reused = Vec::new();
for _ in 0..class.thread_cache_capacity / 2 + 1 {
reused.push(pool.try_alloc(page).expect("local reuse"));
}
assert_eq!(get_local_len(class), 0);
assert_eq!(class.global.len(), class.thread_cache_capacity / 2);
let _global = pool.try_alloc(page).expect("global reuse with refill");
assert_eq!(get_local_len(class), class.thread_cache_capacity / 2 - 1);
assert_eq!(class.global.len(), 0);
}
#[test]
fn test_tls_refill_stops_when_global_runs_empty() {
let class = test_size_class(64, 64);
class.push_global(AlignedBuffer::new(class.size, class.alignment));
BufferPoolThreadCache::refill(&class, MIN_THREAD_CACHE_BATCHING_CAPACITY);
assert_eq!(get_local_len(&class), 1);
assert_eq!(class.global.len(), 0);
}
#[test]
fn test_tls_size_class_cache_push_tolerates_empty_spill() {
let class = test_size_class(64, 64);
let mut cache = TlsSizeClassCache {
entries: Vec::new(),
capacity: 0,
};
cache.push(TlsSizeClassCacheEntry {
buffer: AlignedBuffer::new(class.size, class.alignment),
class,
});
drop(cache);
}
#[test]
#[should_panic(expected = "tracked buffer should always fit in the global pool")]
fn test_push_global_panics_when_global_queue_is_inconsistently_full() {
let class = Arc::new(SizeClass::new(
NEXT_SIZE_CLASS_ID.fetch_add(1, Ordering::Relaxed),
64,
64,
1,
1,
false,
));
class.push_global(AlignedBuffer::new(64, 64));
class.push_global(AlignedBuffer::new(64, 64));
}
#[test]
fn test_pooled_debug_and_empty_into_bytes_paths() {
let page = page_size();
let class = test_size_class(page, page);
let pooled_mut_debug = {
let pooled_mut = PooledBufMut::new(AlignedBuffer::new(page, page), Arc::clone(&class));
format!("{pooled_mut:?}")
};
assert!(pooled_mut_debug.contains("PooledBufMut"));
assert!(pooled_mut_debug.contains("cursor"));
let empty_from_mut = PooledBufMut::new(AlignedBuffer::new(page, page), Arc::clone(&class));
assert!(empty_from_mut.into_bytes().is_empty());
let pooled = PooledBufMut::new(AlignedBuffer::new(page, page), class).into_pooled();
let pooled_debug = format!("{pooled:?}");
assert!(pooled_debug.contains("PooledBuf"));
assert!(pooled_debug.contains("capacity"));
assert!(pooled.into_bytes().is_empty());
}
#[test]
fn test_freeze_returns_buffer_to_pool() {
let page = page_size();
let mut registry = test_registry();
let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
assert_eq!(get_allocated(&pool, page), 0);
assert_eq!(get_available(&pool, page), 0);
let buf = pool.try_alloc(page).unwrap();
assert_eq!(get_allocated(&pool, page), 1);
assert_eq!(get_available(&pool, page), 0);
let iobuf = buf.freeze();
assert_eq!(get_allocated(&pool, page), 1);
drop(iobuf);
assert_eq!(get_allocated(&pool, page), 0);
assert_eq!(get_available(&pool, page), 1);
}
#[test]
fn test_refcount_and_copy_to_bytes_paths() {
let page = page_size();
let mut registry = test_registry();
let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
{
let mut buf = pool.try_alloc(page).unwrap();
buf.put_slice(&[0xAA; 100]);
let iobuf = buf.freeze();
let clone = iobuf.clone();
let slice = iobuf.slice(10..40);
let empty = iobuf.slice(10..10);
assert!(empty.is_empty());
drop(iobuf);
assert_eq!(get_allocated(&pool, page), 1);
drop(slice);
assert_eq!(get_allocated(&pool, page), 1);
drop(clone);
assert_eq!(get_allocated(&pool, page), 0);
}
{
let mut buf = pool.try_alloc(page).unwrap();
buf.put_slice(&[0x42; 100]);
let mut iobuf = buf.freeze();
let zero = iobuf.copy_to_bytes(0);
assert!(zero.is_empty());
assert_eq!(iobuf.remaining(), 100);
let partial = iobuf.copy_to_bytes(30);
assert_eq!(&partial[..], &[0x42; 30]);
assert_eq!(iobuf.remaining(), 70);
let rest = iobuf.copy_to_bytes(70);
assert_eq!(&rest[..], &[0x42; 70]);
assert_eq!(iobuf.remaining(), 0);
let empty = iobuf.copy_to_bytes(0);
assert!(empty.is_empty());
drop(iobuf);
assert_eq!(get_allocated(&pool, page), 1);
drop(zero);
drop(partial);
assert_eq!(get_allocated(&pool, page), 1);
drop(rest);
assert_eq!(get_allocated(&pool, page), 0);
}
{
let buf = pool.try_alloc(page).unwrap();
let mut iobufmut = buf;
iobufmut.put_slice(&[0x7E; 100]);
let zero = iobufmut.copy_to_bytes(0);
assert!(zero.is_empty());
assert_eq!(iobufmut.remaining(), 100);
let partial = iobufmut.copy_to_bytes(30);
assert_eq!(&partial[..], &[0x7E; 30]);
assert_eq!(iobufmut.remaining(), 70);
let rest = iobufmut.copy_to_bytes(70);
assert_eq!(&rest[..], &[0x7E; 70]);
assert_eq!(iobufmut.remaining(), 0);
drop(iobufmut);
assert_eq!(get_allocated(&pool, page), 1);
drop(zero);
drop(partial);
assert_eq!(get_allocated(&pool, page), 1);
drop(rest);
assert_eq!(get_allocated(&pool, page), 0);
}
}
#[test]
fn test_iobuf_to_iobufmut_conversion_reuses_pool_for_non_full_unique_view() {
let page = page_size();
let mut registry = test_registry();
let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
let buf = pool.try_alloc(page).unwrap();
assert_eq!(get_allocated(&pool, page), 1);
let iobuf = buf.freeze();
assert_eq!(get_allocated(&pool, page), 1);
let iobufmut: IoBufMut = iobuf.into();
assert_eq!(
get_allocated(&pool, page),
1,
"pooled buffer should remain allocated after zero-copy IoBuf->IoBufMut conversion"
);
assert_eq!(get_available(&pool, page), 0);
drop(iobufmut);
assert_eq!(get_allocated(&pool, page), 0);
assert_eq!(get_available(&pool, page), 1);
}
#[test]
fn test_iobuf_to_iobufmut_conversion_preserves_full_unique_view() {
let page = page_size();
let mut registry = test_registry();
let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
let mut buf = pool.try_alloc(page).unwrap();
buf.put_slice(&vec![0xEE; page]);
let iobuf = buf.freeze();
let iobufmut: IoBufMut = iobuf.into();
assert_eq!(iobufmut.len(), page);
assert!(iobufmut.as_ref().iter().all(|&b| b == 0xEE));
assert_eq!(get_allocated(&pool, page), 1);
assert_eq!(get_available(&pool, page), 0);
drop(iobufmut);
assert_eq!(get_allocated(&pool, page), 0);
assert_eq!(get_available(&pool, page), 1);
}
#[test]
fn test_iobuf_try_into_mut_recycles_full_unique_view() {
let page = page_size();
let mut registry = test_registry();
let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
let mut buf = pool.try_alloc(page).unwrap();
buf.put_slice(&vec![0xAB; page]);
let iobuf = buf.freeze();
assert_eq!(get_allocated(&pool, page), 1);
let recycled = iobuf
.try_into_mut()
.expect("unique full-view pooled buffer should recycle");
assert_eq!(recycled.len(), page);
assert!(recycled.as_ref().iter().all(|&b| b == 0xAB));
assert_eq!(recycled.capacity(), page);
assert_eq!(get_allocated(&pool, page), 1);
drop(recycled);
assert_eq!(get_allocated(&pool, page), 0);
assert_eq!(get_available(&pool, page), 1);
}
#[test]
fn test_iobuf_try_into_mut_succeeds_for_unique_slice_and_fails_for_shared() {
let page = page_size();
let mut registry = test_registry();
let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
let mut buf = pool.try_alloc(page).unwrap();
buf.put_slice(&vec![0xCD; page]);
let iobuf = buf.freeze();
let sliced = iobuf.slice(1..page);
drop(iobuf);
let recycled = sliced
.try_into_mut()
.expect("unique sliced pooled buffer should recycle");
assert_eq!(recycled.len(), page - 1);
assert!(recycled.as_ref().iter().all(|&b| b == 0xCD));
assert_eq!(recycled.capacity(), page - 1);
assert_eq!(get_allocated(&pool, page), 1);
drop(recycled);
assert_eq!(get_allocated(&pool, page), 0);
assert_eq!(get_available(&pool, page), 1);
let mut buf = pool.try_alloc(page).unwrap();
buf.put_slice(&vec![0xEF; page]);
let iobuf = buf.freeze();
let cloned = iobuf.clone();
let iobuf = iobuf
.try_into_mut()
.expect_err("shared pooled buffer must not convert to mutable");
drop(cloned);
drop(iobuf);
assert_eq!(get_allocated(&pool, page), 0);
assert!(get_available(&pool, page) >= 1);
}
#[test]
fn test_multithreaded_alloc_freeze_return() {
let page = page_size();
let mut registry = test_registry();
let pool = Arc::new(BufferPool::new(test_config(page, page, 100), &mut registry));
let mut handles = vec![];
cfg_if::cfg_if! {
if #[cfg(miri)] {
let iterations = 100;
} else {
let iterations = 1000;
}
}
for _ in 0..10 {
let pool = pool.clone();
let handle = thread::spawn(move || {
for _ in 0..iterations {
let buf = pool.try_alloc(page).unwrap();
let iobuf = buf.freeze();
let clones: Vec<_> = (0..5).map(|_| iobuf.clone()).collect();
drop(iobuf);
for clone in clones {
drop(clone);
}
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
let _buf = pool
.try_alloc(page)
.expect("pool should remain usable after multithreaded test");
}
#[test]
fn test_cross_thread_buffer_return() {
let page = page_size();
let mut registry = test_registry();
let pool = BufferPool::new(test_config(page, page, 100), &mut registry);
let (tx, rx) = mpsc::channel();
for _ in 0..50 {
let buf = pool.try_alloc(page).unwrap();
let iobuf = buf.freeze();
tx.send(iobuf).unwrap();
}
drop(tx);
let handle = thread::spawn(move || {
while let Ok(iobuf) = rx.recv() {
drop(iobuf);
}
let class_index = pool
.inner
.config
.class_index(page)
.expect("class exists for page-sized buffer");
assert!(
get_local_len(&pool.inner.classes[class_index]) >= 1,
"dropping thread should retain returned buffers in its local cache"
);
for _ in 0..50 {
let _buf = pool
.try_alloc(page)
.expect("dropping thread should be able to reuse returned buffers");
}
});
handle.join().unwrap();
}
#[test]
fn test_thread_exit_flushes_local_bin() {
let page = page_size();
let mut registry = test_registry();
let pool = Arc::new(BufferPool::new(test_config(page, page, 1), &mut registry));
let worker_pool = pool.clone();
thread::spawn(move || {
let buf = worker_pool
.try_alloc(page)
.expect("worker should allocate tracked buffer");
drop(buf);
})
.join()
.expect("worker thread should exit cleanly");
let class_index = pool
.inner
.config
.class_index(page)
.expect("class exists for page-sized buffer");
assert_eq!(pool.inner.classes[class_index].global.len(), 1);
assert_eq!(get_local_len(&pool.inner.classes[class_index]), 0);
let _buf = pool
.try_alloc(page)
.expect("thread-exited local buffer should be reusable");
}
#[test]
fn test_pool_drop_drains_global_freelist() {
let page = page_size();
let mut registry = test_registry();
let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
let class_index = pool
.inner
.config
.class_index(page)
.expect("class exists for page-sized buffer");
let class = Arc::clone(&pool.inner.classes[class_index]);
let buf1 = pool.try_alloc(page).unwrap();
let buf2 = pool.try_alloc(page).unwrap();
drop(buf1);
drop(buf2);
assert_eq!(class.global.len(), 1);
assert_eq!(get_local_len(&class), 1);
drop(pool);
assert_eq!(class.global.len(), 0);
assert_eq!(get_local_len(&class), 1);
assert_eq!(class.created.load(Ordering::Relaxed), 1);
}
#[test]
fn test_pool_dropped_before_buffer() {
let page = page_size();
let mut registry = test_registry();
let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
let mut buf = pool.try_alloc(page).unwrap();
buf.put_slice(&[0u8; 100]);
let iobuf = buf.freeze();
drop(pool);
assert_eq!(iobuf.len(), 100);
drop(iobuf);
}
#[test]
fn test_pool_exhaustion_and_recovery() {
let page = page_size();
let mut registry = test_registry();
let pool = BufferPool::new(test_config(page, page, 3), &mut registry);
let buf1 = pool.try_alloc(page).expect("first alloc");
let buf2 = pool.try_alloc(page).expect("second alloc");
let buf3 = pool.try_alloc(page).expect("third alloc");
assert!(pool.try_alloc(page).is_err(), "pool should be exhausted");
drop(buf1);
let buf4 = pool.try_alloc(page).expect("alloc after return");
assert!(pool.try_alloc(page).is_err(), "pool exhausted again");
drop(buf2);
drop(buf3);
drop(buf4);
assert_eq!(get_allocated(&pool, page), 0);
assert_eq!(get_available(&pool, page), 3);
let _buf5 = pool.try_alloc(page).expect("reuse from freelist");
assert_eq!(get_available(&pool, page), 2);
}
#[test]
fn test_try_alloc_errors() {
let page = page_size();
let mut registry = test_registry();
let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
let result = pool.try_alloc(page * 10);
assert_eq!(result.unwrap_err(), PoolError::Oversized);
let _buf1 = pool.try_alloc(page).unwrap();
let _buf2 = pool.try_alloc(page).unwrap();
let result = pool.try_alloc(page);
assert_eq!(result.unwrap_err(), PoolError::Exhausted);
}
#[test]
fn test_try_alloc_zeroed_errors() {
let page = page_size();
let mut registry = test_registry();
let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
let result = pool.try_alloc_zeroed(page * 10);
assert_eq!(result.unwrap_err(), PoolError::Oversized);
let _buf1 = pool.try_alloc_zeroed(page).unwrap();
let _buf2 = pool.try_alloc_zeroed(page).unwrap();
let result = pool.try_alloc_zeroed(page);
assert_eq!(result.unwrap_err(), PoolError::Exhausted);
}
#[test]
fn test_fallback_allocation() {
let page = page_size();
let mut registry = test_registry();
let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
let buf1 = pool.try_alloc(page).unwrap();
let buf2 = pool.try_alloc(page).unwrap();
assert!(buf1.is_pooled());
assert!(buf2.is_pooled());
let mut fallback_exhausted = pool.alloc(page);
assert!(!fallback_exhausted.is_pooled());
assert!((fallback_exhausted.as_mut_ptr() as usize).is_multiple_of(page));
let mut fallback_oversized = pool.alloc(page * 10);
assert!(!fallback_oversized.is_pooled());
assert!((fallback_oversized.as_mut_ptr() as usize).is_multiple_of(page));
assert_eq!(get_allocated(&pool, page), 2);
drop(fallback_exhausted);
drop(fallback_oversized);
assert_eq!(get_allocated(&pool, page), 2);
drop(buf1);
drop(buf2);
assert_eq!(get_allocated(&pool, page), 0);
}
#[test]
fn test_is_pooled() {
let page = page_size();
let mut registry = test_registry();
let pool = BufferPool::new(test_config(page, page, 10), &mut registry);
let pooled = pool.try_alloc(page).unwrap();
assert!(pooled.is_pooled());
let owned = IoBufMut::with_capacity(100);
assert!(!owned.is_pooled());
}
#[test]
fn test_iobuf_is_pooled() {
let page = page_size();
let mut registry = test_registry();
let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
let pooled = pool.try_alloc(page).unwrap().freeze();
assert!(pooled.is_pooled());
let fallback = pool.alloc(page * 10).freeze();
assert!(!fallback.is_pooled());
let bytes = IoBuf::copy_from_slice(b"hello");
assert!(!bytes.is_pooled());
}
#[test]
fn test_buffer_alignment() {
let page = page_size();
let cache_line = cache_line_size();
let mut registry = test_registry();
cfg_if::cfg_if! {
if #[cfg(miri)] {
let storage_config = BufferPoolConfig {
max_per_class: NZUsize!(32),
..BufferPoolConfig::for_storage()
};
let network_config = BufferPoolConfig {
max_per_class: NZUsize!(32),
..BufferPoolConfig::for_network()
};
} else {
let storage_config = BufferPoolConfig::for_storage();
let network_config = BufferPoolConfig::for_network();
}
}
let storage_buffer_pool = BufferPool::new(storage_config, &mut registry);
let mut buf = storage_buffer_pool.try_alloc(100).unwrap();
assert_eq!(
buf.as_mut_ptr() as usize % page,
0,
"storage buffer not page-aligned"
);
let network_buffer_pool = BufferPool::new(network_config, &mut registry);
let mut buf = network_buffer_pool.try_alloc(100).unwrap();
assert_eq!(
buf.as_mut_ptr() as usize % cache_line,
0,
"network buffer not cache-line aligned"
);
}
}