use std::{
alloc,
array::from_fn,
cell::RefCell,
ops::{Add, AddAssign},
ptr::NonNull,
sync::{
Arc, LazyLock,
atomic::{AtomicU64, Ordering as AtomicOrdering},
},
};
use crossbeam::queue::ArrayQueue;
use super::FBuf;
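// Per-thread slab pool, installed with `set_thread_slab_pool`. `None` means the
// thread has no dedicated pool.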
thread_local! {
static THREAD_SLAB_POOL: RefCell<Option<Arc<FBufSlabs>>> = const { RefCell::new(None) };
}
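// Tokio task-local slab pool; when set via `TOKIO_FBUF_SLABS.scope(..)` it takes
// precedence over the thread-local pool.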
tokio::task_local! {
pub static TOKIO_FBUF_SLABS: Arc<FBufSlabs>;
}
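/// Process-wide pool used when neither a task-local nor a thread-local pool is installed.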
static FALLBACK_SLAB_POOL: LazyLock<Arc<FBufSlabs>> =
LazyLock::new(|| Arc::new(FBufSlabs::default()));
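// Compile-time checks that the size-class range is well-formed: both bounds must be
// powers of two and the largest class must be at least one alignment unit.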
const _: () = {
assert!(FBuf::ALIGNMENT.is_power_of_two());
assert!(FBufSlabs::HIGHEST_SLAB_CLASS_CAPACITY.is_power_of_two());
assert!(FBufSlabs::HIGHEST_SLAB_CLASS_CAPACITY >= FBuf::ALIGNMENT);
};
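/// One size class: a bounded lock-free queue of recycled buffers plus its counters.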
#[derive(Debug)]
struct FBufSlab {
buffers: ArrayQueue<CachedAllocation>,
stats: FBufSlabStats,
}
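/// A raw pointer to a cached heap allocation; its size and alignment are implied by
/// the slab that holds it.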
#[derive(Clone, Copy, Debug)]
struct CachedAllocation {
ptr: NonNull<u8>,
}
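// SAFETY: each entry exclusively owns its heap allocation while it sits in the
// queue, so moving the pointer between threads is sound.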
unsafe impl Send for CachedAllocation {}
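/// Per-class counters. Hits are a subset of requests, so misses are the difference.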
#[derive(Debug, Default)]
struct FBufSlabStats {
alloc_requests: AtomicU64,
alloc_hits: AtomicU64,
recycle_requests: AtomicU64,
recycle_hits: AtomicU64,
}
impl FBufSlabStats {
fn snapshot(&self, size: usize, available_buffers: usize) -> FBufSlabStatsSnapshot {
FBufSlabStatsSnapshot {
size,
available_buffers,
alloc_requests: self.alloc_requests.load(AtomicOrdering::Relaxed),
alloc_hits: self.alloc_hits.load(AtomicOrdering::Relaxed),
recycle_requests: self.recycle_requests.load(AtomicOrdering::Relaxed),
recycle_hits: self.recycle_hits.load(AtomicOrdering::Relaxed),
}
}
}
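/// A point-in-time copy of one size class's counters, together with its buffer size
/// and the number of buffers currently cached.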
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct FBufSlabStatsSnapshot {
pub size: usize,
pub available_buffers: usize,
pub alloc_requests: u64,
pub alloc_hits: u64,
pub recycle_requests: u64,
pub recycle_hits: u64,
}
impl FBufSlabStatsSnapshot {
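/// Allocation requests for this class that were not served from the cache.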
pub fn malloc_fallbacks(&self) -> u64 {
self.alloc_requests.saturating_sub(self.alloc_hits)
}
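/// Recycle requests for this class that did not fit back into the cache.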
pub fn free_fallbacks(&self) -> u64 {
self.recycle_requests.saturating_sub(self.recycle_hits)
}
}
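/// Aggregated statistics for a whole pool: requests for sizes outside the cached
/// range plus one snapshot per size class.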
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct FBufSlabsStats {
pub cached_buffers: usize,
pub fallback_alloc_requests: u64,
pub fallback_recycle_requests: u64,
pub classes: Vec<FBufSlabStatsSnapshot>,
}
impl FBufSlabsStats {
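/// Total allocation requests, including requests for sizes outside the cached range.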
pub fn alloc_requests(&self) -> u64 {
self.fallback_alloc_requests
+ self
.classes
.iter()
.map(|class| class.alloc_requests)
.sum::<u64>()
}
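/// Total recycle requests, including buffers whose size is outside the cached range.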
pub fn recycle_requests(&self) -> u64 {
self.fallback_recycle_requests
+ self
.classes
.iter()
.map(|class| class.recycle_requests)
.sum::<u64>()
}
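/// Allocations served from the cache instead of the system allocator.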
pub fn mallocs_saved(&self) -> u64 {
self.classes.iter().map(|class| class.alloc_hits).sum()
}
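/// Allocation requests that went to the system allocator (cache misses plus
/// out-of-range sizes).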
pub fn malloc_fallbacks(&self) -> u64 {
self.fallback_alloc_requests
+ self
.classes
.iter()
.map(FBufSlabStatsSnapshot::malloc_fallbacks)
.sum::<u64>()
}
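/// Buffers returned to the cache instead of being freed.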
pub fn frees_saved(&self) -> u64 {
self.classes.iter().map(|class| class.recycle_hits).sum()
}
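/// Buffers freed because they could not be cached (full queue or out-of-range size).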
pub fn free_fallbacks(&self) -> u64 {
self.fallback_recycle_requests
+ self
.classes
.iter()
.map(FBufSlabStatsSnapshot::free_fallbacks)
.sum::<u64>()
}
}
impl Add for FBufSlabsStats {
type Output = Self;
fn add(mut self, rhs: Self) -> Self::Output {
self += rhs;
self
}
}
impl AddAssign for FBufSlabsStats {
fn add_assign(&mut self, rhs: Self) {
self.cached_buffers += rhs.cached_buffers;
self.fallback_alloc_requests += rhs.fallback_alloc_requests;
self.fallback_recycle_requests += rhs.fallback_recycle_requests;
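// An empty class list (the `Default` value) acts as the additive identity;
// otherwise both sides must describe the same set of size classes.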
if self.classes.is_empty() {
self.classes = rhs.classes;
return;
}
if rhs.classes.is_empty() {
return;
}
assert_eq!(self.classes.len(), rhs.classes.len());
for (lhs, rhs) in self.classes.iter_mut().zip(rhs.classes) {
assert_eq!(lhs.size, rhs.size);
lhs.available_buffers += rhs.available_buffers;
lhs.alloc_requests += rhs.alloc_requests;
lhs.alloc_hits += rhs.alloc_hits;
lhs.recycle_requests += rhs.recycle_requests;
lhs.recycle_hits += rhs.recycle_hits;
}
}
}
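/// A pool of reusable buffers, one bounded lock-free queue per power-of-two size
/// class from `FBuf::ALIGNMENT` up to `HIGHEST_SLAB_CLASS_CAPACITY`.
///
/// Illustrative sketch (paths assumed, not compiled as a doctest):
/// ```ignore
/// let pool = FBufSlabs::new(1 << 20); // cache at most 1 MiB per size class
/// let stats = pool.stats();
/// assert_eq!(stats.alloc_requests(), 0);
/// ```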
#[derive(Debug)]
pub struct FBufSlabs {
slabs: [FBufSlab; Self::NUM_CLASSES],
fallback_alloc_requests: AtomicU64,
fallback_recycle_requests: AtomicU64,
}
impl FBufSlabs {
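/// Largest buffer capacity that is cached (64 KiB); larger requests always go to
/// the system allocator.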
pub const HIGHEST_SLAB_CLASS_CAPACITY: usize = 64 * 1024;
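/// Default cache budget per size class, in bytes (16 MiB).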
pub const DEFAULT_BYTES_PER_CLASS: usize = 16 * 1024 * 1024;
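// One size class per power of two from `FBuf::ALIGNMENT` up to
// `HIGHEST_SLAB_CLASS_CAPACITY`, inclusive.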
const MIN_SHIFT: u32 = FBuf::ALIGNMENT.trailing_zeros();
const MAX_SHIFT: u32 = Self::HIGHEST_SLAB_CLASS_CAPACITY.trailing_zeros();
const NUM_CLASSES: usize = (Self::MAX_SHIFT - Self::MIN_SHIFT + 1) as usize;
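/// Buffer capacity, in bytes, of the size class at `index`.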
fn slab_size(index: usize) -> usize {
1usize << (Self::MIN_SHIFT as usize + index)
}
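/// An identifier for this pool instance, derived from its address; only meaningful
/// while the pool is alive.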
pub fn backend_id(&self) -> usize {
self as *const Self as usize
}
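/// Creates a pool that caches at most `bytes_per_class` bytes per size class
/// (but always at least one buffer per class).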
pub fn new(bytes_per_class: usize) -> Self {
Self {
slabs: from_fn(|index| {
let size = Self::slab_size(index);
let max_buffers = bytes_per_class.max(size) / size;
FBufSlab {
buffers: ArrayQueue::new(max_buffers),
stats: FBufSlabStats::default(),
}
}),
fallback_alloc_requests: AtomicU64::new(0),
fallback_recycle_requests: AtomicU64::new(0),
}
}
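/// Tries to pop a cached buffer for `requested_capacity`, returning the pointer and
/// the rounded-up class capacity. Returns `None` on a cache miss or when the request
/// falls outside the cached range.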
fn try_take(&self, requested_capacity: usize) -> Option<(NonNull<u8>, usize)> {
let Some(index) = Self::request_index(requested_capacity) else {
self.fallback_alloc_requests
.fetch_add(1, AtomicOrdering::Relaxed);
return None;
};
let slab = &self.slabs[index];
let size = Self::slab_size(index);
slab.stats
.alloc_requests
.fetch_add(1, AtomicOrdering::Relaxed);
let ptr = slab.buffers.pop();
if ptr.is_some() {
slab.stats.alloc_hits.fetch_add(1, AtomicOrdering::Relaxed);
}
ptr.map(|ptr| (ptr.ptr, size))
}
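/// Tries to cache a buffer of exactly `capacity` bytes. Returns `false` if the
/// capacity does not map to a size class or that class's queue is full, in which
/// case the caller still owns the allocation.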
fn try_put(&self, ptr: NonNull<u8>, capacity: usize) -> bool {
let slab = match Self::capacity_index(capacity) {
Some(index) => &self.slabs[index],
None => {
self.fallback_recycle_requests
.fetch_add(1, AtomicOrdering::Relaxed);
return false;
}
};
slab.stats
.recycle_requests
.fetch_add(1, AtomicOrdering::Relaxed);
if slab.buffers.push(CachedAllocation { ptr }).is_err() {
return false;
}
slab.stats
.recycle_hits
.fetch_add(1, AtomicOrdering::Relaxed);
true
}
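/// Snapshots the counters and cache occupancy of every size class.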
pub fn stats(&self) -> FBufSlabsStats {
let classes = self
.slabs
.iter()
.enumerate()
.map(|(index, slab)| {
let size = Self::slab_size(index);
let available_buffers = slab.buffers.len();
slab.stats.snapshot(size, available_buffers)
})
.collect::<Vec<_>>();
FBufSlabsStats {
cached_buffers: classes.iter().map(|class| class.available_buffers).sum(),
fallback_alloc_requests: self.fallback_alloc_requests.load(AtomicOrdering::Relaxed),
fallback_recycle_requests: self.fallback_recycle_requests.load(AtomicOrdering::Relaxed),
classes,
}
}
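/// Maps a requested capacity to its size-class index, rounding up to the next power
/// of two; `None` for zero-sized or oversized requests.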
fn request_index(requested_capacity: usize) -> Option<usize> {
if requested_capacity == 0 || requested_capacity > Self::HIGHEST_SLAB_CLASS_CAPACITY {
return None;
}
let rounded = requested_capacity.max(FBuf::ALIGNMENT).next_power_of_two();
Some((rounded.trailing_zeros() - Self::MIN_SHIFT) as usize)
}
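/// Maps an exact allocation capacity back to its size-class index; only in-range
/// powers of two are cacheable.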
fn capacity_index(capacity: usize) -> Option<usize> {
if !(FBuf::ALIGNMENT..=Self::HIGHEST_SLAB_CLASS_CAPACITY).contains(&capacity)
|| !capacity.is_power_of_two()
{
return None;
}
Some((capacity.trailing_zeros() - Self::MIN_SHIFT) as usize)
}
#[cfg(test)]
fn cached_buffers(&self, capacity: usize) -> usize {
Self::capacity_index(capacity)
.map(|index| self.slabs[index].buffers.len())
.unwrap_or_default()
}
}
impl Default for FBufSlabs {
fn default() -> Self {
Self::new(Self::DEFAULT_BYTES_PER_CLASS)
}
}
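// Release every cached buffer back to the system allocator when the pool is dropped.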
impl Drop for FBufSlabs {
fn drop(&mut self) {
for (index, slab) in self.slabs.iter_mut().enumerate() {
let size = Self::slab_size(index);
while let Some(ptr) = slab.buffers.pop() {
unsafe {
alloc::dealloc(ptr.ptr.as_ptr(), layout_for(size));
}
}
}
}
}
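/// Installs `pool` as the current thread's slab pool and returns the previously
/// installed pool, if any. Pass `None` to clear it.
///
/// Illustrative sketch (paths assumed, not compiled as a doctest):
/// ```ignore
/// let pool = Arc::new(FBufSlabs::default());
/// let previous = set_thread_slab_pool(Some(pool));
/// // ... FBufs allocated and dropped on this thread are now recycled through the pool ...
/// set_thread_slab_pool(previous);
/// ```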
pub fn set_thread_slab_pool(pool: Option<Arc<FBufSlabs>>) -> Option<Arc<FBufSlabs>> {
THREAD_SLAB_POOL.with(|current| current.replace(pool))
}
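/// Runs `f` against the active pool: the tokio task-local pool if one is set, else
/// the thread-local pool, else the process-wide fallback.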
fn with_active_slab_pool<R>(f: impl FnOnce(&FBufSlabs) -> R) -> R {
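// `f` is `FnOnce` but may be offered to several candidate pools, so it is stashed in
// an `Option` and taken by whichever branch actually runs it.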
let mut f = Some(f);
if let Ok(result) = TOKIO_FBUF_SLABS.try_with(|pool| f.take().unwrap()(pool)) {
return result;
}
if let Ok(Some(result)) = THREAD_SLAB_POOL.try_with(|pool| {
let pool = pool.borrow();
pool.as_ref().map(|pool| f.take().unwrap()(pool))
}) {
return result;
}
f.take().unwrap()(FALLBACK_SLAB_POOL.as_ref())
}
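/// Takes a buffer from the active pool or, on a miss, allocates exactly `capacity`
/// bytes; returns the pointer and the usable capacity (which may exceed `capacity`
/// on a cache hit, since classes are rounded up to powers of two).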
pub(super) fn acquire_allocation(capacity: usize) -> (NonNull<u8>, usize) {
with_active_slab_pool(|pool| pool.try_take(capacity)).unwrap_or_else(|| {
let ptr = allocate_exact(capacity);
(ptr, capacity)
})
}
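/// Returns a buffer to the active pool, freeing it if it cannot be cached.
/// `ptr` must have been obtained for this same `capacity`, because the deallocation
/// layout is reconstructed from it.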
pub(super) fn recycle_or_dealloc(ptr: NonNull<u8>, capacity: usize) {
if !with_active_slab_pool(|pool| pool.try_put(ptr, capacity)) {
unsafe {
alloc::dealloc(ptr.as_ptr(), layout_for(capacity));
}
}
}
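/// Allocates `capacity` bytes aligned to `FBuf::ALIGNMENT`, calling
/// `handle_alloc_error` if the allocation fails.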
fn allocate_exact(capacity: usize) -> NonNull<u8> {
let layout = layout_for(capacity);
let ptr = unsafe { alloc::alloc(layout) };
if ptr.is_null() {
alloc::handle_alloc_error(layout);
}
unsafe { NonNull::new_unchecked(ptr) }
}
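/// The layout of a buffer of `capacity` bytes with `FBuf::ALIGNMENT` alignment.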
pub(super) fn layout_for(capacity: usize) -> alloc::Layout {
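// SAFETY: `ALIGNMENT` is a power of two (asserted at compile time above); the
// remaining precondition (size, rounded up to the alignment, not exceeding
// `isize::MAX`) is assumed to hold for the capacities used by this module.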
unsafe { alloc::Layout::from_size_align_unchecked(capacity, FBuf::ALIGNMENT) }
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use crate::fbuf::FBuf;
use super::{FBufSlabs, TOKIO_FBUF_SLABS, set_thread_slab_pool};
#[test]
fn slab_pool_reuses_buffers_across_matching_size_classes() {
let pool = Arc::new(FBufSlabs::default());
let previous = set_thread_slab_pool(Some(pool));
let first = FBuf::with_capacity(4096);
let first_ptr = first.as_ptr();
drop(first);
let second = FBuf::with_capacity(3000);
assert_eq!(second.capacity(), 4096);
assert_eq!(second.as_ptr(), first_ptr);
set_thread_slab_pool(previous);
}
#[test]
fn slab_pool_has_fixed_capacity_per_size_class() {
let pool = Arc::new(FBufSlabs::new(4096));
let previous = set_thread_slab_pool(Some(pool.clone()));
let first = FBuf::with_capacity(4096);
let second = FBuf::with_capacity(4096);
drop(first);
drop(second);
assert_eq!(pool.cached_buffers(4096), 1);
set_thread_slab_pool(previous);
}
#[test]
fn slab_pool_stats_track_reuse_and_fallbacks() {
let pool = Arc::new(FBufSlabs::new(4096));
let previous = set_thread_slab_pool(Some(pool.clone()));
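// Expected flow: the first 4096-byte request misses and its drop seeds the cache;
// the 3000-byte request rounds up to 4096 and hits; the oversized buffer bypasses
// the cache and counts as a fallback on both allocation and drop.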
let first = FBuf::with_capacity(4096);
drop(first);
let second = FBuf::with_capacity(3000);
drop(second);
let big = FBuf::with_capacity(FBufSlabs::HIGHEST_SLAB_CLASS_CAPACITY + FBuf::ALIGNMENT);
drop(big);
let stats = pool.stats();
let class = stats
.classes
.iter()
.find(|class| class.size == 4096)
.unwrap();
assert_eq!(stats.alloc_requests(), 3);
assert_eq!(stats.mallocs_saved(), 1);
assert_eq!(stats.malloc_fallbacks(), 2);
assert_eq!(stats.recycle_requests(), 3);
assert_eq!(stats.frees_saved(), 2);
assert_eq!(stats.free_fallbacks(), 1);
assert_eq!(stats.cached_buffers, 1);
assert_eq!(stats.fallback_alloc_requests, 1);
assert_eq!(stats.fallback_recycle_requests, 1);
assert_eq!(class.alloc_requests, 2);
assert_eq!(class.alloc_hits, 1);
assert_eq!(class.malloc_fallbacks(), 1);
assert_eq!(class.recycle_requests, 2);
assert_eq!(class.recycle_hits, 2);
assert_eq!(class.free_fallbacks(), 0);
assert_eq!(class.available_buffers, 1);
set_thread_slab_pool(previous);
}
#[tokio::test(flavor = "current_thread")]
async fn task_local_slab_takes_precedence() {
let thread_pool = Arc::new(FBufSlabs::default());
let task_pool = Arc::new(FBufSlabs::default());
let previous = set_thread_slab_pool(Some(thread_pool.clone()));
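// Allocations inside the scope should be served by `task_pool`, even across an
// await point, and never touch `thread_pool`.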
TOKIO_FBUF_SLABS
.scope(task_pool.clone(), async {
let first = FBuf::with_capacity(4096);
let first_ptr = first.as_ptr();
drop(first);
tokio::task::yield_now().await;
let second = FBuf::with_capacity(3000);
assert_eq!(second.capacity(), 4096);
assert_eq!(second.as_ptr(), first_ptr);
})
.await;
assert_eq!(task_pool.cached_buffers(4096), 1);
assert_eq!(thread_pool.cached_buffers(4096), 0);
let stats = task_pool.stats();
let class = stats
.classes
.iter()
.find(|class| class.size == 4096)
.unwrap();
assert_eq!(class.alloc_requests, 2);
assert_eq!(class.alloc_hits, 1);
assert_eq!(class.recycle_requests, 2);
assert_eq!(class.recycle_hits, 2);
assert_eq!(thread_pool.stats().alloc_requests(), 0);
assert_eq!(thread_pool.stats().recycle_requests(), 0);
set_thread_slab_pool(previous);
}
}