use super::{IoBuf, IoBufMut};
use bytes::{Buf, BufMut, Bytes};
use commonware_utils::NZUsize;
use crossbeam_queue::ArrayQueue;
use prometheus_client::{
encoding::EncodeLabelSet,
metrics::{counter::Counter, family::Family, gauge::Gauge},
registry::Registry,
};
use std::{
alloc::{alloc, alloc_zeroed, dealloc, handle_alloc_error, Layout},
mem::ManuallyDrop,
num::NonZeroUsize,
ops::{Bound, RangeBounds},
ptr::NonNull,
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Weak,
},
};
/// Errors returned by the fallible allocation paths (`try_alloc*`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PoolError {
    /// The requested capacity is larger than the biggest size class.
    Oversized,
    /// The size class that would serve the request has no free slots.
    Exhausted,
}

impl std::fmt::Display for PoolError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::Oversized => "requested capacity exceeds maximum buffer size",
            Self::Exhausted => "pool exhausted for required size class",
        };
        f.write_str(message)
    }
}

impl std::error::Error for PoolError {}
/// Query the OS memory page size, falling back to 4 KiB when `sysconf` fails.
#[cfg(unix)]
fn page_size() -> usize {
    // SAFETY: sysconf has no memory-safety preconditions.
    let raw = unsafe { libc::sysconf(libc::_SC_PAGESIZE) };
    if raw > 0 {
        raw as usize
    } else {
        // sysconf signals errors with a non-positive value; fall back to the
        // near-universal 4 KiB page.
        4096
    }
}
/// Fallback page size for non-Unix targets where `sysconf` is unavailable.
// Kept non-const to mirror the unix variant's signature (clippy lint allowed).
#[cfg(not(unix))]
#[allow(clippy::missing_const_for_fn)]
fn page_size() -> usize {
    4096
}
/// Size in bytes used for cache-line alignment.
///
/// Returns 128 on x86_64 and aarch64 (the conservative value commonly used to
/// avoid false sharing on these targets, e.g. by crossbeam's `CachePadded`)
/// and 64 elsewhere.
const fn cache_line_size() -> usize {
    // `cfg!` expands to a boolean literal, so this remains usable in `const`
    // contexts without needing the external `cfg_if` macro.
    if cfg!(any(target_arch = "x86_64", target_arch = "aarch64")) {
        128
    } else {
        64
    }
}
/// Tuning parameters for a buffer pool.
#[derive(Debug, Clone)]
pub struct BufferPoolConfig {
    /// Smallest size class in bytes; must be a power of two and >= `alignment`.
    pub min_size: NonZeroUsize,
    /// Largest size class in bytes; must be a power of two and >= `min_size`.
    pub max_size: NonZeroUsize,
    /// Maximum number of buffers retained per size class.
    pub max_per_class: NonZeroUsize,
    /// Whether to eagerly allocate every slot of every class at pool creation.
    pub prefill: bool,
    /// Byte alignment for every buffer; must be a power of two.
    pub alignment: NonZeroUsize,
}
impl BufferPoolConfig {
    /// Preset for network workloads: cache-line-aligned buffers from one
    /// cache line up to 64 KiB, with deep per-class freelists.
    pub const fn for_network() -> Self {
        let line = NZUsize!(cache_line_size());
        Self {
            min_size: line,
            max_size: NZUsize!(64 * 1024),
            max_per_class: NZUsize!(4096),
            prefill: false,
            alignment: line,
        }
    }

    /// Preset for storage workloads: page-aligned buffers from one page up to
    /// 8 MiB, with shallow per-class freelists.
    pub fn for_storage() -> Self {
        let page = NZUsize!(page_size());
        Self {
            min_size: page,
            max_size: NZUsize!(8 * 1024 * 1024),
            max_per_class: NZUsize!(32),
            prefill: false,
            alignment: page,
        }
    }

    /// Replace `min_size`, returning the updated config.
    pub const fn with_min_size(mut self, min_size: NonZeroUsize) -> Self {
        self.min_size = min_size;
        self
    }

    /// Replace `max_size`, returning the updated config.
    pub const fn with_max_size(mut self, max_size: NonZeroUsize) -> Self {
        self.max_size = max_size;
        self
    }

    /// Replace `max_per_class`, returning the updated config.
    pub const fn with_max_per_class(mut self, max_per_class: NonZeroUsize) -> Self {
        self.max_per_class = max_per_class;
        self
    }

    /// Replace `prefill`, returning the updated config.
    pub const fn with_prefill(mut self, prefill: bool) -> Self {
        self.prefill = prefill;
        self
    }

    /// Replace `alignment`, returning the updated config.
    pub const fn with_alignment(mut self, alignment: NonZeroUsize) -> Self {
        self.alignment = alignment;
        self
    }

    /// Derive `max_per_class` from a total byte budget: the budget is divided
    /// (rounding up) by the combined size of one buffer from every class.
    /// Left unchanged when there are no valid classes.
    pub fn with_budget_bytes(mut self, budget_bytes: NonZeroUsize) -> Self {
        let class_bytes = (0..self.num_classes())
            .map(|i| self.class_size(i))
            .fold(0usize, usize::saturating_add);
        if class_bytes > 0 {
            self.max_per_class = NZUsize!(budget_bytes.get().div_ceil(class_bytes));
        }
        self
    }

    /// Panic unless the configuration is internally consistent.
    fn validate(&self) {
        assert!(
            self.alignment.is_power_of_two(),
            "alignment must be a power of two"
        );
        assert!(
            self.min_size.is_power_of_two(),
            "min_size must be a power of two"
        );
        assert!(
            self.max_size.is_power_of_two(),
            "max_size must be a power of two"
        );
        assert!(
            self.min_size >= self.alignment,
            "min_size ({}) must be >= alignment ({})",
            self.min_size,
            self.alignment
        );
        assert!(
            self.max_size >= self.min_size,
            "max_size must be >= min_size"
        );
    }

    /// Number of power-of-two size classes between `min_size` and `max_size`
    /// inclusive; zero when the range is inverted.
    fn num_classes(&self) -> usize {
        let (min, max) = (self.min_size.get(), self.max_size.get());
        if max < min {
            return 0;
        }
        (max / min).trailing_zeros() as usize + 1
    }

    /// Map a requested byte size to the index of the smallest class that can
    /// hold it, or `None` when it exceeds the largest class.
    fn class_index(&self, size: usize) -> Option<usize> {
        let min = self.min_size.get();
        if size > self.max_size.get() {
            None
        } else if size <= min {
            Some(0)
        } else {
            let index = (size.next_power_of_two() / min).trailing_zeros() as usize;
            (index < self.num_classes()).then_some(index)
        }
    }

    /// Byte size of class `index` (`min_size << index`).
    const fn class_size(&self, index: usize) -> usize {
        self.min_size.get() << index
    }
}
/// Prometheus label distinguishing per-size-class metric series.
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
struct SizeClassLabel {
    // Class buffer size in bytes.
    size_class: u64,
}
/// Prometheus instruments tracking pool behavior, mostly per size class.
struct PoolMetrics {
    // Buffers currently checked out, per class.
    allocated: Family<SizeClassLabel, Gauge>,
    // Buffers sitting ready in freelists, per class.
    available: Family<SizeClassLabel, Gauge>,
    // Successful allocations, per class.
    allocations_total: Family<SizeClassLabel, Counter>,
    // Allocations that failed because a class had no free slots, per class.
    exhausted_total: Family<SizeClassLabel, Counter>,
    // Requests larger than the biggest size class (no label).
    oversized_total: Counter,
}
impl PoolMetrics {
    /// Create all metric families and register each one under `registry`.
    fn new(registry: &mut Registry) -> Self {
        let this = Self {
            allocated: Family::default(),
            available: Family::default(),
            allocations_total: Family::default(),
            exhausted_total: Family::default(),
            oversized_total: Counter::default(),
        };
        registry.register(
            "buffer_pool_allocated",
            "Number of buffers currently allocated from the pool",
            this.allocated.clone(),
        );
        registry.register(
            "buffer_pool_available",
            "Number of buffers available in the pool",
            this.available.clone(),
        );
        registry.register(
            "buffer_pool_allocations_total",
            "Total number of successful buffer allocations",
            this.allocations_total.clone(),
        );
        registry.register(
            "buffer_pool_exhausted_total",
            "Total number of failed allocations due to pool exhaustion",
            this.exhausted_total.clone(),
        );
        registry.register(
            "buffer_pool_oversized_total",
            "Total number of allocation requests exceeding max buffer size",
            this.oversized_total.clone(),
        );
        this
    }
}
/// A raw heap allocation with explicit size and alignment.
pub(crate) struct AlignedBuffer {
    // Start of the allocation; never null.
    ptr: NonNull<u8>,
    // Exact layout used at allocation time; required again for deallocation.
    layout: Layout,
}

// SAFETY: AlignedBuffer uniquely owns its allocation; the raw pointer is not
// shared outside the owning value, so moving it across threads is sound.
unsafe impl Send for AlignedBuffer {}
// SAFETY: the type exposes no interior mutability through `&self`.
unsafe impl Sync for AlignedBuffer {}

impl AlignedBuffer {
    /// Shared allocation path for [`Self::new`] and [`Self::new_zeroed`].
    ///
    /// Panics on zero capacity or an invalid size/alignment combination, and
    /// aborts via `handle_alloc_error` when the allocator fails.
    fn with_allocator(
        capacity: usize,
        alignment: usize,
        allocate: unsafe fn(Layout) -> *mut u8,
    ) -> Self {
        assert!(capacity > 0, "capacity must be greater than zero");
        let layout = Layout::from_size_align(capacity, alignment).expect("invalid layout");
        // SAFETY: `layout` has a non-zero size (asserted above).
        let ptr = unsafe { allocate(layout) };
        let ptr = NonNull::new(ptr).unwrap_or_else(|| handle_alloc_error(layout));
        Self { ptr, layout }
    }

    /// Allocate `capacity` uninitialized bytes at `alignment`.
    fn new(capacity: usize, alignment: usize) -> Self {
        Self::with_allocator(capacity, alignment, alloc)
    }

    /// Allocate `capacity` zero-initialized bytes at `alignment`.
    fn new_zeroed(capacity: usize, alignment: usize) -> Self {
        Self::with_allocator(capacity, alignment, alloc_zeroed)
    }

    /// Total byte capacity of the allocation.
    #[inline]
    const fn capacity(&self) -> usize {
        self.layout.size()
    }

    /// Raw pointer to the first byte.
    #[inline]
    const fn as_ptr(&self) -> *mut u8 {
        self.ptr.as_ptr()
    }
}

impl Drop for AlignedBuffer {
    fn drop(&mut self) {
        // SAFETY: `ptr` was allocated with exactly `layout` and is freed once.
        unsafe { dealloc(self.ptr.as_ptr(), self.layout) };
    }
}
/// One power-of-two size class: a lock-free freelist of reusable buffers.
struct SizeClass {
    // Buffer capacity in bytes for this class.
    size: usize,
    // Alignment used when (lazily) allocating buffers.
    alignment: usize,
    // Fixed-capacity queue: `Some` holds a recycled buffer, `None` marks a
    // reserved slot whose buffer has not been allocated yet.
    freelist: ArrayQueue<Option<AlignedBuffer>>,
    // Number of buffers currently checked out of this class.
    allocated: AtomicUsize,
}
impl SizeClass {
    /// Build a size class whose freelist has exactly `max_buffers` slots.
    /// With `prefill` every slot is backed by a real allocation up front;
    /// otherwise slots start as empty placeholders filled on first use.
    fn new(size: usize, alignment: usize, max_buffers: usize, prefill: bool) -> Self {
        let freelist = ArrayQueue::new(max_buffers);
        for _ in 0..max_buffers {
            // An occupied slot when prefilling, an empty placeholder otherwise.
            let slot = prefill.then(|| AlignedBuffer::new(size, alignment));
            let _ = freelist.push(slot);
        }
        Self {
            size,
            alignment,
            freelist,
            allocated: AtomicUsize::new(0),
        }
    }
}
/// Result of a successful size-class allocation.
struct Allocation {
    buffer: AlignedBuffer,
    // True when freshly allocated (vs. recycled); a recycled buffer may still
    // contain data from its previous use.
    is_new: bool,
}
/// Shared state behind every cloned `BufferPool` handle.
pub(crate) struct BufferPoolInner {
    config: BufferPoolConfig,
    // One entry per power-of-two size class, smallest first.
    classes: Vec<SizeClass>,
    metrics: PoolMetrics,
}
impl BufferPoolInner {
    /// Pop a buffer from size class `class_index`, lazily allocating one when
    /// the reserved slot is still empty. Returns `None` when the class has no
    /// free slots. With `zero_on_new`, a freshly allocated buffer is zeroed;
    /// a recycled buffer is returned as-is (possibly dirty).
    fn try_alloc(&self, class_index: usize, zero_on_new: bool) -> Option<Allocation> {
        let class = &self.classes[class_index];
        let label = SizeClassLabel {
            size_class: class.size as u64,
        };
        let Some(slot) = class.freelist.pop() else {
            // Every slot is checked out: record the failure and bail.
            self.metrics.exhausted_total.get_or_create(&label).inc();
            return None;
        };
        class.allocated.fetch_add(1, Ordering::Relaxed);
        self.metrics.allocations_total.get_or_create(&label).inc();
        self.metrics.allocated.get_or_create(&label).inc();
        let allocation = match slot {
            // A previously returned buffer: reuse it directly.
            Some(buffer) => {
                self.metrics.available.get_or_create(&label).dec();
                Allocation {
                    buffer,
                    is_new: false,
                }
            }
            // An empty placeholder slot: allocate the backing memory now.
            None => {
                let buffer = if zero_on_new {
                    AlignedBuffer::new_zeroed(class.size, class.alignment)
                } else {
                    AlignedBuffer::new(class.size, class.alignment)
                };
                Allocation {
                    buffer,
                    is_new: true,
                }
            }
        };
        Some(allocation)
    }

    /// Give `buffer` back to its size class. Buffers whose capacity maps to
    /// no class, or that arrive when the freelist is already full, are
    /// simply dropped (freed).
    fn return_buffer(&self, buffer: AlignedBuffer) {
        let Some(class_index) = self.config.class_index(buffer.capacity()) else {
            return;
        };
        let class = &self.classes[class_index];
        let label = SizeClassLabel {
            size_class: class.size as u64,
        };
        class.allocated.fetch_sub(1, Ordering::Relaxed);
        self.metrics.allocated.get_or_create(&label).dec();
        if class.freelist.push(Some(buffer)).is_ok() {
            self.metrics.available.get_or_create(&label).inc();
        }
    }
}
/// Cloneable handle to a size-classed, metrics-instrumented buffer pool.
#[derive(Clone)]
pub struct BufferPool {
    inner: Arc<BufferPoolInner>,
}
impl std::fmt::Debug for BufferPool {
    /// Summarize the pool by its config and class count (buffers omitted).
    fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut dbg = fmt.debug_struct("BufferPool");
        dbg.field("config", &self.inner.config);
        dbg.field("num_classes", &self.inner.classes.len());
        dbg.finish()
    }
}
impl BufferPool {
    /// Build a pool from `config` (panicking if it is inconsistent) and
    /// register its metrics in `registry`.
    pub(crate) fn new(config: BufferPoolConfig, registry: &mut Registry) -> Self {
        config.validate();
        let metrics = PoolMetrics::new(registry);
        let mut classes = Vec::with_capacity(config.num_classes());
        for i in 0..config.num_classes() {
            let size = config.class_size(i);
            let class = SizeClass::new(
                size,
                config.alignment.get(),
                config.max_per_class.get(),
                config.prefill,
            );
            classes.push(class);
        }
        if config.prefill {
            // Freelists start full of real buffers; seed the gauges to match.
            for class in &classes {
                let label = SizeClassLabel {
                    size_class: class.size as u64,
                };
                let available = class.freelist.len() as i64;
                metrics.available.get_or_create(&label).set(available);
            }
        }
        Self {
            inner: Arc::new(BufferPoolInner {
                config,
                classes,
                metrics,
            }),
        }
    }
    /// Map `capacity` to its size class, bumping the oversized counter when
    /// it exceeds the largest class.
    fn class_index_or_record_oversized(&self, capacity: usize) -> Option<usize> {
        let class_index = self.inner.config.class_index(capacity);
        if class_index.is_none() {
            self.inner.metrics.oversized_total.inc();
        }
        class_index
    }
    /// Allocate an empty pooled buffer with at least `capacity` bytes.
    ///
    /// # Errors
    /// `Oversized` when `capacity` exceeds the largest class; `Exhausted`
    /// when the class has no free slots.
    pub fn try_alloc(&self, capacity: usize) -> Result<IoBufMut, PoolError> {
        let class_index = self
            .class_index_or_record_oversized(capacity)
            .ok_or(PoolError::Oversized)?;
        let buffer = self
            .inner
            .try_alloc(class_index, false)
            .map(|allocation| allocation.buffer)
            .ok_or(PoolError::Exhausted)?;
        let pooled = PooledBufMut::new(buffer, Arc::downgrade(&self.inner));
        Ok(IoBufMut::from_pooled(pooled))
    }
    /// Infallible allocation: falls back to an untracked buffer (never
    /// returned to the pool; `Weak::new()` can't upgrade) when the pool
    /// cannot serve the request.
    pub fn alloc(&self, capacity: usize) -> IoBufMut {
        self.try_alloc(capacity).unwrap_or_else(|_| {
            let size = capacity.max(self.inner.config.min_size.get());
            let buffer = AlignedBuffer::new(size, self.inner.config.alignment.get());
            IoBufMut::from_pooled(PooledBufMut::new(buffer, Weak::new()))
        })
    }
    /// Allocate a buffer and mark `len` bytes as initialized without
    /// writing them.
    ///
    /// # Safety
    /// The first `len` bytes are uninitialized; the caller must write them
    /// before reading.
    pub unsafe fn alloc_len(&self, len: usize) -> IoBufMut {
        let mut buf = self.alloc(len);
        unsafe { buf.set_len(len) };
        buf
    }
    /// Allocate `len` zeroed bytes from the pool. Fresh allocations use
    /// zeroed memory directly; recycled (possibly dirty) buffers are zeroed
    /// explicitly.
    ///
    /// # Errors
    /// Same conditions as [`Self::try_alloc`].
    pub fn try_alloc_zeroed(&self, len: usize) -> Result<IoBufMut, PoolError> {
        let class_index = self
            .class_index_or_record_oversized(len)
            .ok_or(PoolError::Oversized)?;
        let allocation = self
            .inner
            .try_alloc(class_index, true)
            .ok_or(PoolError::Exhausted)?;
        let mut buf = IoBufMut::from_pooled(PooledBufMut::new(
            allocation.buffer,
            Arc::downgrade(&self.inner),
        ));
        if allocation.is_new {
            // SAFETY: memory came from alloc_zeroed, so it is initialized.
            unsafe { buf.set_len(len) };
        } else {
            buf.put_bytes(0, len);
        }
        Ok(buf)
    }
    /// Infallible zeroed allocation with an untracked fallback buffer.
    pub fn alloc_zeroed(&self, len: usize) -> IoBufMut {
        self.try_alloc_zeroed(len).unwrap_or_else(|_| {
            let size = len.max(self.inner.config.min_size.get());
            let buffer = AlignedBuffer::new_zeroed(size, self.inner.config.alignment.get());
            let mut buf = IoBufMut::from_pooled(PooledBufMut::new(buffer, Weak::new()));
            // SAFETY: the fallback allocation is zero-initialized.
            unsafe { buf.set_len(len) };
            buf
        })
    }
    /// The configuration this pool was built with.
    pub fn config(&self) -> &BufferPoolConfig {
        &self.inner.config
    }
}
/// Owns a pooled allocation and knows which pool to return it to on drop.
struct PooledBufInner {
    // ManuallyDrop so Drop can move the buffer out and hand it to the pool.
    buffer: ManuallyDrop<AlignedBuffer>,
    // Weak so outstanding buffers do not keep a dropped pool alive.
    pool: Weak<BufferPoolInner>,
}
impl PooledBufInner {
    /// Wrap `buffer`, remembering (weakly) which pool it should return to.
    const fn new(buffer: AlignedBuffer, pool: Weak<BufferPoolInner>) -> Self {
        let buffer = ManuallyDrop::new(buffer);
        Self { buffer, pool }
    }

    /// Total byte capacity of the underlying allocation.
    #[inline]
    fn capacity(&self) -> usize {
        self.buffer.capacity()
    }
}
impl Drop for PooledBufInner {
    fn drop(&mut self) {
        // SAFETY: drop runs at most once, so the buffer is taken exactly once
        // and never observed through `self.buffer` again.
        let buffer = unsafe { ManuallyDrop::take(&mut self.buffer) };
        match self.pool.upgrade() {
            // Pool still alive: recycle the allocation.
            Some(pool) => pool.return_buffer(buffer),
            // Pool gone (or buffer untracked): free via AlignedBuffer's Drop.
            None => drop(buffer),
        }
    }
}
/// Immutable, cheaply-cloneable view (`offset..offset + len`) into a shared
/// pooled allocation.
#[derive(Clone)]
pub(crate) struct PooledBuf {
    inner: Arc<PooledBufInner>,
    // Start of this view within the allocation.
    offset: usize,
    // Length of this view in bytes.
    len: usize,
}
impl std::fmt::Debug for PooledBuf {
    /// Show the view coordinates and total capacity (contents omitted).
    fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut dbg = fmt.debug_struct("PooledBuf");
        dbg.field("offset", &self.offset);
        dbg.field("len", &self.len);
        dbg.field("capacity", &self.inner.capacity());
        dbg.finish()
    }
}
impl PooledBuf {
    /// Whether the owning pool is still alive (the buffer will be recycled,
    /// not freed, when the last reference drops).
    #[inline]
    pub fn is_tracked(&self) -> bool {
        self.inner.pool.strong_count() > 0
    }
    /// Pointer to the first byte of this view.
    #[inline]
    pub fn as_ptr(&self) -> *const u8 {
        // SAFETY: offset stays within the allocation by construction
        // (slice/split_to only narrow existing in-bounds views).
        unsafe { self.inner.buffer.as_ptr().add(self.offset) }
    }
    /// Zero-copy sub-view of this view. Returns `None` for an empty range;
    /// panics when the range is inverted, overflows, or exceeds `len`.
    pub fn slice(&self, range: impl RangeBounds<usize>) -> Option<Self> {
        let start = match range.start_bound() {
            Bound::Included(&n) => n,
            Bound::Excluded(&n) => n.checked_add(1).expect("range start overflow"),
            Bound::Unbounded => 0,
        };
        let end = match range.end_bound() {
            Bound::Included(&n) => n.checked_add(1).expect("range end overflow"),
            Bound::Excluded(&n) => n,
            Bound::Unbounded => self.len,
        };
        assert!(start <= end, "slice start must be <= end");
        assert!(end <= self.len, "slice out of bounds");
        if start == end {
            return None;
        }
        Some(Self {
            inner: self.inner.clone(),
            offset: self.offset + start,
            len: end - start,
        })
    }
    /// Split off the first `at` bytes as a new view; `self` keeps the rest.
    #[inline]
    pub fn split_to(&mut self, at: usize) -> Self {
        assert!(
            at <= self.len,
            "split_to out of bounds: {:?} <= {:?}",
            at,
            self.len,
        );
        let prefix = Self {
            inner: self.inner.clone(),
            offset: self.offset,
            len: at,
        };
        self.offset += at;
        self.len -= at;
        prefix
    }
    /// Reclaim mutable ownership when this is the only reference, otherwise
    /// return `self` unchanged. On success the view is translated to
    /// `PooledBufMut` coordinates: `cursor = offset`, `len` = absolute end.
    pub fn try_into_mut(self) -> Result<PooledBufMut, Self> {
        let Self { inner, offset, len } = self;
        match Arc::try_unwrap(inner) {
            Ok(inner) => Ok(PooledBufMut {
                inner: ManuallyDrop::new(inner),
                cursor: offset,
                len: offset.checked_add(len).expect("slice end overflow"),
            }),
            Err(inner) => Err(Self { inner, offset, len }),
        }
    }
    /// Zero-copy conversion into `Bytes` (shares the pooled allocation).
    pub fn into_bytes(self) -> Bytes {
        if self.len == 0 {
            return Bytes::new();
        }
        Bytes::from_owner(self)
    }
}
impl AsRef<[u8]> for PooledBuf {
    /// View this buffer's bytes as a slice.
    #[inline]
    fn as_ref(&self) -> &[u8] {
        // SAFETY: `offset..offset + len` lies within the allocation by
        // construction, and shared views are immutable.
        unsafe { std::slice::from_raw_parts(self.inner.buffer.as_ptr().add(self.offset), self.len) }
    }
}
impl Buf for PooledBuf {
    #[inline]
    fn remaining(&self) -> usize {
        self.len
    }

    #[inline]
    fn chunk(&self) -> &[u8] {
        self.as_ref()
    }

    #[inline]
    fn advance(&mut self, cnt: usize) {
        assert!(cnt <= self.len, "cannot advance past end of buffer");
        self.offset += cnt;
        self.len -= cnt;
    }

    /// Zero-copy `copy_to_bytes`: the returned `Bytes` shares the pooled
    /// allocation rather than copying the data.
    #[inline]
    fn copy_to_bytes(&mut self, len: usize) -> Bytes {
        assert!(len <= self.len, "copy_to_bytes out of bounds");
        // `slice` yields `None` exactly when the requested length is zero.
        match self.slice(..len) {
            Some(view) => {
                self.advance(len);
                view.into_bytes()
            }
            None => Bytes::new(),
        }
    }
}
/// Uniquely-owned, writable pooled buffer. `cursor..len` is the initialized,
/// readable region; `len..capacity` is writable spare space.
pub(crate) struct PooledBufMut {
    // ManuallyDrop so the explicit Drop impl controls teardown order.
    inner: ManuallyDrop<PooledBufInner>,
    // Read position (absolute offset into the allocation).
    cursor: usize,
    // End of initialized data (absolute offset into the allocation).
    len: usize,
}
impl std::fmt::Debug for PooledBufMut {
    /// Show cursor, length, and capacity (contents omitted).
    fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut dbg = fmt.debug_struct("PooledBufMut");
        dbg.field("cursor", &self.cursor);
        dbg.field("len", &self.len);
        dbg.field("capacity", &self.capacity());
        dbg.finish()
    }
}
impl PooledBufMut {
    /// Wrap a raw buffer; the view starts empty (`cursor == len == 0`).
    const fn new(buffer: AlignedBuffer, pool: Weak<BufferPoolInner>) -> Self {
        Self {
            inner: ManuallyDrop::new(PooledBufInner::new(buffer, pool)),
            cursor: 0,
            len: 0,
        }
    }
    /// Whether the owning pool is still alive (buffer recycles on drop).
    #[inline]
    pub fn is_tracked(&self) -> bool {
        self.inner.pool.strong_count() > 0
    }
    /// Readable bytes remaining (initialized and not yet consumed).
    #[inline]
    pub const fn len(&self) -> usize {
        self.len - self.cursor
    }
    #[inline]
    pub const fn is_empty(&self) -> bool {
        self.cursor == self.len
    }
    /// Usable capacity from the current cursor to the end of the allocation.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.inner.capacity() - self.cursor
    }
    /// Total capacity of the underlying allocation (ignores the cursor).
    #[inline]
    fn raw_capacity(&self) -> usize {
        self.inner.capacity()
    }
    /// Mutable pointer to the byte at the cursor.
    #[inline]
    pub fn as_mut_ptr(&mut self) -> *mut u8 {
        // SAFETY: cursor stays within the allocation by construction.
        unsafe { self.inner.buffer.as_ptr().add(self.cursor) }
    }
    /// Mark `len` bytes past the cursor as initialized.
    ///
    /// # Safety
    /// `cursor + len` must not exceed the allocation's capacity, and the
    /// caller must not read bytes that were never written.
    #[inline]
    pub const unsafe fn set_len(&mut self, len: usize) {
        self.len = self.cursor + len;
    }
    /// Discard all readable bytes (keeps the cursor position).
    #[inline]
    pub const fn clear(&mut self) {
        self.len = self.cursor;
    }
    /// Shorten the readable region to at most `len` bytes.
    #[inline]
    pub const fn truncate(&mut self, len: usize) {
        if len < self.len() {
            self.len = self.cursor + len;
        }
    }
    /// Convert into an immutable view over the readable region, bypassing
    /// this type's Drop (the inner value moves into the Arc instead).
    fn into_pooled(self) -> PooledBuf {
        let mut me = ManuallyDrop::new(self);
        // SAFETY: `me` is never dropped, so `inner` is taken exactly once.
        let inner = unsafe { ManuallyDrop::take(&mut me.inner) };
        PooledBuf {
            inner: Arc::new(inner),
            offset: me.cursor,
            len: me.len - me.cursor,
        }
    }
    /// Freeze into an immutable, shareable `IoBuf`.
    pub fn freeze(self) -> IoBuf {
        IoBuf::from_pooled(self.into_pooled())
    }
    /// Zero-copy conversion of the readable region into `Bytes`.
    pub fn into_bytes(self) -> Bytes {
        if self.is_empty() {
            return Bytes::new();
        }
        Bytes::from_owner(self.into_pooled())
    }
}
impl AsRef<[u8]> for PooledBufMut {
    /// The initialized, not-yet-consumed region (`cursor..len`).
    #[inline]
    fn as_ref(&self) -> &[u8] {
        // SAFETY: `cursor..len` is within the allocation per the set_len
        // contract, and `&self` prevents concurrent mutation.
        unsafe {
            std::slice::from_raw_parts(self.inner.buffer.as_ptr().add(self.cursor), self.len())
        }
    }
}
impl AsMut<[u8]> for PooledBufMut {
    /// Mutable view of the initialized, not-yet-consumed region.
    #[inline]
    fn as_mut(&mut self) -> &mut [u8] {
        let len = self.len();
        // SAFETY: in-bounds range as in as_ref; `&mut self` gives exclusive
        // access, so no aliasing.
        unsafe { std::slice::from_raw_parts_mut(self.inner.buffer.as_ptr().add(self.cursor), len) }
    }
}
impl Drop for PooledBufMut {
    fn drop(&mut self) {
        // SAFETY: drop runs once and `inner` is never used afterwards; this
        // runs PooledBufInner's Drop, which returns the buffer to its pool.
        unsafe { ManuallyDrop::drop(&mut self.inner) };
    }
}
impl Buf for PooledBufMut {
    #[inline]
    fn remaining(&self) -> usize {
        // Identical to the readable length; delegate instead of re-deriving
        // `len - cursor` (keeps this impl consistent with PooledBuf's).
        self.len()
    }
    #[inline]
    fn chunk(&self) -> &[u8] {
        // The readable region is contiguous, so the chunk is all of it.
        self.as_ref()
    }
    #[inline]
    fn advance(&mut self, cnt: usize) {
        assert!(cnt <= self.len(), "cannot advance past end of buffer");
        self.cursor += cnt;
    }
}
unsafe impl BufMut for PooledBufMut {
    /// Writable space past the initialized region.
    #[inline]
    fn remaining_mut(&self) -> usize {
        self.raw_capacity() - self.len
    }
    #[inline]
    unsafe fn advance_mut(&mut self, cnt: usize) {
        assert!(
            cnt <= self.remaining_mut(),
            "cannot advance past end of buffer"
        );
        self.len += cnt;
    }
    #[inline]
    fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
        let raw_cap = self.raw_capacity();
        let len = self.len;
        // SAFETY: `len..raw_cap` is in-bounds spare capacity, and `&mut self`
        // guarantees exclusive access to it.
        unsafe {
            let ptr = self.inner.buffer.as_ptr().add(len);
            bytes::buf::UninitSlice::from_raw_parts_mut(ptr, raw_cap - len)
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
use bytes::BytesMut;
use std::{sync::mpsc, thread};
    /// Fresh metrics registry for each test.
    fn test_registry() -> Registry {
        Registry::default()
    }
    /// Build a page-aligned, non-prefilled config with the given class range.
    fn test_config(min_size: usize, max_size: usize, max_per_class: usize) -> BufferPoolConfig {
        BufferPoolConfig {
            min_size: NZUsize!(min_size),
            max_size: NZUsize!(max_size),
            max_per_class: NZUsize!(max_per_class),
            prefill: false,
            alignment: NZUsize!(page_size()),
        }
    }
    // page_size() must report a power of two of at least 4 KiB.
    #[test]
    fn test_page_size() {
        let size = page_size();
        assert!(size >= 4096);
        assert!(size.is_power_of_two());
    }
#[test]
fn test_aligned_buffer() {
let page = page_size();
let buf = AlignedBuffer::new(4096, page);
assert_eq!(buf.capacity(), 4096);
assert!((buf.as_ptr() as usize).is_multiple_of(page));
let cache_line = cache_line_size();
let buf2 = AlignedBuffer::new(4096, cache_line);
assert_eq!(buf2.capacity(), 4096);
assert!((buf2.as_ptr() as usize).is_multiple_of(cache_line));
}
#[test]
#[should_panic(expected = "capacity must be greater than zero")]
fn test_aligned_buffer_zero_capacity_panics() {
let _ = AlignedBuffer::new(0, page_size());
}
#[test]
#[should_panic(expected = "capacity must be greater than zero")]
fn test_aligned_buffer_zeroed_zero_capacity_panics() {
let _ = AlignedBuffer::new_zeroed(0, page_size());
}
#[test]
fn test_config_validation() {
let page = page_size();
let config = test_config(page, page * 4, 10);
config.validate();
}
#[test]
#[should_panic(expected = "min_size must be a power of two")]
fn test_config_invalid_min_size() {
let config = BufferPoolConfig {
min_size: NZUsize!(3000),
max_size: NZUsize!(8192),
max_per_class: NZUsize!(10),
prefill: false,
alignment: NZUsize!(page_size()),
};
config.validate();
}
#[test]
fn test_config_class_index() {
let page = page_size();
let config = test_config(page, page * 8, 10);
assert_eq!(config.num_classes(), 4);
assert_eq!(config.class_index(1), Some(0));
assert_eq!(config.class_index(page), Some(0));
assert_eq!(config.class_index(page + 1), Some(1));
assert_eq!(config.class_index(page * 2), Some(1));
assert_eq!(config.class_index(page * 8), Some(3));
assert_eq!(config.class_index(page * 8 + 1), None);
}
#[test]
fn test_pool_alloc_and_return() {
let page = page_size();
let mut registry = test_registry();
let pool = BufferPool::new(test_config(page, page * 4, 2), &mut registry);
let buf = pool.try_alloc(100).unwrap();
assert!(buf.capacity() >= page);
assert_eq!(buf.len(), 0);
drop(buf);
let buf2 = pool.try_alloc(100).unwrap();
assert!(buf2.capacity() >= page);
assert_eq!(buf2.len(), 0);
}
#[test]
fn test_alloc_len_sets_len() {
let page = page_size();
let mut registry = test_registry();
let pool = BufferPool::new(test_config(page, page * 4, 2), &mut registry);
let mut buf = unsafe { pool.alloc_len(100) };
assert_eq!(buf.len(), 100);
buf.as_mut().fill(0xAB);
let frozen = buf.freeze();
assert_eq!(frozen.as_ref(), &[0xAB; 100]);
}
#[test]
fn test_alloc_zeroed_sets_len_and_zeros() {
let page = page_size();
let mut registry = test_registry();
let pool = BufferPool::new(test_config(page, page * 4, 2), &mut registry);
let buf = pool.alloc_zeroed(100);
assert_eq!(buf.len(), 100);
assert!(buf.as_ref().iter().all(|&b| b == 0));
}
#[test]
fn test_try_alloc_zeroed_sets_len_and_zeros() {
let page = page_size();
let mut registry = test_registry();
let pool = BufferPool::new(test_config(page, page * 4, 2), &mut registry);
let buf = pool.try_alloc_zeroed(100).unwrap();
assert!(buf.is_pooled());
assert_eq!(buf.len(), 100);
assert!(buf.as_ref().iter().all(|&b| b == 0));
}
#[test]
fn test_alloc_zeroed_fallback_uses_untracked_zeroed_buffer() {
let page = page_size();
let mut registry = test_registry();
let pool = BufferPool::new(test_config(page, page, 1), &mut registry);
let _pooled = pool.try_alloc(100).unwrap();
let buf = pool.alloc_zeroed(100);
assert!(!buf.is_pooled());
assert_eq!(buf.len(), 100);
assert!(buf.as_ref().iter().all(|&b| b == 0));
}
#[test]
fn test_alloc_zeroed_reuses_dirty_pooled_buffer() {
let page = page_size();
let mut registry = test_registry();
let pool = BufferPool::new(test_config(page, page, 1), &mut registry);
let mut first = pool.alloc_zeroed(100);
assert!(first.is_pooled());
assert!(first.as_ref().iter().all(|&b| b == 0));
first.as_mut().fill(0xAB);
drop(first);
let second = pool.alloc_zeroed(100);
assert!(second.is_pooled());
assert_eq!(second.len(), 100);
assert!(second.as_ref().iter().all(|&b| b == 0));
}
#[test]
fn test_pool_exhaustion() {
let page = page_size();
let mut registry = test_registry();
let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
let _buf1 = pool.try_alloc(100).expect("first alloc should succeed");
let _buf2 = pool.try_alloc(100).expect("second alloc should succeed");
assert!(pool.try_alloc(100).is_err());
}
#[test]
fn test_pool_oversized() {
let page = page_size();
let mut registry = test_registry();
let pool = BufferPool::new(test_config(page, page * 2, 10), &mut registry);
assert!(pool.try_alloc(page * 4).is_err());
}
#[test]
fn test_pool_size_classes() {
let page = page_size();
let mut registry = test_registry();
let pool = BufferPool::new(test_config(page, page * 4, 10), &mut registry);
let buf1 = pool.try_alloc(100).unwrap();
assert_eq!(buf1.capacity(), page);
let buf2 = pool.try_alloc(page + 1).unwrap();
assert_eq!(buf2.capacity(), page * 2);
let buf3 = pool.try_alloc(page * 3).unwrap();
assert_eq!(buf3.capacity(), page * 4);
}
#[test]
fn test_pooled_buf_mut_freeze() {
let page = page_size();
let mut registry = test_registry();
let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
let mut buf = pool.try_alloc(11).unwrap();
buf.put_slice(&[0u8; 11]);
assert_eq!(buf.len(), 11);
buf.as_mut()[..5].copy_from_slice(&[1, 2, 3, 4, 5]);
let iobuf = buf.freeze();
assert_eq!(iobuf.len(), 11);
assert_eq!(&iobuf.as_ref()[..5], &[1, 2, 3, 4, 5]);
let slice = iobuf.slice(0..5);
assert_eq!(slice.len(), 5);
}
#[test]
fn test_prefill() {
let page = NZUsize!(page_size());
let mut registry = test_registry();
let pool = BufferPool::new(
BufferPoolConfig {
min_size: page,
max_size: page,
max_per_class: NZUsize!(5),
prefill: true,
alignment: page,
},
&mut registry,
);
let mut bufs = Vec::new();
for _ in 0..5 {
bufs.push(pool.try_alloc(100).expect("alloc should succeed"));
}
assert!(pool.try_alloc(100).is_err());
}
#[test]
fn test_config_for_network() {
let config = BufferPoolConfig::for_network();
config.validate();
assert_eq!(config.min_size.get(), cache_line_size());
assert_eq!(config.max_size.get(), 64 * 1024);
assert_eq!(config.max_per_class.get(), 4096);
assert!(!config.prefill);
assert_eq!(config.alignment.get(), cache_line_size());
}
#[test]
fn test_config_for_storage() {
let config = BufferPoolConfig::for_storage();
config.validate();
assert_eq!(config.min_size.get(), page_size());
assert_eq!(config.max_size.get(), 8 * 1024 * 1024);
assert_eq!(config.max_per_class.get(), 32);
assert!(!config.prefill);
assert_eq!(config.alignment.get(), page_size());
}
#[test]
fn test_storage_config_supports_default_allocations() {
let mut registry = test_registry();
let pool = BufferPool::new(BufferPoolConfig::for_storage(), &mut registry);
let buf = pool.try_alloc(8 * 1024 * 1024).unwrap();
assert_eq!(buf.capacity(), 8 * 1024 * 1024);
}
#[test]
fn test_config_builders() {
let page = NZUsize!(page_size());
let config = BufferPoolConfig::for_storage()
.with_max_per_class(NZUsize!(64))
.with_prefill(true)
.with_min_size(page)
.with_max_size(NZUsize!(128 * 1024));
config.validate();
assert_eq!(config.min_size, page);
assert_eq!(config.max_size.get(), 128 * 1024);
assert_eq!(config.max_per_class.get(), 64);
assert!(config.prefill);
assert_eq!(config.alignment.get(), page_size());
let aligned = BufferPoolConfig::for_network()
.with_alignment(NZUsize!(256))
.with_min_size(NZUsize!(256));
aligned.validate();
assert_eq!(aligned.alignment.get(), 256);
assert_eq!(aligned.min_size.get(), 256);
}
#[test]
fn test_config_with_budget_bytes() {
let config = BufferPoolConfig {
min_size: NZUsize!(4),
max_size: NZUsize!(16),
max_per_class: NZUsize!(1),
prefill: false,
alignment: NZUsize!(4),
}
.with_budget_bytes(NZUsize!(280));
assert_eq!(config.max_per_class.get(), 10);
let small_budget = BufferPoolConfig {
min_size: NZUsize!(4),
max_size: NZUsize!(16),
max_per_class: NZUsize!(1),
prefill: false,
alignment: NZUsize!(4),
}
.with_budget_bytes(NZUsize!(10));
assert_eq!(small_budget.max_per_class.get(), 1);
}
#[test]
fn test_pool_error_display() {
assert_eq!(
PoolError::Oversized.to_string(),
"requested capacity exceeds maximum buffer size"
);
assert_eq!(
PoolError::Exhausted.to_string(),
"pool exhausted for required size class"
);
}
#[test]
fn test_config_invalid_range_edge_paths() {
let invalid_order = BufferPoolConfig {
min_size: NZUsize!(8),
max_size: NZUsize!(4),
max_per_class: NZUsize!(1),
prefill: false,
alignment: NZUsize!(4),
};
assert_eq!(invalid_order.num_classes(), 0);
let unchanged = invalid_order.clone().with_budget_bytes(NZUsize!(128));
assert_eq!(unchanged.max_per_class, invalid_order.max_per_class);
let non_power_two_max = BufferPoolConfig {
min_size: NZUsize!(8),
max_size: NZUsize!(12),
max_per_class: NZUsize!(1),
prefill: false,
alignment: NZUsize!(4),
};
assert_eq!(non_power_two_max.class_index(12), None);
}
#[test]
fn test_pool_debug_and_config_accessor() {
let page = page_size();
let mut registry = test_registry();
let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
let debug = format!("{pool:?}");
assert!(debug.contains("BufferPool"));
assert!(debug.contains("num_classes"));
assert_eq!(pool.config().min_size.get(), page);
}
#[test]
fn test_return_buffer_freelist_full_drops_extra() {
let page = page_size();
let mut registry = test_registry();
let pool = BufferPool::new(test_config(page, page, 1), &mut registry);
let tracked = pool.try_alloc(page).expect("tracked allocation");
drop(tracked);
let class_index = pool
.inner
.config
.class_index(page)
.expect("class exists for page-sized buffer");
pool.inner.classes[class_index]
.allocated
.store(1, Ordering::Relaxed);
pool.inner
.return_buffer(AlignedBuffer::new(page, page_size()));
assert_eq!(
pool.inner.classes[class_index]
.allocated
.load(Ordering::Relaxed),
0
);
}
#[test]
fn test_return_buffer_ignores_unmatched_class() {
let page = page_size();
let mut registry = test_registry();
let pool = BufferPool::new(test_config(page, page, 1), &mut registry);
pool.inner
.return_buffer(AlignedBuffer::new(page * 2, page_size()));
assert_eq!(get_allocated(&pool, page), 0);
}
#[test]
fn test_pooled_debug_and_empty_into_bytes_paths() {
let page = page_size();
let pooled_mut_debug = {
let pooled_mut = PooledBufMut::new(AlignedBuffer::new(page, page), Weak::new());
format!("{pooled_mut:?}")
};
assert!(pooled_mut_debug.contains("PooledBufMut"));
assert!(pooled_mut_debug.contains("cursor"));
let empty_from_mut = PooledBufMut::new(AlignedBuffer::new(page, page), Weak::new());
assert!(empty_from_mut.into_bytes().is_empty());
let pooled = PooledBufMut::new(AlignedBuffer::new(page, page), Weak::new()).into_pooled();
let pooled_debug = format!("{pooled:?}");
assert!(pooled_debug.contains("PooledBuf"));
assert!(pooled_debug.contains("capacity"));
assert!(pooled.into_bytes().is_empty());
}
#[test]
#[should_panic(expected = "range start overflow")]
fn test_pooled_slice_excluded_start_overflow() {
let page = page_size();
let pooled = PooledBufMut::new(AlignedBuffer::new(page, page), Weak::new()).into_pooled();
let _ = pooled.slice((Bound::Excluded(usize::MAX), Bound::<usize>::Unbounded));
}
    /// Read the allocated counter for the class serving `size`.
    fn get_allocated(pool: &BufferPool, size: usize) -> usize {
        let class_index = pool.inner.config.class_index(size).unwrap();
        pool.inner.classes[class_index]
            .allocated
            .load(Ordering::Relaxed)
    }
    /// Read the "available" gauge for the class serving `size`.
    fn get_available(pool: &BufferPool, size: usize) -> i64 {
        let class_index = pool.inner.config.class_index(size).unwrap();
        let label = SizeClassLabel {
            size_class: pool.inner.classes[class_index].size as u64,
        };
        pool.inner.metrics.available.get_or_create(&label).get()
    }
#[test]
fn test_freeze_returns_buffer_to_pool() {
let page = page_size();
let mut registry = test_registry();
let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
assert_eq!(get_allocated(&pool, page), 0);
assert_eq!(get_available(&pool, page), 0);
let buf = pool.try_alloc(100).unwrap();
assert_eq!(get_allocated(&pool, page), 1);
assert_eq!(get_available(&pool, page), 0);
let iobuf = buf.freeze();
assert_eq!(get_allocated(&pool, page), 1);
drop(iobuf);
assert_eq!(get_allocated(&pool, page), 0);
assert_eq!(get_available(&pool, page), 1);
}
#[test]
fn test_refcount_and_copy_to_bytes_paths() {
let page = page_size();
let mut registry = test_registry();
let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
{
let mut buf = pool.try_alloc(100).unwrap();
buf.put_slice(&[0xAA; 100]);
let iobuf = buf.freeze();
let clone = iobuf.clone();
let slice = iobuf.slice(10..40);
let empty = iobuf.slice(10..10);
assert!(empty.is_empty());
drop(iobuf);
assert_eq!(get_allocated(&pool, page), 1);
drop(slice);
assert_eq!(get_allocated(&pool, page), 1);
drop(clone);
assert_eq!(get_allocated(&pool, page), 0);
}
{
let mut buf = pool.try_alloc(100).unwrap();
buf.put_slice(&[0x42; 100]);
let mut iobuf = buf.freeze();
let zero = iobuf.copy_to_bytes(0);
assert!(zero.is_empty());
assert_eq!(iobuf.remaining(), 100);
let partial = iobuf.copy_to_bytes(30);
assert_eq!(&partial[..], &[0x42; 30]);
assert_eq!(iobuf.remaining(), 70);
let rest = iobuf.copy_to_bytes(70);
assert_eq!(&rest[..], &[0x42; 70]);
assert_eq!(iobuf.remaining(), 0);
let empty = iobuf.copy_to_bytes(0);
assert!(empty.is_empty());
drop(iobuf);
assert_eq!(get_allocated(&pool, page), 1);
drop(zero);
drop(partial);
assert_eq!(get_allocated(&pool, page), 1);
drop(rest);
assert_eq!(get_allocated(&pool, page), 0);
}
{
let buf = pool.try_alloc(100).unwrap();
let mut iobufmut = buf;
iobufmut.put_slice(&[0x7E; 100]);
let zero = iobufmut.copy_to_bytes(0);
assert!(zero.is_empty());
assert_eq!(iobufmut.remaining(), 100);
let partial = iobufmut.copy_to_bytes(30);
assert_eq!(&partial[..], &[0x7E; 30]);
assert_eq!(iobufmut.remaining(), 70);
let rest = iobufmut.copy_to_bytes(70);
assert_eq!(&rest[..], &[0x7E; 70]);
assert_eq!(iobufmut.remaining(), 0);
drop(iobufmut);
assert_eq!(get_allocated(&pool, page), 1);
drop(zero);
drop(partial);
assert_eq!(get_allocated(&pool, page), 1);
drop(rest);
assert_eq!(get_allocated(&pool, page), 0);
}
}
#[test]
fn test_iobuf_to_iobufmut_conversion_reuses_pool_for_non_full_unique_view() {
    // Converting a uniquely-held IoBuf back into an IoBufMut via `From`
    // must keep the pooled backing allocation (zero-copy) even when the
    // frozen view does not span the full buffer.
    let page = page_size();
    let mut registry = test_registry();
    let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
    // Nothing is written, so the frozen view is empty (a non-full view).
    let buf = pool.try_alloc(100).unwrap();
    assert_eq!(get_allocated(&pool, page), 1);
    let iobuf = buf.freeze();
    assert_eq!(get_allocated(&pool, page), 1);
    let iobufmut: IoBufMut = iobuf.into();
    assert_eq!(
        get_allocated(&pool, page),
        1,
        "pooled buffer should remain allocated after zero-copy IoBuf->IoBufMut conversion"
    );
    assert_eq!(get_available(&pool, page), 0);
    // Dropping the converted buffer returns the allocation to the freelist.
    drop(iobufmut);
    assert_eq!(get_allocated(&pool, page), 0);
    assert_eq!(get_available(&pool, page), 1);
}
#[test]
fn test_iobuf_to_iobufmut_conversion_preserves_full_unique_view() {
    // When the unique frozen view spans the entire buffer, the `From`
    // conversion back to IoBufMut must preserve both the contents and
    // the pooled allocation.
    let page = page_size();
    let mut registry = test_registry();
    let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
    let mut buf = pool.try_alloc(page).unwrap();
    // Fill the whole page so the frozen view is full-width.
    buf.put_slice(&vec![0xEE; page]);
    let iobuf = buf.freeze();
    let iobufmut: IoBufMut = iobuf.into();
    // Contents survive the round trip intact.
    assert_eq!(iobufmut.len(), page);
    assert!(iobufmut.as_ref().iter().all(|&b| b == 0xEE));
    // Still counted against the pool, not yet back on the freelist.
    assert_eq!(get_allocated(&pool, page), 1);
    assert_eq!(get_available(&pool, page), 0);
    drop(iobufmut);
    assert_eq!(get_allocated(&pool, page), 0);
    assert_eq!(get_available(&pool, page), 1);
}
#[test]
fn test_iobuf_try_into_mut_recycles_full_unique_view() {
    // try_into_mut on a uniquely-held, full-buffer view should succeed
    // and hand back a writable buffer over the same pooled allocation.
    let page = page_size();
    let mut registry = test_registry();
    let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
    let mut buf = pool.try_alloc(page).unwrap();
    buf.put_slice(&vec![0xAB; page]);
    let iobuf = buf.freeze();
    assert_eq!(get_allocated(&pool, page), 1);
    let recycled = iobuf
        .try_into_mut()
        .expect("unique full-view pooled buffer should recycle");
    // Contents and capacity carry over unchanged.
    assert_eq!(recycled.len(), page);
    assert!(recycled.as_ref().iter().all(|&b| b == 0xAB));
    assert_eq!(recycled.capacity(), page);
    // Same allocation: the pool's outstanding count is unchanged.
    assert_eq!(get_allocated(&pool, page), 1);
    drop(recycled);
    assert_eq!(get_allocated(&pool, page), 0);
    assert_eq!(get_available(&pool, page), 1);
}
#[test]
fn test_iobuf_try_into_mut_succeeds_for_unique_slice_and_fails_for_shared() {
    let page = page_size();
    let mut registry = test_registry();
    let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
    // Case 1: a slice that is the only remaining reference converts.
    let mut buf = pool.try_alloc(page).unwrap();
    buf.put_slice(&vec![0xCD; page]);
    let iobuf = buf.freeze();
    let sliced = iobuf.slice(1..page);
    // Drop the parent so `sliced` becomes the unique owner.
    drop(iobuf);
    let recycled = sliced
        .try_into_mut()
        .expect("unique sliced pooled buffer should recycle");
    // The writable buffer is sized to the slice, not the whole page.
    assert_eq!(recycled.len(), page - 1);
    assert!(recycled.as_ref().iter().all(|&b| b == 0xCD));
    assert_eq!(recycled.capacity(), page - 1);
    assert_eq!(get_allocated(&pool, page), 1);
    drop(recycled);
    assert_eq!(get_allocated(&pool, page), 0);
    assert_eq!(get_available(&pool, page), 1);
    // Case 2: with a live clone the buffer is shared, so conversion
    // must fail (the error carries the original IoBuf back).
    let mut buf = pool.try_alloc(page).unwrap();
    buf.put_slice(&vec![0xEF; page]);
    let iobuf = buf.freeze();
    let cloned = iobuf.clone();
    let iobuf = iobuf
        .try_into_mut()
        .expect_err("shared pooled buffer must not convert to mutable");
    // Dropping both references releases the allocation back to the pool.
    drop(cloned);
    drop(iobuf);
    assert_eq!(get_allocated(&pool, page), 0);
    assert!(get_available(&pool, page) >= 1);
}
#[test]
fn test_multithreaded_alloc_freeze_return() {
    // Stress the allocate -> freeze -> clone -> drop cycle from ten
    // threads at once; afterwards every buffer must be back in the pool.
    let page = page_size();
    let mut registry = test_registry();
    let pool = Arc::new(BufferPool::new(test_config(page, page, 100), &mut registry));
    let mut handles = vec![];
    // Miri interprets every instruction, so keep the iteration count
    // small there.
    cfg_if::cfg_if! {
        if #[cfg(miri)] {
            let iterations = 100;
        } else {
            let iterations = 1000;
        }
    }
    for _ in 0..10 {
        let pool = pool.clone();
        let handle = thread::spawn(move || {
            for _ in 0..iterations {
                let buf = pool.try_alloc(100).unwrap();
                let iobuf = buf.freeze();
                // Fan out extra references before releasing them all.
                let clones: Vec<_> = (0..5).map(|_| iobuf.clone()).collect();
                drop(iobuf);
                for clone in clones {
                    drop(clone);
                }
            }
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.join().unwrap();
    }
    // Read the class counter directly: nothing may remain outstanding.
    let class_index = pool.inner.config.class_index(page).unwrap();
    let allocated = pool.inner.classes[class_index]
        .allocated
        .load(Ordering::Relaxed);
    assert_eq!(
        allocated, 0,
        "all buffers should be returned after multithreaded test"
    );
}
#[test]
fn test_cross_thread_buffer_return() {
    // Buffers frozen on one thread and dropped on another must still be
    // returned to the pool: the return path has to be thread-safe.
    let page = page_size();
    let mut registry = test_registry();
    let pool = BufferPool::new(test_config(page, page, 100), &mut registry);
    let (tx, rx) = mpsc::channel();
    // Allocate and freeze 50 buffers here, shipping each one off.
    for _ in 0..50 {
        tx.send(pool.try_alloc(100).unwrap().freeze()).unwrap();
    }
    drop(tx);
    // Drop every received buffer on a different thread.
    let consumer = thread::spawn(move || {
        for iobuf in rx {
            drop(iobuf);
        }
    });
    consumer.join().unwrap();
    // Despite the cross-thread drops, all buffers are accounted for.
    assert_eq!(get_allocated(&pool, page), 0);
}
#[test]
fn test_pool_dropped_before_buffer() {
    // A frozen buffer must outlive its pool: the buffer itself keeps
    // the backing storage alive, and dropping it after the pool is gone
    // must not crash.
    let page = page_size();
    let mut registry = test_registry();
    let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
    let frozen = {
        let mut writer = pool.try_alloc(100).unwrap();
        writer.put_slice(&[0u8; 100]);
        writer.freeze()
    };
    // Tear down the pool while `frozen` is still live.
    drop(pool);
    assert_eq!(frozen.len(), 100);
    drop(frozen);
}
#[test]
fn test_bytes_parity_iobuf_buf_trait() {
    // IoBuf's Buf implementation must match Bytes step-for-step across
    // remaining/chunk/advance/copy_to_bytes, including the zero-length
    // and drain-everything edge cases.
    let page = page_size();
    let mut registry = test_registry();
    let pool = BufferPool::new(test_config(page, page, 10), &mut registry);
    let data: Vec<u8> = (0..100u8).collect();
    let mut pooled_mut = pool.try_alloc(data.len()).unwrap();
    pooled_mut.put_slice(&data);
    let mut pooled = pooled_mut.freeze();
    let mut bytes = Bytes::from(data);
    // Fresh buffers agree on length and view.
    assert_eq!(Buf::remaining(&bytes), Buf::remaining(&pooled));
    assert_eq!(Buf::chunk(&bytes), Buf::chunk(&pooled));
    // Advancing consumes from the front of both identically.
    Buf::advance(&mut bytes, 13);
    Buf::advance(&mut pooled, 13);
    assert_eq!(Buf::remaining(&bytes), Buf::remaining(&pooled));
    assert_eq!(Buf::chunk(&bytes), Buf::chunk(&pooled));
    // Zero-length copy is a no-op for both.
    let bytes_zero = Buf::copy_to_bytes(&mut bytes, 0);
    let pooled_zero = Buf::copy_to_bytes(&mut pooled, 0);
    assert_eq!(bytes_zero, pooled_zero);
    assert_eq!(Buf::remaining(&bytes), Buf::remaining(&pooled));
    assert_eq!(Buf::chunk(&bytes), Buf::chunk(&pooled));
    // Partial copy consumes the same prefix from both.
    let bytes_mid = Buf::copy_to_bytes(&mut bytes, 17);
    let pooled_mid = Buf::copy_to_bytes(&mut pooled, 17);
    assert_eq!(bytes_mid, pooled_mid);
    assert_eq!(Buf::remaining(&bytes), Buf::remaining(&pooled));
    assert_eq!(Buf::chunk(&bytes), Buf::chunk(&pooled));
    // Draining the rest leaves both exhausted.
    let remaining = Buf::remaining(&bytes);
    let bytes_rest = Buf::copy_to_bytes(&mut bytes, remaining);
    let pooled_rest = Buf::copy_to_bytes(&mut pooled, remaining);
    assert_eq!(bytes_rest, pooled_rest);
    assert_eq!(Buf::remaining(&bytes), 0);
    assert_eq!(Buf::remaining(&pooled), 0);
    assert!(!Buf::has_remaining(&bytes));
    assert!(!Buf::has_remaining(&pooled));
}
#[test]
fn test_bytes_parity_iobuf_slice() {
    // IoBuf::slice must accept the same range forms as Bytes::slice and
    // yield identical views for each of them.
    let page = page_size();
    let mut registry = test_registry();
    let pool = BufferPool::new(test_config(page, page, 10), &mut registry);
    let data: Vec<u8> = (0..32u8).collect();
    let mut staged = pool.try_alloc(data.len()).unwrap();
    staged.put_slice(&data);
    let pooled = staged.freeze();
    let bytes = Bytes::from(data);
    // Open-ended, bounded, inclusive, and empty ranges all match Bytes.
    assert_eq!(pooled.slice(..5).as_ref(), bytes.slice(..5).as_ref());
    assert_eq!(pooled.slice(6..).as_ref(), bytes.slice(6..).as_ref());
    assert_eq!(pooled.slice(3..8).as_ref(), bytes.slice(3..8).as_ref());
    assert_eq!(pooled.slice(..=7).as_ref(), bytes.slice(..=7).as_ref());
    assert_eq!(pooled.slice(10..10).as_ref(), bytes.slice(10..10).as_ref());
}
#[test]
fn test_bytes_parity_iobuf_split_to() {
    // IoBuf::split_to must mirror Bytes::split_to: returned head and
    // remaining tail match at every step (empty, partial, full split).
    let page = page_size();
    // Build the pooled buffer directly (no pool involvement) so only
    // the split_to behavior is exercised here.
    let mut pooled_mut = PooledBufMut::new(AlignedBuffer::new(page, page), Weak::new());
    pooled_mut.put_slice(b"abcdefgh");
    let mut pooled = pooled_mut.into_pooled();
    let mut bytes = Bytes::from_static(b"abcdefgh");
    // Zero-length split: empty head, untouched tail.
    assert_eq!(pooled.split_to(0).as_ref(), bytes.split_to(0).as_ref());
    assert_eq!(pooled.as_ref(), bytes.as_ref());
    // Partial split.
    assert_eq!(pooled.split_to(3).as_ref(), bytes.split_to(3).as_ref());
    assert_eq!(pooled.as_ref(), bytes.as_ref());
    // Splitting off everything that remains leaves both empty.
    let remaining = bytes.remaining();
    assert_eq!(
        pooled.split_to(remaining).as_ref(),
        bytes.split_to(remaining).as_ref()
    );
    assert_eq!(pooled.as_ref(), bytes.as_ref());
}
#[test]
#[should_panic(expected = "split_to out of bounds")]
fn test_iobuf_split_to_out_of_bounds() {
    // Splitting past the readable length must panic, matching the
    // message asserted by `should_panic` above.
    let page = page_size();
    let mut staging = PooledBufMut::new(AlignedBuffer::new(page, page), Weak::new());
    staging.put_slice(b"abc");
    // Only 3 bytes are readable; asking for 4 is out of bounds.
    let _ = staging.into_pooled().split_to(4);
}
#[test]
fn test_bytesmut_parity_buf_trait() {
    // IoBufMut's read-side (Buf) behavior must track BytesMut: same
    // remaining/chunk views and the same effect from advance.
    let page = page_size();
    let mut registry = test_registry();
    let pool = BufferPool::new(test_config(page, page, 10), &mut registry);
    let mut bytes = BytesMut::with_capacity(100);
    bytes.put_slice(&[0xAAu8; 50]);
    let mut pooled = pool.try_alloc(100).unwrap();
    pooled.put_slice(&[0xAAu8; 50]);
    assert_eq!(Buf::remaining(&bytes), Buf::remaining(&pooled));
    assert_eq!(Buf::chunk(&bytes), Buf::chunk(&pooled));
    // Partial advance.
    Buf::advance(&mut bytes, 10);
    Buf::advance(&mut pooled, 10);
    assert_eq!(Buf::remaining(&bytes), Buf::remaining(&pooled));
    assert_eq!(Buf::chunk(&bytes), Buf::chunk(&pooled));
    // Consume everything that is left: both report fully drained.
    let remaining = Buf::remaining(&bytes);
    Buf::advance(&mut bytes, remaining);
    Buf::advance(&mut pooled, remaining);
    assert_eq!(Buf::remaining(&bytes), 0);
    assert_eq!(Buf::remaining(&pooled), 0);
    assert!(!Buf::has_remaining(&bytes));
    assert!(!Buf::has_remaining(&pooled));
}
#[test]
fn test_bytesmut_parity_bufmut_trait() {
    // IoBufMut's write-side (BufMut) behavior must track BytesMut:
    // identical results from put_slice/put_u8 and a usable spare chunk.
    let page = page_size();
    let mut registry = test_registry();
    let pool = BufferPool::new(test_config(page, page, 10), &mut registry);
    let mut reference = BytesMut::with_capacity(100);
    let mut candidate = pool.try_alloc(100).unwrap();
    // Both advertise at least the requested writable capacity.
    assert!(BufMut::remaining_mut(&reference) >= 100);
    assert!(BufMut::remaining_mut(&candidate) >= 100);
    // Identical writes must produce identical contents.
    BufMut::put_slice(&mut reference, b"hello");
    BufMut::put_slice(&mut candidate, b"hello");
    assert_eq!(reference.as_ref(), candidate.as_ref());
    BufMut::put_u8(&mut reference, 0x42);
    BufMut::put_u8(&mut candidate, 0x42);
    assert_eq!(reference.as_ref(), candidate.as_ref());
    // Both still expose spare room to write into.
    assert!(BufMut::chunk_mut(&mut reference).len() > 0);
    assert!(BufMut::chunk_mut(&mut candidate).len() > 0);
}
#[test]
fn test_bytesmut_parity_after_advance_paths() {
    // After the read cursor has been advanced, mutating operations
    // (truncate, clear, set_len, further writes) must behave exactly as
    // they do on BytesMut. Each scoped block compares one path.
    let page = page_size();
    let mut registry = test_registry();
    let pool = BufferPool::new(test_config(page, page * 4, 10), &mut registry);
    // truncate after advance keeps only the first `len` readable bytes.
    {
        let mut bytes = BytesMut::with_capacity(100);
        bytes.put_slice(&[0xAAu8; 50]);
        Buf::advance(&mut bytes, 10);
        let mut pooled = pool.try_alloc(100).unwrap();
        pooled.put_slice(&[0xAAu8; 50]);
        Buf::advance(&mut pooled, 10);
        bytes.truncate(20);
        pooled.truncate(20);
        assert_eq!(bytes.as_ref(), pooled.as_ref());
    }
    // clear after advance empties both buffers.
    {
        let mut bytes = BytesMut::with_capacity(100);
        bytes.put_slice(&[0xAAu8; 50]);
        Buf::advance(&mut bytes, 10);
        let mut pooled = pool.try_alloc(100).unwrap();
        pooled.put_slice(&[0xAAu8; 50]);
        Buf::advance(&mut pooled, 10);
        bytes.clear();
        pooled.clear();
        assert_eq!(bytes.len(), 0);
        assert_eq!(pooled.len(), 0);
    }
    // set_len after advance, then clear must preserve capacity.
    {
        let mut bytes = BytesMut::with_capacity(page);
        bytes.resize(50, 0xBB);
        Buf::advance(&mut bytes, 20);
        let mut pooled = pool.try_alloc(page).unwrap();
        pooled.put_slice(&[0xBB; 50]);
        Buf::advance(&mut pooled, 20);
        assert_eq!(bytes.capacity(), pooled.capacity());
        // SAFETY: both lengths shrink to 25, all of which was
        // initialized by the 50-byte writes above.
        unsafe {
            bytes.set_len(25);
            pooled.set_len(25);
        }
        assert_eq!(bytes.as_ref(), pooled.as_ref());
        let bytes_cap = bytes.capacity();
        let pooled_cap = pooled.capacity();
        bytes.clear();
        pooled.clear();
        assert_eq!(bytes.capacity(), bytes_cap);
        assert_eq!(pooled.capacity(), pooled_cap);
    }
    // Write after advance, then truncate beyond len (a no-op for
    // BytesMut; the pooled buffer must agree).
    {
        let mut bytes = BytesMut::with_capacity(100);
        bytes.resize(30, 0xAA);
        Buf::advance(&mut bytes, 10);
        bytes.put_slice(&[0xBB; 10]);
        bytes.truncate(100);
        let mut pooled = pool.try_alloc(100).unwrap();
        pooled.put_slice(&[0xAA; 30]);
        Buf::advance(&mut pooled, 10);
        pooled.put_slice(&[0xBB; 10]);
        pooled.truncate(100);
        assert_eq!(bytes.as_ref(), pooled.as_ref());
    }
}
#[test]
fn test_pool_exhaustion_and_recovery() {
    // With a capacity of three buffers, the fourth allocation fails;
    // returning a buffer makes room again, and drained buffers land on
    // the freelist for reuse.
    let page = page_size();
    let mut registry = test_registry();
    let pool = BufferPool::new(test_config(page, page, 3), &mut registry);
    let buf1 = pool.try_alloc(100).expect("first alloc");
    let buf2 = pool.try_alloc(100).expect("second alloc");
    let buf3 = pool.try_alloc(100).expect("third alloc");
    assert!(pool.try_alloc(100).is_err(), "pool should be exhausted");
    // Returning one buffer frees exactly one slot.
    drop(buf1);
    let buf4 = pool.try_alloc(100).expect("alloc after return");
    assert!(pool.try_alloc(100).is_err(), "pool exhausted again");
    // Return everything: all three slots land on the freelist.
    drop(buf2);
    drop(buf3);
    drop(buf4);
    assert_eq!(get_allocated(&pool, page), 0);
    assert_eq!(get_available(&pool, page), 3);
    // A fresh allocation is served from the freelist.
    let _buf5 = pool.try_alloc(100).expect("reuse from freelist");
    assert_eq!(get_available(&pool, page), 2);
}
#[test]
fn test_try_alloc_errors() {
    // try_alloc reports Oversized for requests beyond the largest size
    // class and Exhausted once every slot in the class is handed out.
    let page = page_size();
    let mut registry = test_registry();
    let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
    // A request past the maximum size class is rejected outright.
    assert_eq!(pool.try_alloc(page * 10).unwrap_err(), PoolError::Oversized);
    // Occupy both slots of the class; the next request must fail.
    let _held_a = pool.try_alloc(100).unwrap();
    let _held_b = pool.try_alloc(100).unwrap();
    assert_eq!(pool.try_alloc(100).unwrap_err(), PoolError::Exhausted);
}
#[test]
fn test_try_alloc_zeroed_errors() {
    // try_alloc_zeroed surfaces the same errors as try_alloc: Oversized
    // beyond the largest class, Exhausted once no slot is free.
    let page = page_size();
    let mut registry = test_registry();
    let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
    assert_eq!(
        pool.try_alloc_zeroed(page * 10).unwrap_err(),
        PoolError::Oversized
    );
    // Occupy both slots of the class; the next request must fail.
    let _held_a = pool.try_alloc_zeroed(100).unwrap();
    let _held_b = pool.try_alloc_zeroed(100).unwrap();
    assert_eq!(pool.try_alloc_zeroed(100).unwrap_err(), PoolError::Exhausted);
}
#[test]
fn test_fallback_allocation() {
    // The infallible alloc() falls back to a non-pooled allocation when
    // the pool is exhausted or the request is oversized; fallbacks are
    // still aligned but never counted against the pool.
    let page = page_size();
    let mut registry = test_registry();
    let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
    // Use up both pooled slots.
    let buf1 = pool.try_alloc(100).unwrap();
    let buf2 = pool.try_alloc(100).unwrap();
    assert!(buf1.is_pooled());
    assert!(buf2.is_pooled());
    // Exhausted pool: alloc still succeeds via a fallback allocation.
    let mut fallback_exhausted = pool.alloc(100);
    assert!(!fallback_exhausted.is_pooled());
    assert!((fallback_exhausted.as_mut_ptr() as usize).is_multiple_of(page));
    // Oversized request: also a fallback, still page-aligned.
    let mut fallback_oversized = pool.alloc(page * 10);
    assert!(!fallback_oversized.is_pooled());
    assert!((fallback_oversized.as_mut_ptr() as usize).is_multiple_of(page));
    // Only the two pooled buffers count as allocated.
    assert_eq!(get_allocated(&pool, page), 2);
    // Dropping fallbacks does not touch the pool's accounting.
    drop(fallback_exhausted);
    drop(fallback_oversized);
    assert_eq!(get_allocated(&pool, page), 2);
    drop(buf1);
    drop(buf2);
    assert_eq!(get_allocated(&pool, page), 0);
}
#[test]
fn test_is_pooled() {
    // Buffers handed out by the pool report as pooled; a standalone
    // IoBufMut does not.
    let page = page_size();
    let mut registry = test_registry();
    let pool = BufferPool::new(test_config(page, page, 10), &mut registry);
    assert!(pool.try_alloc(100).unwrap().is_pooled());
    assert!(!IoBufMut::with_capacity(100).is_pooled());
}
#[test]
fn test_iobuf_is_pooled() {
    // is_pooled survives freezing, and only genuinely pooled buffers
    // report true.
    let page = page_size();
    let mut registry = test_registry();
    let pool = BufferPool::new(test_config(page, page, 2), &mut registry);
    // A frozen pooled buffer keeps its pooled status.
    assert!(pool.try_alloc(100).unwrap().freeze().is_pooled());
    // An oversized request is served by a non-pooled fallback.
    assert!(!pool.alloc(page * 10).freeze().is_pooled());
    // A buffer built straight from a slice never touches the pool.
    assert!(!IoBuf::copy_from_slice(b"hello").is_pooled());
}
#[test]
fn test_buffer_alignment() {
    // Storage-profile buffers must be page-aligned; network-profile
    // buffers must be cache-line-aligned.
    let page = page_size();
    let cache_line = cache_line_size();
    let mut registry = test_registry();
    // Under miri, cap each class at 32 buffers (for_network defaults to
    // 4096) to keep the interpreted run cheap.
    cfg_if::cfg_if! {
        if #[cfg(miri)] {
            let storage_config = BufferPoolConfig {
                max_per_class: NZUsize!(32),
                ..BufferPoolConfig::for_storage()
            };
            let network_config = BufferPoolConfig {
                max_per_class: NZUsize!(32),
                ..BufferPoolConfig::for_network()
            };
        } else {
            let storage_config = BufferPoolConfig::for_storage();
            let network_config = BufferPoolConfig::for_network();
        }
    }
    let storage_buffer_pool = BufferPool::new(storage_config, &mut registry);
    let mut buf = storage_buffer_pool.try_alloc(100).unwrap();
    assert_eq!(
        buf.as_mut_ptr() as usize % page,
        0,
        "storage buffer not page-aligned"
    );
    let network_buffer_pool = BufferPool::new(network_config, &mut registry);
    let mut buf = network_buffer_pool.try_alloc(100).unwrap();
    assert_eq!(
        buf.as_mut_ptr() as usize % cache_line,
        0,
        "network buffer not cache-line aligned"
    );
}
#[test]
fn test_alloc_and_freeze_view_paths() {
    // Edge cases of allocation sizing, and of freezing after the read
    // cursor has moved.
    let page = page_size();
    let mut registry = test_registry();
    let pool = BufferPool::new(test_config(page, page, 10), &mut registry);
    // A zero-capacity request still yields a full page-class buffer.
    let buf = pool.try_alloc(0).expect("zero capacity should succeed");
    assert_eq!(buf.capacity(), page);
    assert_eq!(buf.len(), 0);
    // Requesting exactly the class maximum works.
    let buf = pool.try_alloc(page).expect("exact max size should succeed");
    assert_eq!(buf.capacity(), page);
    // A fully consumed buffer freezes to an empty view.
    let mut buf = pool.try_alloc(100).unwrap();
    buf.put_slice(&[0x42; 100]);
    Buf::advance(&mut buf, 100);
    assert!(buf.freeze().is_empty());
    // A partially consumed buffer freezes to just the unread suffix.
    let mut buf = pool.try_alloc(100).unwrap();
    buf.put_slice(&[0xAA; 50]);
    Buf::advance(&mut buf, 20);
    let frozen = buf.freeze();
    assert_eq!(frozen.len(), 30);
    assert_eq!(frozen.as_ref(), &[0xAA; 30]);
    // A cleared buffer also freezes to an empty view.
    let mut buf = pool.try_alloc(100).unwrap();
    buf.put_slice(&[0xAA; 50]);
    buf.clear();
    let frozen = buf.freeze();
    assert!(frozen.is_empty());
}
#[test]
fn test_interleaved_advance_and_write() {
    // Reads (advance) and writes (put) may interleave: advance consumes
    // from the front while writes keep appending at the back.
    let page = page_size();
    let mut registry = test_registry();
    let pool = BufferPool::new(test_config(page, page, 10), &mut registry);
    let mut scratch = pool.try_alloc(100).unwrap();
    scratch.put_slice(b"hello");
    // Consume "he", leaving "llo" readable.
    Buf::advance(&mut scratch, 2);
    scratch.put_slice(b"world");
    assert_eq!(scratch.as_ref(), b"lloworld");
}
#[test]
fn test_alignment_after_advance() {
    // Alignment is a property of the allocation start: advancing the
    // read cursor by a non-multiple of the page moves the exposed
    // pointer off the aligned boundary.
    let page = page_size();
    let mut registry = test_registry();
    let pool = BufferPool::new(BufferPoolConfig::for_storage(), &mut registry);
    let mut buf = pool.try_alloc(100).unwrap();
    buf.put_slice(&[0; 100]);
    // Fresh storage buffers start page-aligned.
    assert!((buf.as_mut_ptr() as usize).is_multiple_of(page));
    // Consuming 7 bytes breaks that alignment.
    Buf::advance(&mut buf, 7);
    assert!(!(buf.as_mut_ptr() as usize).is_multiple_of(page));
}
}