use alloc::sync::Arc;
use core::ptr::NonNull;
use allocator_api2::alloc::{AllocError, Allocator};
use super::constants::{LARGE, MAX_CHUNK_BYTES, NUM_CHUNK_CLASSES, class_to_bytes, min_class_for_bytes};
use super::local_chunk::{LocalChunk, header_size as local_header_size};
use super::local_slot::LocalSlot;
use super::shared_chunk::{SharedChunk, header_size as shared_header_size};
use super::sync::{AtomicPtr, AtomicU8, AtomicUsize, Ordering};
#[cfg(feature = "stats")]
use crate::arena_stats::bump_stat;
/// Allocates, caches and frees the chunks handed out by this module, while
/// tracking the total number of chunk bytes against an optional budget.
///
/// Idle chunks are kept in two intrusive caches (one for local chunks, one for
/// shared chunks) and stay counted in `total_chunk_bytes` until they are freed.
pub(crate) struct ChunkProvider<A: Allocator + Clone> {
/// Backing allocator; a clone of it is passed to every chunk allocation.
pub(crate) allocator: A,
/// Largest payload served by the normal (size-class / cache) path; larger
/// requests get a dedicated oversized chunk.
pub(crate) max_normal_alloc: usize,
/// Optional upper bound for `total_chunk_bytes`; `None` disables the check.
pub(crate) byte_budget: Option<usize>,
/// Running total of bytes (header + payload) reserved for chunks, maintained
/// by `reserve_budget` / `release_budget`.
pub(crate) total_chunk_bytes: AtomicUsize,
/// Head of the singly linked list of idle local chunks, linked through each
/// chunk's `next` cell.
pub(crate) local_cache: LocalSlot<Option<NonNull<LocalChunk<A>>>>,
/// Head of the stack of idle shared chunks, stored as a thin pointer; null
/// when the stack is empty.
pub(crate) shared_cache_head: AtomicPtr<u8>,
/// Size class used as the floor for the next freshly allocated local chunk;
/// raised after each fresh normal-path allocation.
pub(crate) local_high_water: LocalSlot<u8>,
/// Shared-chunk counterpart of `local_high_water`.
pub(crate) shared_high_water: AtomicU8,
/// Counters updated through `bump_stat!` when the `stats` feature is enabled.
#[cfg(feature = "stats")]
pub(crate) stats: crate::arena_stats::StatsStorage,
}
impl<A: Allocator + Clone> ChunkProvider<A> {
/// Creates a provider with empty chunk caches and a zeroed byte counter.
///
/// `initial_local_class` and `initial_shared_class` seed the local and shared
/// high-water size classes respectively. The provider is returned inside an
/// `Arc` because chunk allocation hands each chunk a `Weak` back-reference.
pub(crate) fn new(
    allocator: A,
    max_normal_alloc: usize,
    byte_budget: Option<usize>,
    initial_local_class: u8,
    initial_shared_class: u8,
) -> Arc<Self> {
    // Both caches start out empty.
    let empty_local_cache = LocalSlot::new(None);
    let empty_shared_cache = AtomicPtr::new(core::ptr::null_mut());

    let provider = Self {
        allocator,
        max_normal_alloc,
        byte_budget,
        total_chunk_bytes: AtomicUsize::new(0),
        local_cache: empty_local_cache,
        shared_cache_head: empty_shared_cache,
        local_high_water: LocalSlot::new(initial_local_class),
        shared_high_water: AtomicU8::new(initial_shared_class),
        #[cfg(feature = "stats")]
        stats: crate::arena_stats::StatsStorage::default(),
    };
    Arc::new(provider)
}
/// Adds `bytes` to the running chunk-byte total, enforcing `byte_budget` when
/// one is configured.
///
/// Without a budget the counter is informational only and is bumped
/// unconditionally. With a budget, the new total is computed with
/// `checked_add` and installed through a compare-exchange loop, so the counter
/// never holds a wrapped or over-budget value, not even transiently. (An
/// add-then-roll-back scheme would briefly publish such a value, which lets a
/// concurrent caller either fail spuriously or be admitted against a
/// wrapped-around total.)
///
/// # Errors
///
/// Returns `AllocError` when adding `bytes` would overflow `usize` or exceed
/// the budget. The counter is left untouched in that case.
fn reserve_budget(&self, bytes: usize) -> Result<(), AllocError> {
    let Some(budget) = self.byte_budget else {
        self.total_chunk_bytes.fetch_add(bytes, Ordering::Relaxed);
        return Ok(());
    };

    let mut current = self.total_chunk_bytes.load(Ordering::Relaxed);
    loop {
        let next = match current.checked_add(bytes) {
            Some(next) if next <= budget => next,
            _ => return Err(AllocError),
        };
        // Written as an explicit loop (rather than `fetch_update`) so it only
        // relies on the basic compare-exchange primitive of the atomic type.
        match self.total_chunk_bytes.compare_exchange_weak(
            current,
            next,
            Ordering::AcqRel,
            Ordering::Relaxed,
        ) {
            Ok(_) => return Ok(()),
            // Lost a race (or failed spuriously): re-validate against the
            // value that is actually stored.
            Err(observed) => current = observed,
        }
    }
}
/// Gives `bytes` back to the running chunk-byte total.
///
/// This is an unchecked atomic subtraction: callers are expected to release
/// exactly what was previously reserved for the chunk being freed.
fn release_budget(&self, bytes: usize) {
    let _previous_total = self.total_chunk_bytes.fetch_sub(bytes, Ordering::Release);
}
#[cfg_attr(test, mutants::skip)] pub(crate) fn acquire_local(self: &Arc<Self>, min_payload: usize) -> Result<NonNull<LocalChunk<A>>, AllocError> {
if min_payload > self.max_normal_alloc {
let rounded_payload = super::drop_list::round_payload(min_payload, local_header_size::<A>()).ok_or(AllocError)?;
let total_bytes = local_header_size::<A>().checked_add(rounded_payload).ok_or(AllocError)?;
self.reserve_budget(total_bytes)?;
#[cfg(feature = "stats")]
bump_stat!(self, oversized_local_chunks_allocated, 1);
match LocalChunk::allocate(self.allocator.clone(), Arc::downgrade(self), total_bytes) {
Ok(c) => return Ok(c),
Err(e) => {
self.release_budget(total_bytes);
return Err(e);
}
}
}
let req_class = min_class_for_bytes(min_payload + local_header_size::<A>());
let high_water = unsafe { self.local_high_water.with_mut(|h| *h) };
let max_class = NUM_CHUNK_CLASSES - 1;
let target_class = req_class.max(high_water).min(max_class);
debug_assert!(target_class < NUM_CHUNK_CLASSES);
let target_total = class_to_bytes(target_class);
let popped = unsafe {
self.local_cache.with_mut(|head| -> Option<NonNull<LocalChunk<A>>> {
use core::cell::Cell;
let prev_link: *const Cell<Option<NonNull<LocalChunk<A>>>> = {
let head_cell: &Cell<Option<NonNull<LocalChunk<A>>>> = Cell::from_mut(head);
core::ptr::from_ref(head_cell)
};
let mut cur = (*prev_link).get();
while let Some(chunk) = cur {
let c = chunk.as_ref();
let cap = c.capacity;
let next = c.next.get();
(*prev_link).set(next);
c.next.set(None);
if cap >= min_payload {
return Some(chunk);
}
self.release_budget(local_header_size::<A>() + cap);
LocalChunk::free_backing(chunk);
cur = next;
}
None
})
};
if let Some(chunk) = popped {
unsafe { chunk.as_ref().revive_for_reuse() };
return Ok(chunk);
}
let total_bytes = target_total;
self.reserve_budget(total_bytes)?;
#[cfg(feature = "stats")]
bump_stat!(self, normal_local_chunks_allocated, 1);
let chunk = match LocalChunk::allocate(self.allocator.clone(), Arc::downgrade(self), total_bytes) {
Ok(c) => c,
Err(e) => {
self.release_budget(total_bytes);
return Err(e);
}
};
let next_high_water = target_class.saturating_add(1).min(NUM_CHUNK_CLASSES - 1).min(max_class);
unsafe {
self.local_high_water.with_mut(|h| {
if next_high_water > *h {
*h = next_high_water;
}
});
}
Ok(chunk)
}
pub(crate) fn preallocate_local(self: &Arc<Self>) -> Result<(), AllocError> {
let high_water = unsafe { self.local_high_water.with_mut(|h| *h) };
let target_class = high_water;
let total_bytes = class_to_bytes(target_class);
self.reserve_budget(total_bytes)?;
#[cfg(feature = "stats")]
bump_stat!(self, normal_local_chunks_allocated, 1);
let chunk = match LocalChunk::allocate(self.allocator.clone(), Arc::downgrade(self), total_bytes) {
Ok(c) => c,
Err(e) => {
self.release_budget(total_bytes);
return Err(e);
}
};
let chunk_ref = unsafe { chunk.as_ref() };
let prev = chunk_ref.refcount.replace(0);
debug_assert_eq!(prev, LARGE);
unsafe {
self.local_cache.with_mut(|head| {
chunk_ref.next.set(*head);
*head = Some(chunk);
});
}
Ok(())
}
pub(crate) fn preallocate_shared(self: &Arc<Self>) -> Result<(), AllocError> {
let high_water = self.shared_high_water.load(Ordering::Relaxed);
let target_class = high_water;
let total_bytes = class_to_bytes(target_class);
self.reserve_budget(total_bytes)?;
#[cfg(feature = "stats")]
bump_stat!(self, normal_shared_chunks_allocated, 1);
let chunk = match SharedChunk::allocate(self.allocator.clone(), Arc::downgrade(self), total_bytes) {
Ok(c) => c,
Err(e) => {
self.release_budget(total_bytes);
return Err(e);
}
};
unsafe { chunk.as_ref() }.refcount.0.store(0, Ordering::Relaxed);
unsafe { self.push_shared_cache(chunk) };
let _ = self.shared_high_water.fetch_max(target_class, Ordering::Relaxed);
Ok(())
}
/// Returns a snapshot of the provider's statistics counters.
///
/// Only available when the `stats` feature is enabled.
#[cfg(feature = "stats")]
#[inline]
pub fn stats_snapshot(&self) -> crate::arena_stats::ArenaStats {
    let storage = &self.stats;
    storage.snapshot()
}
/// Takes back a local chunk: replays its pending drops, then either parks it
/// in the local cache or frees it.
///
/// A chunk is cached only if it is not oversized (header + capacity within
/// `MAX_CHUNK_BYTES`) and its capacity is at least the payload size of the
/// current high-water class. Anything else is freed and its bytes are
/// returned to the budget.
///
/// # Safety
///
/// NOTE(review): the exact contract is not visible in this file. Presumably
/// `chunk` must point to a live chunk created by this provider that no one
/// else is using any more, and the call must respect whatever threading rule
/// `LocalSlot::with_mut` imposes — confirm with the callers.
pub(crate) unsafe fn release_local(&self, chunk: NonNull<LocalChunk<A>>) {
    unsafe { LocalChunk::replay_drops(chunk) };
    let cap = unsafe { (*chunk.as_ptr()).capacity };
    let header = local_header_size::<A>();

    // The high-water mark is only consulted for chunks that are not oversized.
    let cacheable = header.saturating_add(cap) <= MAX_CHUNK_BYTES && {
        let high_water = unsafe { self.local_high_water.with_mut(|h| *h) };
        cap >= class_to_bytes(high_water).saturating_sub(header)
    };

    if !cacheable {
        self.release_budget(header + cap);
        unsafe { LocalChunk::free_backing(chunk) };
        return;
    }

    // Push onto the front of the cache list.
    unsafe {
        self.local_cache.with_mut(|head| {
            let previous_head = head.replace(chunk);
            (*chunk.as_ptr()).next.set(previous_head);
        });
    }
}
/// Hands out a shared chunk whose payload capacity is at least `min_payload`.
///
/// Strategy, in order:
/// 1. Requests larger than `max_normal_alloc` get a dedicated oversized chunk
///    whose payload is sized by `round_payload`.
/// 2. Otherwise the shared cache is tried; a chunk popped from it is revived
///    and returned.
/// 3. If the cache yields nothing, a fresh chunk of the target size class is
///    allocated and the shared high-water class is raised by one step (capped
///    at the largest class).
///
/// Allocation statistics are bumped only after the allocator has actually
/// produced a chunk, so failed allocations are not counted.
///
/// # Errors
///
/// Returns `AllocError` when size arithmetic overflows, the byte budget
/// refuses the reservation, or the allocator fails. A budget reservation is
/// rolled back when the allocator fails.
#[cfg_attr(test, mutants::skip)]
pub(crate) fn acquire_shared(self: &Arc<Self>, min_payload: usize) -> Result<NonNull<SharedChunk<A>>, AllocError> {
    let header = shared_header_size::<A>();

    if min_payload > self.max_normal_alloc {
        let rounded_payload = super::drop_list::round_payload(min_payload, header).ok_or(AllocError)?;
        let total_bytes = header.checked_add(rounded_payload).ok_or(AllocError)?;
        let chunk = self.allocate_shared_within_budget(total_bytes)?;
        #[cfg(feature = "stats")]
        bump_stat!(self, oversized_shared_chunks_allocated, 1);
        return Ok(chunk);
    }

    let req_class = min_class_for_bytes(min_payload.checked_add(header).ok_or(AllocError)?);
    let high_water = self.shared_high_water.load(Ordering::Relaxed);
    let max_class = NUM_CHUNK_CLASSES - 1;
    let target_class = req_class.max(high_water).min(max_class);
    debug_assert!(target_class < NUM_CHUNK_CLASSES);
    let target_total = class_to_bytes(target_class);

    if let Some(chunk) = self.try_pop_shared_at_least(min_payload) {
        // SAFETY: the chunk was just unlinked from the cache, so this call
        // holds the only pointer to it.
        unsafe { chunk.as_ref().revive_for_reuse() };
        return Ok(chunk);
    }

    let chunk = self.allocate_shared_within_budget(target_total)?;
    #[cfg(feature = "stats")]
    bump_stat!(self, normal_shared_chunks_allocated, 1);

    // The next fresh chunk should be one class larger; `fetch_max` never
    // lowers the mark.
    let next_high_water = target_class.saturating_add(1).min(max_class);
    let _ = self.shared_high_water.fetch_max(next_high_water, Ordering::Relaxed);
    Ok(chunk)
}

/// Reserves `total_bytes` of budget and allocates a shared chunk of that size,
/// giving the reservation back if the allocator fails.
fn allocate_shared_within_budget(self: &Arc<Self>, total_bytes: usize) -> Result<NonNull<SharedChunk<A>>, AllocError> {
    self.reserve_budget(total_bytes)?;
    match SharedChunk::allocate(self.allocator.clone(), Arc::downgrade(self), total_bytes) {
        Ok(chunk) => Ok(chunk),
        Err(e) => {
            self.release_budget(total_bytes);
            Err(e)
        }
    }
}
/// Pushes `chunk` onto the shared cache, a stack threaded through each
/// chunk's `next` pointer and published via `shared_cache_head`.
///
/// The chunk's `next` is rewritten before every compare-exchange attempt so
/// that it always points at the head value the exchange expects.
///
/// # Safety
///
/// NOTE(review): the exact contract is not visible in this file. Presumably
/// `chunk` must point to a live chunk that is not already linked into the
/// cache and is not used by anyone else — confirm with the callers.
#[cfg_attr(coverage_nightly, coverage(off))]
unsafe fn push_shared_cache(&self, chunk: NonNull<SharedChunk<A>>) {
    let thin = SharedChunk::to_thin_ptr(chunk);
    let node = unsafe { chunk.as_ref() };
    let mut expected = self.shared_cache_head.load(Ordering::Relaxed);
    loop {
        node.next.store(expected, Ordering::Relaxed);
        let Err(observed) = self.shared_cache_head.compare_exchange_weak(
            expected,
            thin,
            Ordering::AcqRel,
            Ordering::Acquire,
        ) else {
            return;
        };
        // Another push or pop won the race; retry against the new head.
        expected = observed;
    }
}
/// Pops chunks off the shared cache until one with at least `min_bytes` of
/// capacity is found, or the cache is empty.
///
/// Chunks that are popped but too small are freed and their bytes
/// (header + capacity) are returned to the budget. Returns `None` once the
/// cache head is null.
///
/// NOTE(review): the head's `next` is read before the compare-exchange that
/// unlinks it. That is only sound if no other thread can pop (and free or
/// re-push) the same head in between, i.e. if there is a single popping
/// thread. SOURCE does not show that guarantee — confirm with the callers.
#[cfg_attr(coverage_nightly, coverage(off))]
#[cfg_attr(test, mutants::skip)]
fn try_pop_shared_at_least(&self, min_bytes: usize) -> Option<NonNull<SharedChunk<A>>> {
    loop {
        let top = self.shared_cache_head.load(Ordering::Acquire);
        if top.is_null() {
            return None;
        }
        let candidate = unsafe { SharedChunk::<A>::from_thin_ptr(top) };
        let node = candidate.as_ptr();
        let successor = unsafe { (*node).next.load(Ordering::Relaxed) };

        let unlinked = self
            .shared_cache_head
            .compare_exchange_weak(top, successor, Ordering::AcqRel, Ordering::Acquire)
            .is_ok();
        if !unlinked {
            // Head changed underneath us (or spurious failure): start over.
            continue;
        }

        // The chunk is now exclusively ours; detach it from the old list.
        let cap = unsafe {
            (*node).next.store(core::ptr::null_mut(), Ordering::Relaxed);
            (*node).capacity
        };
        if cap < min_bytes {
            // Too small for this request: free it and keep looking.
            self.release_budget(shared_header_size::<A>() + cap);
            unsafe { SharedChunk::free_backing(candidate) };
            continue;
        }
        return Some(candidate);
    }
}
/// Takes back a shared chunk: replays its pending drops, then either pushes
/// it onto the shared cache or frees it.
///
/// A chunk is cached only if it is not oversized (header + capacity within
/// `MAX_CHUNK_BYTES`) and its capacity is at least the payload size of the
/// current shared high-water class. Anything else is freed and its bytes are
/// returned to the budget.
///
/// # Safety
///
/// NOTE(review): the exact contract is not visible in this file. Presumably
/// `chunk` must point to a live chunk created by this provider that no one
/// else is using any more — confirm with the callers.
pub(crate) unsafe fn release_shared(&self, chunk: NonNull<SharedChunk<A>>) {
    unsafe { SharedChunk::replay_drops(chunk) };
    let cap = unsafe { (*chunk.as_ptr()).capacity };
    let header = shared_header_size::<A>();

    // The high-water mark is only consulted for chunks that are not oversized.
    let cacheable = header.saturating_add(cap) <= MAX_CHUNK_BYTES && {
        let high_water = self.shared_high_water.load(Ordering::Relaxed);
        cap >= class_to_bytes(high_water).saturating_sub(header)
    };

    if !cacheable {
        self.release_budget(header + cap);
        unsafe { SharedChunk::free_backing(chunk) };
        return;
    }
    unsafe { self.push_shared_cache(chunk) };
}
}
impl<A: Allocator + Clone> Drop for ChunkProvider<A> {
    /// Frees every chunk still parked in the local and shared caches.
    ///
    /// The byte counter is not adjusted, since the provider itself is going
    /// away.
    fn drop(&mut self) {
        // Local cache: detach the whole list, then walk it, reading each
        // chunk's `next` before its backing memory is freed.
        let mut pending_local = unsafe { self.local_cache.with_mut(Option::take) };
        while let Some(chunk) = pending_local {
            pending_local = unsafe { (*chunk.as_ptr()).next.replace(None) };
            unsafe { LocalChunk::free_backing(chunk) };
        }

        // Shared cache: same walk over the thin-pointer stack.
        let mut pending_shared = self.shared_cache_head.swap(core::ptr::null_mut(), Ordering::Relaxed);
        while !pending_shared.is_null() {
            let chunk = unsafe { SharedChunk::<A>::from_thin_ptr(pending_shared) };
            pending_shared = unsafe { (*chunk.as_ptr()).next.load(Ordering::Relaxed) };
            unsafe { SharedChunk::free_backing(chunk) };
        }
    }
}
// SAFETY: NOTE(review) — the justification is not visible in this file. The
// shared-side state (`total_chunk_bytes`, `shared_cache_head`,
// `shared_high_water`) is atomic; the `LocalSlot` fields are presumably only
// ever touched by the owning thread. Confirm against `LocalSlot`'s contract
// and the callers of the `*_local` methods.
unsafe impl<A: Allocator + Clone + Send + Sync> Send for ChunkProvider<A> {}
// SAFETY: NOTE(review) — same assumption as for `Send` above: cross-thread
// access is presumably limited to the atomic fields — confirm.
unsafe impl<A: Allocator + Clone + Send + Sync> Sync for ChunkProvider<A> {}
#[cfg(test)]
mod tests {
    use allocator_api2::alloc::Global;

    use super::*;

    /// A reservation that would push the counter past `usize::MAX` must be
    /// rejected and must leave the counter at its previous value.
    #[test]
    fn reserve_budget_rejects_wraparound() {
        let nearly_full = usize::MAX - 100;
        let provider = ChunkProvider::<Global>::new(Global, 4096, Some(usize::MAX), 0, 0);
        provider.total_chunk_bytes.store(nearly_full, Ordering::Relaxed);

        let outcome = provider.reserve_budget(200);

        assert!(outcome.is_err());
        assert_eq!(provider.total_chunk_bytes.load(Ordering::Relaxed), nearly_full);
    }

    /// Reserving zero bytes inside the budget succeeds and changes nothing.
    #[test]
    fn reserve_budget_zero_bytes_succeeds_within_budget() {
        let already_used = 512;
        let provider = ChunkProvider::<Global>::new(Global, 4096, Some(1024), 0, 0);
        provider.total_chunk_bytes.store(already_used, Ordering::Relaxed);

        provider.reserve_budget(0).unwrap();

        assert_eq!(provider.total_chunk_bytes.load(Ordering::Relaxed), already_used);
    }
}