use std::cell::UnsafeCell;
use std::mem::{self, MaybeUninit, offset_of};
use std::num::NonZero;
use std::ptr::NonNull;
use std::sync::atomic::{self, AtomicUsize};
use std::sync::{Arc, Mutex};
use std::{iter, ptr};
use infinity_pool::{RawPinnedPool, RawPooled, RawPooledMut};
use nm::{Event, Magnitude};
use crate::BytesBuf;
use crate::constants::ERR_POISONED_LOCK;
use crate::mem::{Block, BlockRef, BlockRefDynamic, BlockRefVTable, BlockSize, Memory};
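/// A memory provider that rents fixed-size blocks from size-segregated pools
/// (1 KiB, 4 KiB, 16 KiB and 64 KiB) and returns them for reuse once the last
/// reference to a block is dropped.
///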
#[doc = include_str!("../../doc/snippets/choosing_memory_provider.md")]
#[derive(Clone, Debug)]
pub struct GlobalPool {
inner: thread_aware::Arc<GlobalPoolInner, thread_aware::PerCore>,
}
impl thread_aware::ThreadAware for GlobalPool {
    fn relocated(
        self,
        source: thread_aware::affinity::MemoryAffinity,
        destination: thread_aware::affinity::PinnedAffinity,
    ) -> Self {
Self {
inner: self.inner.relocated(source, destination),
}
}
}
impl GlobalPool {
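    /// Creates a new pool with all sub-pools empty.
    ///
    /// Every call produces an independent set of pools; despite the name,
    /// there is no shared process-wide default instance.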
#[must_use]
#[expect(
clippy::new_without_default,
reason = "to avoid accidental confusion with some 'default' global memory pool, which does not exist"
)]
pub fn new() -> Self {
Self {
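            // `GlobalPoolInner::new` is passed as a factory rather than called,
            // so the `PerCore` policy can construct the inner state itself
            // instead of cloning one ready-made instance.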
inner: thread_aware::Arc::<_, thread_aware::PerCore>::new(GlobalPoolInner::new),
}
}
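    /// Reserves a buffer with capacity for at least `min_bytes`.
    ///
    /// Requests are rounded up to the smallest block size that fits (1 KiB,
    /// 4 KiB, 16 KiB or 64 KiB); larger requests are served as a chain of
    /// 64 KiB blocks, so the returned capacity may exceed `min_bytes`.
    /// Reserving zero bytes returns an empty buffer without touching any pool.
    ///
    /// A minimal usage sketch (marked `ignore` so no assumption is made about
    /// the enclosing crate's name):
    ///
    /// ```ignore
    /// let pool = GlobalPool::new();
    /// let mut buf = pool.reserve(100);
    /// assert!(buf.capacity() >= 100); // rounded up to one 1 KiB block
    /// ```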
#[must_use]
#[inline]
pub fn reserve(&self, min_bytes: usize) -> crate::BytesBuf {
self.inner.reserve(min_bytes)
}
}
impl Memory for GlobalPool {
    #[cfg_attr(test, mutants::skip)]
    #[inline]
fn reserve(&self, min_bytes: usize) -> crate::BytesBuf {
self.reserve(min_bytes)
}
}
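/// A shared, mutex-guarded pinned pool of blocks of one fixed `SIZE`.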
type SubPool<const SIZE: usize> = Arc<Mutex<RawPinnedPool<MaybeUninit<NeutralBlock<SIZE>>>>>;
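/// The state behind [`GlobalPool`]: one pinned sub-pool per supported block size.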
#[derive(Debug)]
#[expect(
clippy::struct_field_names,
reason = "pool_ prefix provides clarity for the size-differentiated sub-pools"
)]
struct GlobalPoolInner {
pool_1k: SubPool<1024>,
pool_4k: SubPool<4096>,
pool_16k: SubPool<16_384>,
pool_64k: SubPool<65_536>,
}
impl GlobalPoolInner {
fn new() -> Self {
INSTANCES_CREATED.with(Event::observe_once);
Self {
pool_1k: Arc::new(Mutex::new(RawPinnedPool::new())),
pool_4k: Arc::new(Mutex::new(RawPinnedPool::new())),
pool_16k: Arc::new(Mutex::new(RawPinnedPool::new())),
pool_64k: Arc::new(Mutex::new(RawPinnedPool::new())),
}
}
fn reserve(&self, min_bytes: usize) -> crate::BytesBuf {
RESERVATION_REQUESTED_SIZE.with(|e| e.observe(min_bytes));
if min_bytes == 0 {
return BytesBuf::new();
}
        // Round up to the smallest bucket that satisfies the request; anything
        // above 64 KiB is served as multiple 64 KiB blocks.
        if min_bytes <= 1024 {
allocate_uniform::<1024>(&self.pool_1k, &BLOCK_REF_FNS_1K, min_bytes)
} else if min_bytes <= 4096 {
allocate_uniform::<4096>(&self.pool_4k, &BLOCK_REF_FNS_4K, min_bytes)
} else if min_bytes <= 16_384 {
allocate_uniform::<16_384>(&self.pool_16k, &BLOCK_REF_FNS_16K, min_bytes)
} else {
allocate_uniform::<65_536>(&self.pool_64k, &BLOCK_REF_FNS_64K, min_bytes)
}
}
}
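/// Rents `ceil(min_bytes / SIZE)` blocks of a single size from `pool_arc` and
/// assembles them into one [`BytesBuf`].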
fn allocate_uniform<const SIZE: usize>(
pool_arc: &SubPool<SIZE>,
vtable: &'static BlockRefVTable<BlockMeta<SIZE>>,
min_bytes: usize,
) -> crate::BytesBuf {
let block_count = min_bytes.div_ceil(SIZE);
BLOCK_RENTED_SIZE.with(|e| e.batch(block_count).observe(SIZE));
let mut pool = pool_arc.lock().expect(ERR_POISONED_LOCK);
pool.reserve(block_count);
    let blocks =
        iter::repeat_with(|| allocate_block(&mut *pool, pool_arc, vtable)).take(block_count);
BytesBuf::from_blocks(blocks)
}
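/// Rents one block: inserts a fresh slot into the pool, writes its metadata
/// (including a handle back to its own slot), and wraps it in a [`Block`]
/// whose vtable returns the slot to the pool when the last reference drops.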
fn allocate_block<const SIZE: usize>(
pool: &mut RawPinnedPool<MaybeUninit<NeutralBlock<SIZE>>>,
pool_arc: &SubPool<SIZE>,
vtable: &'static BlockRefVTable<BlockMeta<SIZE>>,
) -> Block {
    // Write the metadata (pool backreference, handle to the slot itself, and a
    // reference count of one) into the freshly inserted, uninitialized slot.
    let initialize_block =
        |place: &mut MaybeUninit<NeutralBlock<SIZE>>, handle: RawPooledMut<NeutralBlock<SIZE>>| {
            let handle = handle.into_shared();
            let meta = BlockMeta {
                block_pool: Arc::clone(pool_arc),
                handle,
                ref_count: AtomicUsize::new(1),
            };
            in_place_initialize_block(place, meta);
            handle
        };
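    // SAFETY: `initialize_block` fully initializes `meta`; the `memory` array
    // requires no initialization because it holds only `MaybeUninit<u8>`.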
let handle = unsafe { insert_with_handle_to_self(pool, initialize_block) };
    // SAFETY: The slot was just inserted and initialized, so the handle refers
    // to a live block.
    let block = unsafe { handle.as_ref() };
    let meta_ptr = NonNull::from(&block.meta);
    // SAFETY: `UnsafeCell::get` never returns null for a live object.
    let capacity_ptr = unsafe { NonNull::new_unchecked(block.memory.get()) };
    // SAFETY: `meta_ptr` points into a pinned pool slot that stays alive until
    // the vtable's `drop` releases the final reference.
    let block_ref = unsafe { BlockRef::new(meta_ptr, vtable) };
#[expect(
clippy::cast_possible_truncation,
reason = "block sizes are always <= u32::MAX, as a core invariant of this crate"
)]
    let block_size: NonZero<BlockSize> =
        const { NonZero::new(SIZE as u32).expect("block size is always a known non-zero constant") };
    // SAFETY: The pointer and size describe the block's `memory` array, and
    // `block_ref` keeps the backing pool slot alive for as long as needed.
    unsafe { Block::new(capacity_ptr.cast(), block_size, block_ref) }
}
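/// A pooled block: reference-counting metadata followed by `SIZE` bytes of raw
/// storage handed out to callers through [`Block`].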
#[derive(Debug)]
struct NeutralBlock<const SIZE: usize> {
meta: BlockMeta<SIZE>,
memory: UnsafeCell<[MaybeUninit<u8>; SIZE]>,
}
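// SAFETY: `meta` contains only thread-safe types, and the raw `memory` cell is
// only ever accessed through pointers whose aliasing is managed by the
// `Block`/`BytesBuf` machinery.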
unsafe impl<const SIZE: usize> Sync for NeutralBlock<SIZE> {}
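/// Reference-counting state for one pooled block, carrying everything needed
/// to return the block to its pool once the count reaches zero.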
#[derive(Debug)]
struct BlockMeta<const SIZE: usize> {
block_pool: SubPool<SIZE>,
handle: RawPooled<NeutralBlock<SIZE>>,
ref_count: AtomicUsize,
}
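/// Writes only the `meta` field of the block in place; the `memory` array is
/// deliberately left uninitialized, which is sound because it contains only
/// `MaybeUninit<u8>`.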
#[cfg_attr(test, mutants::skip)]
fn in_place_initialize_block<const SIZE: usize>(
    block: &mut MaybeUninit<NeutralBlock<SIZE>>,
    meta: BlockMeta<SIZE>,
) {
    let block_ptr = block.as_mut_ptr();
    // Compute the field address manually: we must not create a reference to
    // the not-yet-initialized `NeutralBlock`.
    // SAFETY: `offset_of!` yields an in-bounds offset within the block.
    let meta_ptr = unsafe { block_ptr.byte_add(offset_of!(NeutralBlock<SIZE>, meta)) }
        .cast::<BlockMeta<SIZE>>();
    // SAFETY: `meta_ptr` is in bounds of the allocation, aligned, and valid
    // for writes.
    unsafe {
        meta_ptr.write(meta);
    }
}
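/// Inserts an uninitialized slot into `pool` and passes both the slot and a
/// typed handle to it into `initialize`.
///
/// # Safety
///
/// `initialize` must leave the slot fully initialized as a valid `T` before
/// the typed handle it receives is used to access a `T`.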
unsafe fn insert_with_handle_to_self<F, T, R>(pool: &mut RawPinnedPool<MaybeUninit<T>>, initialize: F) -> R
where
F: FnOnce(&mut MaybeUninit<T>, RawPooledMut<T>) -> R,
{
    // SAFETY: A no-op initializer is sufficient because the slot holds a
    // `MaybeUninit<T>`, which is valid in any byte state.
    let handle = unsafe { pool.insert_with(|_| {}) };
    // SAFETY: We hold exclusive access to the pool, so no other reference to
    // this freshly inserted slot can exist.
    let object_uninit = unsafe { handle.ptr().as_mut() };
    // SAFETY: `MaybeUninit<T>` is layout-compatible with `T`, and the caller
    // contract requires `initialize` to make the slot valid before the typed
    // handle is used.
    let handle = unsafe { mem::transmute::<RawPooledMut<MaybeUninit<T>>, RawPooledMut<T>>(handle) };
    initialize(object_uninit, handle)
}
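// One static vtable per block size; `BlockRef` dispatches its clone/drop
// operations through these.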
const BLOCK_REF_FNS_1K: BlockRefVTable<BlockMeta<1024>> = BlockRefVTable::from_trait();
const BLOCK_REF_FNS_4K: BlockRefVTable<BlockMeta<4096>> = BlockRefVTable::from_trait();
const BLOCK_REF_FNS_16K: BlockRefVTable<BlockMeta<16_384>> = BlockRefVTable::from_trait();
const BLOCK_REF_FNS_64K: BlockRefVTable<BlockMeta<65_536>> = BlockRefVTable::from_trait();
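// SAFETY: `clone` and `drop` below keep the reference count accurate and
// release the block back to its pool exactly once, when the count hits zero.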
unsafe impl<const SIZE: usize> BlockRefDynamic for BlockMeta<SIZE> {
type State = Self;
    #[cfg_attr(test, mutants::skip)]
    fn clone(state_ptr: NonNull<Self::State>) -> NonNull<Self::State> {
        // SAFETY: The caller holds a reference, so the metadata is alive.
        let state = unsafe { state_ptr.as_ref() };
        // Relaxed is sufficient for the increment: a new reference can only be
        // created by a thread that already holds one.
        state.ref_count.fetch_add(1, atomic::Ordering::Relaxed);
        state_ptr
    }
    #[cfg_attr(test, mutants::skip)]
    fn drop(state_ptr: NonNull<Self::State>) {
        // SAFETY: The caller holds a reference, so the metadata is alive.
        let state = unsafe { state_ptr.as_ref() };
        if state.ref_count.fetch_sub(1, atomic::Ordering::Release) != 1 {
            return;
        }
        // Pair the Release decrements above with an Acquire fence so that all
        // prior accesses to the block happen-before it returns to the pool.
        atomic::fence(atomic::Ordering::Acquire);
        let handle = state.handle;
        // SAFETY: We are the last reference, so we may move the pool `Arc` out
        // of the metadata; removing the slot below does not drop it again,
        // since the pool stores `MaybeUninit` values.
        let pool = unsafe { ptr::read(&raw const state.block_pool) };
        let mut pool = pool.lock().expect(ERR_POISONED_LOCK);
        // SAFETY: `handle` refers to a live slot owned by this exact pool.
        unsafe {
            pool.remove(handle);
        }
    }
}
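// Histogram buckets: block sizes mirror the four sub-pools; reservation sizes
// span zero to 1 MiB in powers of two.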
const BLOCK_SIZE_BUCKETS: &[Magnitude] = &[1024, 4096, 16_384, 65_536];
const RESERVATION_SIZE_BUCKETS: &[Magnitude] = &[
0, 256, 512, 1024, 2048, 4096, 8192, 16_384, 32_768, 65_536, 131_072, 262_144, 524_288, 1_048_576,
];
thread_local! {
static BLOCK_RENTED_SIZE: Event = Event::builder()
.name("bytesbuf_global_pool_block_rented_size")
.histogram(BLOCK_SIZE_BUCKETS)
.build();
static RESERVATION_REQUESTED_SIZE: Event = Event::builder()
.name("bytesbuf_global_pool_reservation_requested_size")
.histogram(RESERVATION_SIZE_BUCKETS)
.build();
static INSTANCES_CREATED: Event = Event::builder()
.name("bytesbuf_global_pool_instances_total")
.build();
}
#[cfg_attr(coverage_nightly, coverage(off))]
#[cfg(test)]
mod tests {
#![allow(clippy::indexing_slicing, reason = "panic is fine in tests")]
use std::thread;
use static_assertions::assert_impl_all;
use super::*;
use crate::mem::MemoryShared;
assert_impl_all!(GlobalPool: MemoryShared);
assert_impl_all!(GlobalPool: thread_aware::ThreadAware);
fn assert_all_pools_empty(inner: &GlobalPoolInner) {
assert!(inner.pool_1k.lock().unwrap().is_empty());
assert!(inner.pool_4k.lock().unwrap().is_empty());
assert!(inner.pool_16k.lock().unwrap().is_empty());
assert!(inner.pool_64k.lock().unwrap().is_empty());
}
#[test]
fn smoke_test() {
let memory = GlobalPool::new();
_ = memory.reserve(0);
let builder = memory.reserve(1);
assert!(builder.capacity() >= 1);
let builder = memory.reserve(100);
assert!(builder.capacity() >= 100);
let builder = memory.reserve(1000);
assert!(builder.capacity() >= 1000);
let builder = memory.reserve(10000);
assert!(builder.capacity() >= 10000);
let builder = memory.reserve(100_000);
assert!(builder.capacity() >= 100_000);
let builder = memory.reserve(1_000_000);
assert!(builder.capacity() >= 1_000_000);
}
#[test]
fn piece_by_piece() {
const BLOCK_SIZE: usize = 65_536;
const LEN_BYTES: BlockSize = 1000;
let memory = GlobalPool::new();
let mut buf = memory.reserve(BLOCK_SIZE);
let mut views = Vec::new();
while buf.remaining_capacity() > 0 {
#[expect(clippy::cast_possible_truncation, reason = "intentionally truncating")]
let value = views.len() as u8;
buf.put_byte_repeated(value, (LEN_BYTES as usize).min(buf.remaining_capacity()));
debug_assert!(!buf.is_empty());
views.push(buf.consume_all());
}
#[expect(clippy::cast_possible_truncation, reason = "block size is small")]
let expected_count = (BLOCK_SIZE as BlockSize).div_ceil(LEN_BYTES);
assert_eq!(views.len(), expected_count as usize);
assert!(!views.is_empty());
for (i, sequence) in views.iter().enumerate() {
#[expect(clippy::cast_possible_truncation, reason = "intentionally truncating")]
let expected_value = i as u8;
assert_eq!(sequence.first_slice()[0], expected_value);
}
}
#[test]
fn release_on_other_thread() {
let memory = GlobalPool::new();
let mut sub = memory.reserve(65_536);
sub.put_byte_repeated(42, 65_536);
let data = sub.consume_all();
thread::spawn({
move || {
drop(data);
assert_all_pools_empty(&memory.inner);
}
})
.join()
.unwrap();
}
#[test]
    #[cfg_attr(miri, ignore)]
    fn large_content_survives_trip() {
const SIZE_10MB: usize = 10 * 1024 * 1024;
        let pattern = testing_aids::repeating_incrementing_bytes()
            .take(SIZE_10MB)
            .collect::<Vec<_>>();
let memory = GlobalPool::new();
let mut buf = memory.reserve(SIZE_10MB);
buf.put_slice(pattern.as_slice());
let message = buf.consume_all();
assert_eq!(message.len(), SIZE_10MB);
assert_eq!(message, pattern.as_slice());
}
#[test]
    #[cfg_attr(miri, ignore)]
    fn two_large_views_different_patterns() {
const SIZE_10MB: usize = 10 * 1024 * 1024;
        let pattern1 = testing_aids::repeating_incrementing_bytes()
            .take(SIZE_10MB)
            .collect::<Vec<_>>();
let pattern2 = testing_aids::repeating_reverse_incrementing_bytes()
.take(SIZE_10MB)
.collect::<Vec<_>>();
let memory = GlobalPool::new();
let mut sb1 = memory.reserve(SIZE_10MB);
sb1.put_slice(pattern1.as_slice());
let mut sb2 = memory.reserve(SIZE_10MB);
sb2.put_slice(pattern2.as_slice());
let view1 = sb1.consume_all();
let view2 = sb2.consume_all();
assert_eq!(view1.len(), SIZE_10MB);
assert_eq!(view2.len(), SIZE_10MB);
assert_eq!(view1, pattern1.as_slice());
assert_eq!(view2, pattern2.as_slice());
}
#[test]
fn small_reservation_uses_1k_block() {
let memory = GlobalPool::new();
let buf = memory.reserve(500);
assert_eq!(buf.capacity(), 1024);
}
#[test]
fn exact_1k_uses_1k_block() {
let memory = GlobalPool::new();
let buf = memory.reserve(1024);
assert_eq!(buf.capacity(), 1024);
}
#[test]
fn just_over_1k_uses_4k_block() {
let memory = GlobalPool::new();
let buf = memory.reserve(1025);
assert_eq!(buf.capacity(), 4096);
}
#[test]
fn exact_4k_uses_4k_block() {
let memory = GlobalPool::new();
let buf = memory.reserve(4096);
assert_eq!(buf.capacity(), 4096);
}
#[test]
fn just_over_4k_uses_16k_block() {
let memory = GlobalPool::new();
let buf = memory.reserve(4097);
assert_eq!(buf.capacity(), 16_384);
}
#[test]
fn exact_16k_uses_16k_block() {
let memory = GlobalPool::new();
let buf = memory.reserve(16_384);
assert_eq!(buf.capacity(), 16_384);
}
#[test]
fn just_over_16k_uses_64k_block() {
let memory = GlobalPool::new();
let buf = memory.reserve(16_385);
assert_eq!(buf.capacity(), 65_536);
}
#[test]
fn exact_64k_uses_64k_block() {
let memory = GlobalPool::new();
let buf = memory.reserve(65_536);
assert_eq!(buf.capacity(), 65_536);
}
#[test]
fn multi_block_uniform_64k() {
let memory = GlobalPool::new();
let buf = memory.reserve(70_000);
assert_eq!(buf.capacity(), 65_536 * 2);
}
#[test]
fn multi_block_many_64k() {
let memory = GlobalPool::new();
let buf = memory.reserve(200_000);
assert_eq!(buf.capacity(), 65_536 * 4);
}
#[test]
fn zero_reservation() {
let memory = GlobalPool::new();
let buf = memory.reserve(0);
assert_eq!(buf.capacity(), 0);
}
#[test]
fn pool_isolation_small_block() {
let memory = GlobalPool::new();
let buf = memory.reserve(500);
drop(buf);
        assert_all_pools_empty(&memory.inner);
}
#[test]
fn pool_isolation_multi_block() {
let memory = GlobalPool::new();
let buf = memory.reserve(70_000);
drop(buf);
assert_all_pools_empty(&memory.inner);
}
#[test]
fn multi_block_just_over_64k() {
let memory = GlobalPool::new();
let buf = memory.reserve(66_036);
assert_eq!(buf.capacity(), 65_536 * 2);
}
#[test]
fn relocated_pool_works() {
use thread_aware::ThreadAware;
use thread_aware::affinity::pinned_affinities;
let affinities = pinned_affinities(&[2]);
let source = affinities[0].into();
let destination = affinities[1];
let memory = GlobalPool::new();
let mut buf = memory.reserve(100);
buf.put_byte(42);
let view = buf.consume_all();
assert_eq!(view.first_slice()[0], 42);
let relocated_memory = memory.relocated(source, destination);
let mut buf2 = relocated_memory.reserve(200);
buf2.put_byte(99);
let view2 = buf2.consume_all();
assert_eq!(view2.first_slice()[0], 99);
}
}