#![allow(unsafe_op_in_unsafe_fn, reason = "see module doc: inner unsafe blocks in unsafe fn add noise here")]
#![allow(clippy::unnecessary_safety_comment, reason = "safety rationale documented at function level")]
use alloc::sync::{Arc, Weak};
use core::mem;
use core::ptr::{self, NonNull};
#[cfg(feature = "stats")]
use core::sync::atomic::AtomicU64;
use core::sync::atomic::{AtomicPtr, AtomicU8, AtomicUsize, Ordering};
use allocator_api2::alloc::{AllocError, Allocator};
use super::chunk::Chunk;
use super::chunk_alloc::chunk_alloc_size;
use super::constants::{MAX_CHUNK_BYTES, MAX_NORMAL_ALLOC, MIN_CHUNK_BYTES, SizeClass};
use super::drop_entry::DropEntry;
use super::local_chunk::LocalChunk;
use super::owner_thread_cell::OwnerThreadCell;
use super::shared_chunk::SharedChunk;
/// Tunables handed to a [`ChunkProvider`] when it is constructed.
#[derive(Clone, Copy)]
pub(crate) struct ChunkProviderConfig {
/// Ceiling for the provider's outstanding-byte counter; `reserve_bytes` refuses
/// any reservation that would push the counter above this value.
byte_budget: usize,
/// Largest payload the normal `acquire_local` / `acquire_shared` paths expect
/// (checked there with a `debug_assert!`).
max_normal_alloc: usize,
}
impl ChunkProviderConfig {
    /// Creates a configuration from an explicit byte budget and normal-allocation ceiling.
    #[inline]
    pub(crate) fn new(byte_budget: usize, max_normal_alloc: usize) -> Self {
        Self { byte_budget, max_normal_alloc }
    }

    /// Returns the configured normal-allocation ceiling, in bytes.
    #[inline]
    pub(crate) fn max_normal_alloc(&self) -> usize {
        let Self { max_normal_alloc, .. } = *self;
        max_normal_alloc
    }
}
impl Default for ChunkProviderConfig {
    /// Default configuration: a byte budget of `usize::MAX` and a normal-allocation
    /// ceiling of `MAX_NORMAL_ALLOC`.
    fn default() -> Self {
        let unlimited_budget = usize::MAX;
        Self::new(unlimited_budget, MAX_NORMAL_ALLOC)
    }
}
/// Snapshot of the provider's chunk-allocation counters, as returned by
/// `ChunkProvider::chunk_alloc_stats`. Only built with the `stats` feature.
#[cfg(feature = "stats")]
#[derive(Clone, Copy)]
pub(crate) struct ChunkAllocStats {
/// Fresh local chunks allocated through the normal (size-class) path.
normal_local: u64,
/// Local chunks allocated through the oversized path.
oversized_local: u64,
/// Fresh shared chunks allocated through the normal (size-class) path.
normal_shared: u64,
/// Shared chunks allocated through the oversized path.
oversized_shared: u64,
}
#[cfg(feature = "stats")]
impl ChunkAllocStats {
    /// Count of normal-path local chunk allocations captured in this snapshot.
    #[inline]
    pub(crate) fn normal_local(&self) -> u64 {
        let Self { normal_local, .. } = *self;
        normal_local
    }

    /// Count of oversized local chunk allocations captured in this snapshot.
    #[inline]
    pub(crate) fn oversized_local(&self) -> u64 {
        let Self { oversized_local, .. } = *self;
        oversized_local
    }

    /// Count of normal-path shared chunk allocations captured in this snapshot.
    #[inline]
    pub(crate) fn normal_shared(&self) -> u64 {
        let Self { normal_shared, .. } = *self;
        normal_shared
    }

    /// Count of oversized shared chunk allocations captured in this snapshot.
    #[inline]
    pub(crate) fn oversized_shared(&self) -> u64 {
        let Self { oversized_shared, .. } = *self;
        oversized_shared
    }
}
/// Source of local and shared chunks: allocates fresh ones through `allocator`,
/// recycles released ones through two caches, and accounts every reserved byte
/// against `config.byte_budget`.
pub(crate) struct ChunkProvider<A: Allocator + Clone> {
/// Allocator passed to the chunk allocation and destruction routines.
allocator: A,
/// Budget and size limits supplied at construction.
config: ChunkProviderConfig,
/// Weak back-reference to this provider (set up by `Arc::new_cyclic` in `new`);
/// a clone is handed to every shared chunk that gets allocated.
weak_self: Weak<Self>,
/// Bytes currently reserved; raised by `reserve_bytes`, lowered by `release_bytes`.
bytes_outstanding: AtomicUsize,
/// Head of the list of cached local chunks (null when empty), linked through
/// `LocalChunk::next` / `LocalChunk::set_next`.
local_cache: OwnerThreadCell<*mut u8>,
/// Raw `SizeClass` floor for the local cache: released chunks smaller than this
/// class are destroyed instead of cached.
local_cache_class: AtomicU8,
/// Head of the list of cached shared chunks (null when empty), updated with
/// atomic swap / compare-exchange.
shared_cache: AtomicPtr<u8>,
/// Raw `SizeClass` floor for the shared cache.
shared_cache_class: AtomicU8,
/// Number of fresh normal-path local chunks allocated.
#[cfg(feature = "stats")]
normal_local_chunks_allocated: AtomicU64,
/// Number of oversized local chunks allocated.
#[cfg(feature = "stats")]
oversized_local_chunks_allocated: AtomicU64,
/// Number of fresh normal-path shared chunks allocated.
#[cfg(feature = "stats")]
normal_shared_chunks_allocated: AtomicU64,
/// Number of oversized shared chunks allocated.
#[cfg(feature = "stats")]
oversized_shared_chunks_allocated: AtomicU64,
/// Running total adjusted by `record_wasted_tail` / `release_wasted_tail`.
#[cfg(feature = "stats")]
wasted_tail_bytes: AtomicU64,
}
// SAFETY: the only raw-pointer field that is not already atomic is `local_cache`,
// and it sits behind `OwnerThreadCell`, which (per the lint-allow reason below)
// enforces single-thread access.
// NOTE(review): `weak_self: Weak<Self>` is also a field; confirm that `A: Send`
// alone is a sufficient bound for moving the provider across threads.
#[allow(clippy::non_send_fields_in_send_ty, reason = "OwnerThreadCell enforces single-thread access")]
unsafe impl<A: Allocator + Clone + Send> Send for ChunkProvider<A> {}
// SAFETY: the shared cache and all counters are atomics.
// NOTE(review): soundness of `&ChunkProvider` being shared relies on
// `OwnerThreadCell` rejecting or serialising non-owner access — verify against its
// implementation, which is not visible in this file.
unsafe impl<A: Allocator + Clone + Sync> Sync for ChunkProvider<A> {}
impl<A: Allocator + Clone> ChunkProvider<A> {
/// Builds a provider inside an `Arc`, with both caches empty, both cache floors
/// at raw class 0, every counter at zero, and `weak_self` pointing back at the
/// returned `Arc`.
pub(crate) fn new(allocator: A, config: ChunkProviderConfig) -> Arc<Self> {
    Arc::new_cyclic(|self_ref| {
        let weak_self = Weak::clone(self_ref);
        Self {
            allocator,
            config,
            weak_self,
            bytes_outstanding: AtomicUsize::new(0),
            local_cache: OwnerThreadCell::new(ptr::null_mut()),
            local_cache_class: AtomicU8::new(0),
            shared_cache: AtomicPtr::new(ptr::null_mut()),
            shared_cache_class: AtomicU8::new(0),
            #[cfg(feature = "stats")]
            normal_local_chunks_allocated: AtomicU64::new(0),
            #[cfg(feature = "stats")]
            oversized_local_chunks_allocated: AtomicU64::new(0),
            #[cfg(feature = "stats")]
            normal_shared_chunks_allocated: AtomicU64::new(0),
            #[cfg(feature = "stats")]
            oversized_shared_chunks_allocated: AtomicU64::new(0),
            #[cfg(feature = "stats")]
            wasted_tail_bytes: AtomicU64::new(0),
        }
    })
}
/// Takes a relaxed snapshot of the four chunk-allocation counters.
#[cfg(feature = "stats")]
pub(crate) fn chunk_alloc_stats(&self) -> ChunkAllocStats {
    let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);
    let normal_local = read(&self.normal_local_chunks_allocated);
    let oversized_local = read(&self.oversized_local_chunks_allocated);
    let normal_shared = read(&self.normal_shared_chunks_allocated);
    let oversized_shared = read(&self.oversized_shared_chunks_allocated);
    ChunkAllocStats {
        normal_local,
        oversized_local,
        normal_shared,
        oversized_shared,
    }
}
/// Current value of the reserved-byte counter (relaxed load), widened to `u64`.
#[cfg(feature = "stats")]
pub(crate) fn bytes_outstanding(&self) -> u64 {
    let reserved: usize = self.bytes_outstanding.load(Ordering::Relaxed);
    reserved as u64
}
/// Current value of the wasted-tail byte counter (relaxed load).
#[cfg(feature = "stats")]
pub(crate) fn wasted_tail_bytes(&self) -> u64 {
    let wasted = &self.wasted_tail_bytes;
    wasted.load(Ordering::Relaxed)
}
/// Adds `n` to the wasted-tail byte counter.
#[cfg(feature = "stats")]
pub(crate) fn record_wasted_tail(&self, n: u64) {
    let _previous = self.wasted_tail_bytes.fetch_add(n, Ordering::Relaxed);
}
/// Subtracts `n` from the wasted-tail byte counter.
#[cfg(feature = "stats")]
pub(crate) fn release_wasted_tail(&self, n: u64) {
    let _previous = self.wasted_tail_bytes.fetch_sub(n, Ordering::Relaxed);
}
/// Returns a copy of the configuration this provider was built with.
#[inline]
#[cfg_attr(test, mutants::skip)]
pub(crate) fn config(&self) -> ChunkProviderConfig {
    let snapshot: ChunkProviderConfig = self.config;
    snapshot
}
pub(crate) fn allocator(&self) -> &A {
&self.allocator
}
/// Acquires a local chunk for a normal-sized request.
///
/// The size class used is the larger of `ratchet_class` and
/// `SizeClass::min_for_bytes(header + min_payload)`.
///
/// Oversized requests must go through `acquire_oversized_local`; this is only
/// checked with a `debug_assert!`.
///
/// # Errors
/// Returns `AllocError` if `header + min_payload` overflows `usize`, or if the
/// underlying acquisition fails.
pub(crate) fn acquire_local(&self, min_payload: usize, ratchet_class: SizeClass) -> Result<NonNull<LocalChunk<A>>, AllocError> {
    let Some(needed_total) = LocalChunk::<A>::header_size().checked_add(min_payload) else {
        return Err(AllocError);
    };
    debug_assert!(
        min_payload <= self.config.max_normal_alloc && !exceeds_max_chunk_bytes(needed_total),
        "acquire_local invoked with oversized request — caller must route to acquire_oversized_local",
    );
    let fitting_class = SizeClass::min_for_bytes(needed_total);
    self.acquire_normal_local(fitting_class.max(ratchet_class))
}
/// Returns a local chunk of at least `class`: raises the local cache floor if
/// `class` is above it, then reuses the cache head if there is one, otherwise
/// allocates a fresh chunk.
#[cfg_attr(test, mutants::skip)]
fn acquire_normal_local(&self, class: SizeClass) -> Result<NonNull<LocalChunk<A>>, AllocError> {
    let reused = unsafe {
        // Raising the floor first evicts every cached chunk smaller than `class`,
        // so whatever is at the head afterwards is large enough.
        if class.raw() > self.local_cache_class.load(Ordering::Relaxed) {
            self.advance_local_cache_floor(class);
        }
        self.local_cache.with(|head| {
            let front = *head;
            if front.is_null() {
                return None;
            }
            let chunk = NonNull::new_unchecked(LocalChunk::<A>::header_to_fat(front));
            // Unlink the head before handing it out.
            *head = LocalChunk::next(chunk);
            LocalChunk::reinit_for_acquire(chunk);
            Some(chunk)
        })
    };
    match reused {
        Some(chunk) => Ok(chunk),
        None => self.allocate_fresh_local(class),
    }
}
/// Raises the local cache floor to `new_class` and evicts cached chunks that are
/// now too small.
///
/// Every cached chunk whose `header + capacity` is below `new_class.bytes()` is
/// destroyed and its bytes are released; the rest are relinked (in reverse
/// order) as the new cache list.
///
/// # Safety
/// NOTE(review): the caller contract is not spelled out in this file; presumably
/// this must run on the thread allowed to access `local_cache`, with every cached
/// pointer still a valid chunk header — confirm against `OwnerThreadCell`.
#[cold]
#[inline(never)]
unsafe fn advance_local_cache_floor(&self, new_class: SizeClass) {
    self.local_cache_class.store(new_class.raw(), Ordering::Relaxed);
    let floor_bytes = new_class.bytes();
    let header_bytes = LocalChunk::<A>::header_size();
    unsafe {
        self.local_cache.with(|head| {
            let mut survivors: *mut u8 = ptr::null_mut();
            let mut cursor = *head;
            while !cursor.is_null() {
                let chunk = NonNull::new_unchecked(LocalChunk::<A>::header_to_fat(cursor));
                // Read the link before the chunk is relinked or destroyed.
                let following = LocalChunk::next(chunk);
                let chunk_bytes = header_bytes + (*chunk.as_ptr()).capacity();
                if chunk_bytes < floor_bytes {
                    LocalChunk::destroy(chunk, &self.allocator);
                    self.release_bytes(chunk_bytes);
                } else {
                    LocalChunk::set_next(chunk, survivors);
                    survivors = cursor;
                }
                cursor = following;
            }
            *head = survivors;
        });
    }
}
fn allocate_fresh_local(&self, class: SizeClass) -> Result<NonNull<LocalChunk<A>>, AllocError> {
let header = LocalChunk::<A>::header_size();
let total = class.bytes();
let payload_size = total - header;
self.reserve_bytes(total)?;
match LocalChunk::<A>::allocate(&self.allocator, ptr::from_ref(self), payload_size) {
Ok(chunk) => {
#[cfg(feature = "stats")]
self.normal_local_chunks_allocated.fetch_add(1, Ordering::Relaxed);
Ok(chunk)
}
Err(e) => {
self.release_bytes(total);
Err(e)
}
}
}
/// Acquires a shared chunk for a normal-sized request.
///
/// The size class used is the larger of `ratchet_class` and
/// `SizeClass::min_for_bytes(header + min_payload)`.
///
/// Oversized requests must go through `acquire_oversized_shared`; this is only
/// checked with a `debug_assert!`.
///
/// # Errors
/// Returns `AllocError` if `header + min_payload` overflows `usize`, or if the
/// underlying acquisition fails.
pub(crate) fn acquire_shared(&self, min_payload: usize, ratchet_class: SizeClass) -> Result<NonNull<SharedChunk<A>>, AllocError> {
    let Some(needed_total) = SharedChunk::<A>::header_size().checked_add(min_payload) else {
        return Err(AllocError);
    };
    debug_assert!(
        min_payload <= self.config.max_normal_alloc && !exceeds_max_chunk_bytes(needed_total),
        "acquire_shared invoked with oversized request — caller must route to acquire_oversized_shared",
    );
    let fitting_class = SizeClass::min_for_bytes(needed_total);
    self.acquire_normal_shared(fitting_class.max(ratchet_class))
}
/// Returns a shared chunk of at least `class`: raises the shared cache floor if
/// `class` is above it, then reuses a cached chunk if `pop_shared` yields one,
/// otherwise allocates a fresh chunk.
#[cfg_attr(test, mutants::skip)]
fn acquire_normal_shared(&self, class: SizeClass) -> Result<NonNull<SharedChunk<A>>, AllocError> {
    let recycled = unsafe {
        if class.raw() > self.shared_cache_class.load(Ordering::Relaxed) {
            self.advance_shared_cache_floor(class);
        }
        self.pop_shared().map(|chunk| {
            SharedChunk::reinit_for_acquire(chunk);
            chunk
        })
    };
    match recycled {
        Some(chunk) => Ok(chunk),
        None => self.allocate_fresh_shared(class),
    }
}
/// Raises the shared cache floor to `new_class` and evicts cached chunks that are
/// now too small.
///
/// Publishes the new floor, detaches the whole shared list with one atomic swap,
/// then walks the detached list: chunks whose `header + capacity` is at least
/// `new_class.bytes()` are pushed back, the rest are destroyed and their bytes
/// released.
///
/// # Safety
/// NOTE(review): the caller contract is not spelled out in this file; every
/// pointer in the shared list must still be a valid chunk header — confirm
/// whether this is also restricted to a single (owner) thread.
#[cold]
#[inline(never)]
unsafe fn advance_shared_cache_floor(&self, new_class: SizeClass) {
// Publish the floor before draining so `release_shared` (Acquire load) can see it.
self.shared_cache_class.store(new_class.raw(), Ordering::Release);
let new_min_total = new_class.bytes();
// Take ownership of the entire list; pushes that race with this land on the
// now-empty head.
let mut cur = self.shared_cache.swap(ptr::null_mut(), Ordering::AcqRel);
unsafe {
while !cur.is_null() {
let fat = SharedChunk::<A>::header_to_fat(cur);
let chunk_nn = NonNull::new_unchecked(fat);
let link = SharedChunk::cache_link(chunk_nn);
// Read the successor first: `push_shared` overwrites the link and `destroy`
// ends the chunk's life.
let next = (*link).load(Ordering::Acquire);
let total = SharedChunk::<A>::header_size() + (*chunk_nn.as_ptr()).capacity();
if total >= new_min_total {
self.push_shared(chunk_nn);
} else {
SharedChunk::destroy(chunk_nn);
self.release_bytes(total);
}
cur = next;
}
}
}
#[cfg_attr(test, mutants::skip)] fn allocate_fresh_shared(&self, class: SizeClass) -> Result<NonNull<SharedChunk<A>>, AllocError> {
let header = SharedChunk::<A>::header_size();
let total = class.bytes();
let payload_size = total - header;
self.reserve_bytes(total)?;
match SharedChunk::<A>::allocate(self.allocator.clone(), Weak::clone(&self.weak_self), payload_size) {
Ok(chunk) => {
#[cfg(feature = "stats")]
self.normal_shared_chunks_allocated.fetch_add(1, Ordering::Relaxed);
Ok(chunk)
}
Err(e) => {
self.release_bytes(total);
Err(e)
}
}
}
/// Takes back a local chunk: caches it when its total size is cacheable and not
/// below the current local floor, otherwise destroys it and releases its bytes.
///
/// With the `stats` feature, any non-zero `wasted_at_retire` value recorded on
/// the chunk is subtracted from the wasted-tail counter first.
///
/// # Safety
/// `chunk` is dereferenced, so it must point to a live chunk.
/// NOTE(review): presumably it must also be unreferenced and released on the
/// thread allowed to access `local_cache` — confirm with callers.
///
/// # Panics
/// Panics if `chunk_alloc_size` rejects the chunk's own layout.
pub(crate) unsafe fn release_local(&self, chunk: NonNull<LocalChunk<A>>) {
    let payload_capacity = (*chunk.as_ptr()).capacity();
    let total = chunk_alloc_size(LocalChunk::<A>::header_size(), payload_capacity, LocalChunk::<A>::value_align())
        .expect("released chunk's layout was valid when it was allocated");
    #[cfg(feature = "stats")]
    {
        let wasted = u64::from((*chunk.as_ptr()).wasted_at_retire());
        if wasted != 0 {
            self.release_wasted_tail(wasted);
        }
    }
    // The floor is consulted only for sizes that are cacheable in the first place.
    let keep = is_cacheable_size(total)
        && total >= SizeClass::new(self.local_cache_class.load(Ordering::Relaxed)).bytes();
    if keep {
        self.local_cache.with(|head| {
            LocalChunk::set_next(chunk, *head);
            *head = chunk.cast::<u8>().as_ptr();
        });
    } else {
        LocalChunk::destroy(chunk, &self.allocator);
        self.release_bytes(total);
    }
}
/// Takes back a shared chunk: caches it when its total size is cacheable and not
/// below the current shared floor, otherwise destroys it and releases its bytes.
///
/// With the `stats` feature, any non-zero `wasted_at_retire` value recorded on
/// the chunk is subtracted from the wasted-tail counter first.
///
/// # Safety
/// `chunk` is dereferenced, so it must point to a live chunk.
/// NOTE(review): presumably it must also be unreferenced — confirm with callers.
///
/// # Panics
/// Panics if `chunk_alloc_size` rejects the chunk's own layout.
pub(crate) unsafe fn release_shared(&self, chunk: NonNull<SharedChunk<A>>) {
    let payload_capacity = (*chunk.as_ptr()).capacity();
    let total = chunk_alloc_size(SharedChunk::<A>::header_size(), payload_capacity, SharedChunk::<A>::value_align())
        .expect("released chunk's layout was valid when it was allocated");
    #[cfg(feature = "stats")]
    {
        let wasted = u64::from((*chunk.as_ptr()).wasted_at_retire());
        if wasted != 0 {
            self.release_wasted_tail(wasted);
        }
    }
    // The floor is consulted only for sizes that are cacheable in the first place.
    let keep = is_cacheable_size(total)
        && total >= SizeClass::new(self.shared_cache_class.load(Ordering::Acquire)).bytes();
    if keep {
        self.push_shared(chunk);
    } else {
        SharedChunk::destroy(chunk);
        self.release_bytes(total);
    }
}
/// Allocates a fresh local chunk of `class` and immediately hands it to
/// `destroy_or_cache_just_acquired`.
///
/// # Errors
/// Propagates the `AllocError` from `allocate_fresh_local`.
pub(crate) fn preallocate_local(&self, class: SizeClass) -> Result<(), AllocError> {
    self.allocate_fresh_local(class)
        .map(|fresh| unsafe { LocalChunk::<A>::destroy_or_cache_just_acquired(self, fresh) })
}
/// Allocates a fresh shared chunk of `class` and immediately hands it to
/// `destroy_or_cache_just_acquired`.
///
/// # Errors
/// Propagates the `AllocError` from `allocate_fresh_shared`.
pub(crate) fn preallocate_shared(&self, class: SizeClass) -> Result<(), AllocError> {
    self.allocate_fresh_shared(class)
        .map(|fresh| unsafe { SharedChunk::<A>::destroy_or_cache_just_acquired(self, fresh) })
}
/// Atomically adds `n` bytes to `bytes_outstanding`.
///
/// # Errors
/// Returns `AllocError`, leaving the counter unchanged, when the sum would
/// overflow `usize` or exceed `config.byte_budget`.
fn reserve_bytes(&self, n: usize) -> Result<(), AllocError> {
    let budget = self.config.byte_budget;
    let outcome = self
        .bytes_outstanding
        .fetch_update(Ordering::AcqRel, Ordering::Relaxed, |current| {
            current.checked_add(n).filter(|&proposed| proposed <= budget)
        });
    match outcome {
        Ok(_) => Ok(()),
        Err(_) => Err(AllocError),
    }
}
/// Subtracts `n` bytes from `bytes_outstanding`.
fn release_bytes(&self, n: usize) {
    let _previous = self.bytes_outstanding.fetch_sub(n, Ordering::AcqRel);
}
/// Allocates a dedicated local chunk for an oversized request.
///
/// The payload is `min_payload` plus the alignment slack, rounded up to
/// `DropEntry` alignment. The resulting total is reserved against the budget
/// and handed back if the allocation fails.
///
/// # Errors
/// Returns `AllocError` on arithmetic overflow, when `chunk_alloc_size` rejects
/// the layout, when the budget would be exceeded, or when the allocator fails.
pub(crate) fn acquire_oversized_local(&self, min_payload: usize) -> Result<NonNull<LocalChunk<A>>, AllocError> {
    let padded = min_payload.checked_add(oversized_payload_align_slack()).ok_or(AllocError)?;
    let payload = round_up_to_drop_align(padded)?;
    let total = chunk_alloc_size(LocalChunk::<A>::header_size(), payload, LocalChunk::<A>::value_align())?;
    self.reserve_bytes(total)?;
    let chunk = LocalChunk::<A>::allocate(&self.allocator, ptr::from_ref(self), payload).map_err(|err| {
        // Roll back the reservation made above.
        self.release_bytes(total);
        err
    })?;
    #[cfg(feature = "stats")]
    self.oversized_local_chunks_allocated.fetch_add(1, Ordering::Relaxed);
    Ok(chunk)
}
/// Allocates a dedicated shared chunk for an oversized request.
///
/// The payload is `min_payload` plus the alignment slack, rounded up to
/// `DropEntry` alignment. The resulting total is reserved against the budget
/// and handed back if the allocation fails.
///
/// # Errors
/// Returns `AllocError` on arithmetic overflow, when `chunk_alloc_size` rejects
/// the layout, when the budget would be exceeded, or when the allocator fails.
pub(crate) fn acquire_oversized_shared(&self, min_payload: usize) -> Result<NonNull<SharedChunk<A>>, AllocError> {
    let padded = min_payload.checked_add(oversized_payload_align_slack()).ok_or(AllocError)?;
    let payload = round_up_to_drop_align(padded)?;
    let total = chunk_alloc_size(SharedChunk::<A>::header_size(), payload, SharedChunk::<A>::value_align())?;
    self.reserve_bytes(total)?;
    let chunk = SharedChunk::<A>::allocate(self.allocator.clone(), Weak::clone(&self.weak_self), payload).map_err(|err| {
        // Roll back the reservation made above.
        self.release_bytes(total);
        err
    })?;
    #[cfg(feature = "stats")]
    self.oversized_shared_chunks_allocated.fetch_add(1, Ordering::Relaxed);
    Ok(chunk)
}
/// Pops chunks off the shared cache until one meets the current floor.
///
/// Returns `None` once the cache is empty. Any popped chunk whose
/// `header + capacity` is below the floor is destroyed and its bytes released
/// before the next pop is attempted.
///
/// # Safety
/// Every pointer in the shared list must be a valid chunk header.
/// NOTE(review): the update closure dereferences the current head's link, which
/// is only sound if no other thread can pop and free that head concurrently —
/// presumably pops are confined to one thread; confirm.
unsafe fn pop_shared(&self) -> Option<NonNull<SharedChunk<A>>> {
// The floor is sampled once, before the loop.
let floor_min_total = SizeClass::new(self.shared_cache_class.load(Ordering::Relaxed)).bytes();
loop {
// Replace the head with its successor; `Err` means the list was empty.
let updated = self.shared_cache.fetch_update(Ordering::AcqRel, Ordering::Acquire, |cur| {
if cur.is_null() {
return None;
}
let fat = SharedChunk::<A>::header_to_fat(cur);
let link = SharedChunk::cache_link(NonNull::new_unchecked(fat));
Some((*link).load(Ordering::Acquire))
});
let Ok(popped) = updated else { return None };
let fat = SharedChunk::<A>::header_to_fat(popped);
let chunk_nn = NonNull::new_unchecked(fat);
let total = SharedChunk::<A>::header_size() + (*chunk_nn.as_ptr()).capacity();
if total >= floor_min_total {
return Some(chunk_nn);
}
// Below-floor chunk found in the cache: drop it and try the next one.
SharedChunk::destroy(chunk_nn);
self.release_bytes(total);
}
}
/// Pushes `chunk` onto the head of the shared cache with a compare-exchange loop.
///
/// # Safety
/// `chunk` must point to a live chunk whose cache link may be overwritten.
/// NOTE(review): the link is written with a plain `ptr::write`, so presumably no
/// other thread may be reading this chunk's link until the CAS publishes it —
/// confirm.
unsafe fn push_shared(&self, chunk: NonNull<SharedChunk<A>>) {
let head = &self.shared_cache;
let link = SharedChunk::cache_link(chunk);
let new = chunk.cast::<u8>().as_ptr();
let mut cur = head.load(Ordering::Acquire);
loop {
// Point the chunk at the head we expect to replace, before publishing it.
ptr::write((*link).as_ptr(), cur);
// Test-only hook: may swap in a different head to force the CAS below to fail.
#[cfg(test)]
tests::maybe_inject_push_race::<A>(head, cur);
match head.compare_exchange_weak(cur, new, Ordering::AcqRel, Ordering::Acquire) {
Ok(_) => return,
Err(actual) => {
// Test-only hook: counts retries so tests can assert this arm ran.
#[cfg(test)]
tests::note_push_retry();
// Retry against the head value actually observed.
cur = actual;
}
}
}
}
/// Destroys every chunk held in the local and shared caches, leaving both empty.
///
/// `bytes_outstanding` is not adjusted here; the only caller in this file is
/// `Drop`.
fn drain_all(&self) {
    unsafe {
        // Local cache: walk the list, destroying each chunk, then clear the head.
        self.local_cache.with(|head| {
            let mut cursor = *head;
            while !cursor.is_null() {
                let chunk = NonNull::new_unchecked(LocalChunk::<A>::header_to_fat(cursor));
                // Read the link before the chunk is destroyed.
                let following = LocalChunk::next(chunk);
                LocalChunk::destroy(chunk, &self.allocator);
                cursor = following;
            }
            *head = ptr::null_mut();
        });

        // Shared cache: detach the whole list atomically, then destroy each chunk.
        let mut cursor = self.shared_cache.swap(ptr::null_mut(), Ordering::AcqRel);
        while !cursor.is_null() {
            let chunk = NonNull::new_unchecked(SharedChunk::<A>::header_to_fat(cursor));
            let following = (*SharedChunk::cache_link(chunk)).load(Ordering::Acquire);
            SharedChunk::destroy(chunk);
            cursor = following;
        }
    }
}
}
impl<A: Allocator + Clone> Drop for ChunkProvider<A> {
fn drop(&mut self) {
self.drain_all();
}
}
/// Reports whether a chunk of `total` bytes may be kept in a cache: the size must
/// lie in `MIN_CHUNK_BYTES..=MAX_CHUNK_BYTES` and be a power of two.
#[inline]
pub(crate) fn is_cacheable_size(total: usize) -> bool {
    let in_range = (MIN_CHUNK_BYTES..=MAX_CHUNK_BYTES).contains(&total);
    in_range && total.is_power_of_two()
}
/// Rounds `min_payload` up to the next multiple of `DropEntry`'s alignment.
///
/// # Errors
/// Returns `AllocError` if the rounding would overflow `usize`.
#[cfg_attr(test, mutants::skip)]
#[inline]
fn round_up_to_drop_align(min_payload: usize) -> Result<usize, AllocError> {
    let mask = mem::align_of::<DropEntry>() - 1;
    let bumped = min_payload.checked_add(mask).ok_or(AllocError)?;
    Ok(bumped & !mask)
}
/// Extra bytes added to an oversized payload request: one less than
/// `DropEntry`'s alignment.
#[inline]
#[cfg_attr(test, mutants::skip)]
fn oversized_payload_align_slack() -> usize {
    let drop_entry_align = mem::align_of::<DropEntry>();
    drop_entry_align - 1
}
/// Reports whether `needed_total` is strictly larger than `MAX_CHUNK_BYTES`.
#[cfg_attr(test, mutants::skip)]
#[inline]
fn exceeds_max_chunk_bytes(needed_total: usize) -> bool {
    MAX_CHUNK_BYTES < needed_total
}
impl<A: Allocator + Clone> LocalChunk<A> {
    /// Drops one reference on a just-allocated chunk and hands it to
    /// `ChunkProvider::release_local`, which caches or destroys it.
    ///
    /// # Safety
    /// `chunk` is dereferenced and then released, so it must point to a live
    /// chunk that the caller will not use again. The `debug_assert!` expects
    /// the reference dropped here to be the last one.
    pub(super) unsafe fn destroy_or_cache_just_acquired(provider: &ChunkProvider<A>, chunk: NonNull<Self>) {
        unsafe {
            let was_last_ref = (*chunk.as_ptr()).dec_ref();
            debug_assert!(was_last_ref, "preallocate chunk refcount should reach zero");
            provider.release_local(chunk);
        }
    }
}
impl<A: Allocator + Clone> SharedChunk<A> {
    /// Drops one reference on a just-allocated chunk and hands it to
    /// `ChunkProvider::release_shared`, which caches or destroys it.
    ///
    /// # Safety
    /// `chunk` is dereferenced and then released, so it must point to a live
    /// chunk that the caller will not use again. The `debug_assert!` expects
    /// the reference dropped here to be the last one.
    pub(super) unsafe fn destroy_or_cache_just_acquired(provider: &ChunkProvider<A>, chunk: NonNull<Self>) {
        unsafe {
            let was_last_ref = (*chunk.as_ptr()).dec_ref();
            debug_assert!(was_last_ref, "preallocate chunk refcount should reach zero");
            provider.release_shared(chunk);
        }
    }
}
#[cfg(test)]
mod tests {
use core::cell::Cell;
use std::thread_local;
use allocator_api2::alloc::Global;
use super::*;
thread_local! {
// Chunk header pointer that `maybe_inject_push_race` will install as the cache
// head on its next call; null means no injection is armed.
static INJECT_PUSH_RACE: Cell<*mut u8> = const { Cell::new(ptr::null_mut()) };
// Number of CAS retries `push_shared` reported on this thread via `note_push_retry`.
static PUSH_RETRY_COUNT: Cell<usize> = const { Cell::new(0) };
}
/// Test hook called by `push_shared` just before its compare-exchange.
///
/// If a chunk pointer is armed in `INJECT_PUSH_RACE`, it is disarmed, linked in
/// front of `cur`, and stored as the new `head`, so the caller's CAS sees a head
/// different from `cur` and has to retry. Does nothing when no pointer is armed.
///
/// # Safety
/// An armed pointer must be the header of a live shared chunk whose cache link
/// may be overwritten.
pub(super) unsafe fn maybe_inject_push_race<A: Allocator + Clone>(head: &AtomicPtr<u8>, cur: *mut u8) {
// Take and clear the armed pointer so the injection fires only once.
let inject = INJECT_PUSH_RACE.with(|slot| slot.replace(ptr::null_mut()));
if inject.is_null() {
return;
}
let fat = SharedChunk::<A>::header_to_fat(inject);
let link = SharedChunk::cache_link(NonNull::new_unchecked(fat));
// Keep the list intact: the injected chunk points at the previous head.
ptr::write((*link).as_ptr(), cur);
head.store(inject, Ordering::Release);
}
/// Test hook: bumps this thread's `PUSH_RETRY_COUNT` by one.
pub(super) fn note_push_retry() {
    PUSH_RETRY_COUNT.with(|count| {
        let seen = count.get();
        count.set(seen + 1);
    });
}
/// The default configuration has an unlimited budget and the crate-wide
/// normal-allocation ceiling.
#[test]
fn chunk_provider_config_default_matches_constants() {
    let defaults = ChunkProviderConfig::default();
    let budget = defaults.byte_budget;
    let ceiling = defaults.max_normal_alloc();
    assert_eq!(budget, usize::MAX);
    assert_eq!(ceiling, MAX_NORMAL_ALLOC);
}
/// A chunk smaller than the floor that still ends up in the shared cache must be
/// destroyed by `pop_shared` rather than returned.
#[test]
fn pop_shared_destroys_below_floor_straggler() {
let provider = ChunkProvider::<Global>::new(Global, ChunkProviderConfig::default());
unsafe {
// Raise the floor to class 3 while the cache is still empty.
provider.advance_shared_cache_floor(SizeClass::new(3));
let chunk = provider.allocate_fresh_shared(SizeClass::ZERO).expect("fresh class-0 chunk");
assert!(chunk.as_ref().dec_ref(), "fresh chunk drops to refcount 0");
// Push directly, bypassing the floor check done by `release_shared`.
provider.push_shared(chunk);
// The only cached chunk is below the floor, so nothing is returned.
assert!(provider.pop_shared().is_none());
}
}
/// `is_cacheable_size` accepts both range endpoints and rejects sizes that are
/// out of range or not a power of two.
#[test]
fn is_cacheable_size_requires_range_and_power_of_two() {
    for accepted in [MIN_CHUNK_BYTES, MAX_CHUNK_BYTES] {
        assert!(is_cacheable_size(accepted));
    }
    for rejected in [MIN_CHUNK_BYTES + 1, MAX_CHUNK_BYTES * 2, MIN_CHUNK_BYTES / 2, 0] {
        assert!(!is_cacheable_size(rejected));
    }
}
/// Forces `push_shared`'s compare-exchange to fail once (via the injection hook)
/// and checks that the retry arm runs.
#[test]
fn push_shared_retries_on_contended_cas() {
let provider = ChunkProvider::<Global>::new(Global, ChunkProviderConfig::default());
PUSH_RETRY_COUNT.with(|c| c.set(0));
unsafe {
// `c` is pushed normally; no injection is armed yet.
let c = provider.allocate_fresh_shared(SizeClass::ZERO).expect("chunk c");
assert!(c.as_ref().dec_ref(), "fresh chunk drops to refcount 0");
provider.push_shared(c);
// Arm `d`: the hook will make it the head in the middle of the next push.
let d = provider.allocate_fresh_shared(SizeClass::ZERO).expect("chunk d");
assert!(d.as_ref().dec_ref(), "fresh chunk drops to refcount 0");
INJECT_PUSH_RACE.with(|slot| slot.set(d.cast::<u8>().as_ptr()));
// Pushing `b` first sees the old head, the hook swaps in `d`, and the CAS must retry.
let b = provider.allocate_fresh_shared(SizeClass::ZERO).expect("chunk b");
assert!(b.as_ref().dec_ref(), "fresh chunk drops to refcount 0");
provider.push_shared(b);
}
assert!(
PUSH_RETRY_COUNT.with(Cell::get) >= 1,
"the contended CAS retry arm must run at least once",
);
}
}