//! A fixed-capacity, slot-based arena: callers reserve a slot, move a value into it,
//! and share the result through cheap reference-counted [`ReservedMemory`] handles.
//! A slot is recycled once the last handle pointing at it has been dropped.

use std::{
    cell::UnsafeCell,
    marker::PhantomData,
    sync::{
        Arc,
        atomic::{AtomicU32, Ordering},
    },
};
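/// A 64-byte-aligned block of `MAX_ITEM_SIZE` raw bytes backing one arena slot.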
#[repr(C, align(64))]
pub struct Bytes<const MAX_ITEM_SIZE: usize> {
bytes: [u8; MAX_ITEM_SIZE],
}
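/// A pool of `MAX_ITEM_COUNT` slots, each `MAX_ITEM_SIZE` bytes and 64-byte aligned.
/// Backing storage is allocated lazily on the first reservation, and a slot becomes
/// available again once every handle pointing at it has been dropped.
///
/// A minimal usage sketch (the sizes and the `u64` payload are illustrative; the
/// snippet is not compiled as a doctest):
///
/// ```ignore
/// let mut arena = Arena::<8, 256>::new();
/// let slot = arena.reserve().expect("a slot is free");
/// let handle = slot.init(42u64);   // move the payload into the slot
/// let shared = handle.clone();     // cheap, reference-counted clone
/// drop(handle);                    // payload stays alive: `shared` still owns it
/// drop(shared);                    // last handle: destructor runs, slot is recycled
/// ```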
pub struct Arena<const MAX_ITEM_COUNT: usize, const MAX_ITEM_SIZE: usize> {
    /// Lazily allocated slots; a slot is free when the arena holds its only `Arc`.
    buffer: Vec<Arc<UnsafeCell<Bytes<MAX_ITEM_SIZE>>>>,
    /// Per-slot count of live `ReservedMemory` handles.
    ref_counts: Vec<Arc<AtomicU32>>,
    /// Index at which the next free-slot search starts.
    cursor: usize,
}
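/// A shared handle to an initialized slot. Clones are cheap; the payload's destructor
/// runs exactly once, when the last handle is dropped, after which the arena may hand
/// the slot out again.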
pub struct ReservedMemory<const MAX_ITEM_SIZE: usize> {
data: Arc<UnsafeCell<Bytes<MAX_ITEM_SIZE>>>,
ref_count: Arc<AtomicU32>,
drop_fn: fn(&mut Bytes<MAX_ITEM_SIZE>),
}
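/// An exclusive, not-yet-initialized reservation of a slot. Call
/// [`UninitReservedMemory::init`] to move a value in and obtain a shareable
/// [`ReservedMemory`].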
pub struct UninitReservedMemory<const MAX_ITEM_SIZE: usize> {
data: Arc<UnsafeCell<Bytes<MAX_ITEM_SIZE>>>,
ref_count: Arc<AtomicU32>,
    /// Slot index, exposed so tests can assert on slot reuse.
    #[cfg(test)]
    index: usize,
    /// Raw-pointer marker keeping the uninitialized reservation `!Send` and `!Sync`.
    not_sync: PhantomData<*const ()>,
}
impl<const MAX_ITEM_SIZE: usize> UninitReservedMemory<MAX_ITEM_SIZE> {
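    /// Moves `obj` into the reserved slot and returns a shareable handle whose drop
    /// logic knows how to destroy an `O` in place.
    ///
    /// # Panics
    ///
    /// Panics if `O` is too large or too strictly aligned for this arena's slots.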
pub fn init<O>(self, obj: O) -> ReservedMemory<MAX_ITEM_SIZE> {
assert!(
accept_obj::<O, MAX_ITEM_SIZE>(),
"Object isn't safe to store in this arena"
);
self.init_with_func(
            |bytes| {
                let ptr = core::ptr::from_mut(bytes);
                // SAFETY: `accept_obj` confirmed that `O` fits the slot's size and
                // alignment, and `bytes` is borrowed exclusively during init.
                unsafe { core::ptr::write(ptr as *mut O, obj) };
            },
            |bytes| {
                let ptr = core::ptr::from_mut(bytes);
                // SAFETY: the slot was initialized with an `O` above, and this runs
                // only once, when the last `ReservedMemory` handle is dropped.
                unsafe { core::ptr::drop_in_place(ptr as *mut O) };
            },
)
}
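    /// Writes the payload through `init_data` and records `drop_fn` so the last
    /// [`ReservedMemory`] handle can run the payload's destructor in place.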
fn init_with_func<F>(
self,
init_data: F,
drop_fn: fn(&mut Bytes<MAX_ITEM_SIZE>),
) -> ReservedMemory<MAX_ITEM_SIZE>
where
F: FnOnce(&mut Bytes<MAX_ITEM_SIZE>),
{
assert_eq!(
Arc::strong_count(&self.data),
2,
"Slot must be held by exactly two owners (the arena and this \
UninitReservedMemory) to guarantee exclusive write access."
);
        // SAFETY: the assertion above proves the arena and this reservation are the
        // only owners of the slot, so no other reference into it can exist.
        let bytes_mut = unsafe { self.data.get().as_mut().unwrap() };
init_data(bytes_mut);
ReservedMemory {
data: self.data,
ref_count: self.ref_count,
drop_fn,
}
}
}
impl<const MAX_ITEM_SIZE: usize> std::fmt::Debug for ReservedMemory<MAX_ITEM_SIZE> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `Bytes` has no `Debug` impl (and may hold an arbitrary payload), so print
        // the slot's address instead of its contents.
        f.debug_struct("ReservedMemory")
            .field("data", &self.data.get())
            .field("drop_fn", &self.drop_fn)
            .finish()
    }
}
impl<const MAX_ITEM_SIZE: usize> Clone for ReservedMemory<MAX_ITEM_SIZE> {
fn clone(&self) -> Self {
self.ref_count.fetch_add(1, Ordering::Release);
Self {
data: self.data.clone(),
ref_count: self.ref_count.clone(),
drop_fn: self.drop_fn,
}
}
}
impl<const MAX_ITEM_SIZE: usize> Drop for ReservedMemory<MAX_ITEM_SIZE> {
    fn drop(&mut self) {
        let previous = self.ref_count.fetch_sub(1, Ordering::Release);
        if previous == 1 {
            // Pair an Acquire fence with the Release decrements so the thread that
            // runs the destructor sees everything done before the other handles
            // were dropped (the same pattern `Arc` uses).
            std::sync::atomic::fence(Ordering::Acquire);
            // SAFETY: this was the last handle, so nothing else can access the slot
            // while the payload's destructor runs.
            let bytes_mut = unsafe { self.data.get().as_mut().unwrap() };
            (self.drop_fn)(bytes_mut);
        }
    }
}
// SAFETY: handles only expose shared access to the slot, and the payload's destructor
// is gated by the atomic reference count. This relies on stored payloads being safe
// to share with, and drop on, other threads.
unsafe impl<const MAX_ITEM_SIZE: usize> Send for ReservedMemory<MAX_ITEM_SIZE> {}
unsafe impl<const MAX_ITEM_SIZE: usize> Sync for ReservedMemory<MAX_ITEM_SIZE> {}
impl<const MAX_ITEM_SIZE: usize> std::convert::AsRef<Bytes<MAX_ITEM_SIZE>>
for ReservedMemory<MAX_ITEM_SIZE>
{
    fn as_ref(&self) -> &Bytes<MAX_ITEM_SIZE> {
        // SAFETY: while any handle is alive the slot stays allocated and is only ever
        // exposed through shared references.
        unsafe { self.data.get().as_ref().unwrap() }
    }
}
impl<const MAX_ITEM_COUNT: usize, const MAX_ITEM_SIZE: usize> Default
for Arena<MAX_ITEM_COUNT, MAX_ITEM_SIZE>
{
fn default() -> Self {
Self::new()
}
}
impl<const MAX_ITEM_COUNT: usize, const MAX_ITEM_SIZE: usize> Arena<MAX_ITEM_COUNT, MAX_ITEM_SIZE> {
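    /// Creates an empty arena; slot storage is allocated lazily on the first call to
    /// [`Arena::reserve`].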
pub const fn new() -> Self {
Self {
buffer: Vec::new(),
ref_counts: Vec::new(),
cursor: 0,
}
}
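    /// Returns whether a value of type `O` fits a slot's size and alignment.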
pub const fn accept<O>() -> bool {
accept_obj::<O, MAX_ITEM_SIZE>()
}
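    /// Hands out a free slot as an [`UninitReservedMemory`], or `None` when every
    /// slot is currently held by a live handle.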
pub fn reserve(&mut self) -> Option<UninitReservedMemory<MAX_ITEM_SIZE>> {
        // Allocate the backing slots lazily so an unused arena costs no memory.
        if self.buffer.is_empty() {
for _ in 0..MAX_ITEM_COUNT {
self.ref_counts.push(Arc::new(AtomicU32::new(0)));
#[allow(clippy::arc_with_non_send_sync)]
self.buffer.push(Arc::new(UnsafeCell::new(Bytes {
bytes: [0; MAX_ITEM_SIZE],
})));
}
}
        // Scan every slot, starting at the cursor so the search resumes just past the
        // most recently handed-out slot.
        for i in 0..MAX_ITEM_COUNT {
            let i = (i + self.cursor) % MAX_ITEM_COUNT;
            let item = &self.buffer[i];
            // A strong count of 1 means only the arena still holds this slot: it is free.
            if Arc::strong_count(item) == 1 {
                self.cursor = (i + 1) % MAX_ITEM_COUNT;
                let data = item.clone();
                let ref_count = self.ref_counts[i].clone();
                ref_count.store(1, Ordering::Release);
return Some(UninitReservedMemory {
data,
ref_count,
#[cfg(test)]
index: i,
not_sync: PhantomData,
});
}
}
None
}
}
/// A value fits a slot when it is no larger, and no more strictly aligned, than the
/// 64-byte-aligned `Bytes` block backing each slot.
const fn accept_obj<O, const MAX_ITEM_SIZE: usize>() -> bool {
    core::mem::size_of::<O>() <= core::mem::size_of::<Bytes<MAX_ITEM_SIZE>>()
        && core::mem::align_of::<O>() <= core::mem::align_of::<Bytes<MAX_ITEM_SIZE>>()
}
#[cfg(test)]
mod tests {
use super::*;
const MAX_ITEM_SIZE: usize = 2048;
#[test]
fn test_lazy_initialization() {
let mut arena = Arena::<10, MAX_ITEM_SIZE>::new();
assert_eq!(
arena.buffer.len(),
0,
"Buffer should be empty before first reservation"
);
arena.reserve();
assert_eq!(
arena.buffer.len(),
10,
"Buffer should be initialized to size"
);
}
#[test]
fn test_sequential_allocation_moves_cursor() {
let mut arena = Arena::<3, MAX_ITEM_SIZE>::new();
        let _first = arena.reserve().expect("Should allocate");
assert_eq!(arena.cursor, 1);
        let _second = arena.reserve().expect("Should allocate");
assert_eq!(arena.cursor, 2);
}
#[test]
fn test_reuse_of_freed_data() {
let mut arena = Arena::<2, MAX_ITEM_SIZE>::new();
let data0 = arena.reserve().unwrap();
let _data1 = arena.reserve().unwrap();
assert!(arena.reserve().is_none(), "Should be full");
let data0_index = data0.index;
core::mem::drop(data0);
let data2 = arena.reserve().expect("Should reuse index 0");
assert_eq!(data0_index, data2.index);
}
#[test]
fn test_circular_cursor_search() {
let mut arena = Arena::<3, MAX_ITEM_SIZE>::new();
let _d0 = arena.reserve().unwrap();
let d1 = arena.reserve().unwrap();
let _d2 = arena.reserve().unwrap();
core::mem::drop(d1);
let _ = arena.reserve().expect("Should find the hole at index 1");
assert_eq!(arena.cursor, 2);
}
#[test]
fn test_full_arena_returns_none() {
let mut arena = Arena::<5, MAX_ITEM_SIZE>::new();
let mut reserved = Vec::new();
for _ in 0..5 {
let item = arena.reserve();
assert!(item.is_some());
reserved.push(item);
}
assert!(arena.reserve().is_none());
}
}
#[cfg(test)]
mod drop_lifecycle_tests {
use super::*;
    use std::sync::Arc;
struct Payload {
_anchor: Arc<()>,
}
#[test]
fn last_clone_runs_destructor_with_one_clone() {
let anchor = Arc::new(());
let mut arena = Arena::<4, 256>::new();
let reserved = arena.reserve().unwrap().init(Payload {
_anchor: anchor.clone(),
});
assert_eq!(Arc::strong_count(&anchor), 2);
drop(reserved);
assert_eq!(
Arc::strong_count(&anchor),
1,
"single ReservedMemory must run drop_fn on drop"
);
}
#[test]
fn destructor_deferred_until_last_of_two_clones() {
let anchor = Arc::new(());
let mut arena = Arena::<4, 256>::new();
let a = arena.reserve().unwrap().init(Payload {
_anchor: anchor.clone(),
});
let b = a.clone();
drop(a);
assert_eq!(
Arc::strong_count(&anchor),
2,
"destructor fired prematurely — `b` still owns the payload"
);
drop(b);
assert_eq!(Arc::strong_count(&anchor), 1);
}
#[test]
fn destructor_deferred_until_last_of_many_clones() {
let anchor = Arc::new(());
let mut arena = Arena::<4, 256>::new();
let first = arena.reserve().unwrap().init(Payload {
_anchor: anchor.clone(),
});
const N: usize = 16;
let clones: Vec<_> = (0..N).map(|_| first.clone()).collect();
drop(first);
assert_eq!(Arc::strong_count(&anchor), 2);
for (i, c) in clones.into_iter().enumerate() {
drop(c);
let expected = if i + 1 == N { 1 } else { 2 };
assert_eq!(
Arc::strong_count(&anchor),
expected,
"premature destructor after dropping clone {i}"
);
}
}
#[test]
fn destructor_runs_exactly_once_across_refill_cycle() {
let anchor1 = Arc::new(());
let anchor2 = Arc::new(());
let mut arena = Arena::<1, 256>::new();
let first = arena.reserve().unwrap().init(Payload {
_anchor: anchor1.clone(),
});
drop(first);
assert_eq!(Arc::strong_count(&anchor1), 1);
let second = arena.reserve().unwrap().init(Payload {
_anchor: anchor2.clone(),
});
assert_eq!(
Arc::strong_count(&anchor1),
1,
"refilling the slot must not touch the prior payload's anchor"
);
assert_eq!(Arc::strong_count(&anchor2), 2);
drop(second);
assert_eq!(Arc::strong_count(&anchor2), 1);
}
#[test]
fn heap_owning_payload_drops_exactly_once() {
struct HeapOwner(#[allow(dead_code)] Box<[u64; 8]>);
let mut arena = Arena::<2, 256>::new();
let a = arena
.reserve()
.unwrap()
.init(HeapOwner(Box::new([1, 2, 3, 4, 5, 6, 7, 8])));
let b = a.clone();
let c = a.clone();
drop(a);
drop(b);
drop(c);
}
}
#[cfg(test)]
mod concurrent_drop_timing_tests {
use super::*;
use std::sync::{Arc, Barrier};
use std::thread;
#[test]
fn concurrent_drops_release_anchor_exactly_once() {
let anchor = Arc::new(());
let mut arena = Arena::<4, 256>::new();
struct Payload {
#[allow(dead_code)]
anchor: Arc<()>,
}
let reserved = arena.reserve().unwrap().init(Payload {
anchor: anchor.clone(),
});
const N: usize = 8;
let barrier = Arc::new(Barrier::new(N));
let mut handles = Vec::with_capacity(N);
for _ in 0..N - 1 {
let clone = reserved.clone();
let b = barrier.clone();
handles.push(thread::spawn(move || {
b.wait();
drop(clone);
}));
}
{
let b = barrier.clone();
let original = reserved;
handles.push(thread::spawn(move || {
b.wait();
drop(original);
}));
}
for h in handles {
h.join().unwrap();
}
assert_eq!(
Arc::strong_count(&anchor),
1,
"after all clones drop, payload anchor must be released exactly once"
);
}
}
#[cfg(test)]
mod concurrent_tests {
use super::*;
use std::sync::{Arc, Barrier, Mutex};
    use std::thread;
    const MAX_ITEM_SIZE: usize = 2048;
    // `Arena` holds `Arc<UnsafeCell<_>>` slots and is neither `Send` nor `Sync`,
    // which is what this clippy lint flags.
    #[allow(clippy::arc_with_non_send_sync)]
    fn shared_arena<const N: usize>() -> Arc<Mutex<Arena<N, MAX_ITEM_SIZE>>> {
        Arc::new(Mutex::new(Arena::<N, MAX_ITEM_SIZE>::new()))
    }
#[test]
fn test_drop_called_exactly_once_under_contention() {
let drop_count = Arc::new(std::sync::atomic::AtomicUsize::new(0));
let arena = shared_arena::<4>();
let uninit = arena.lock().unwrap().reserve().unwrap();
struct Probe(Arc<std::sync::atomic::AtomicUsize>);
impl Drop for Probe {
fn drop(&mut self) {
self.0.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
}
let reserved = uninit.init(Probe(drop_count.clone()));
let barrier = Arc::new(Barrier::new(32));
let mut handles = vec![];
for _ in 0..32 {
let r = reserved.clone();
let b = barrier.clone();
handles.push(thread::spawn(move || {
                b.wait();
                drop(r);
}));
}
        drop(reserved);
        for h in handles {
h.join().unwrap();
}
assert_eq!(
drop_count.load(std::sync::atomic::Ordering::Relaxed),
1,
"drop_fn must be called exactly once"
);
}
#[test]
fn test_slot_reuse_after_concurrent_drop() {
let arena = shared_arena::<1>();
let uninit = arena.lock().unwrap().reserve().unwrap();
let reserved = uninit.init(42u64);
let barrier = Arc::new(Barrier::new(8));
let mut handles = vec![];
for _ in 0..8 {
let r = reserved.clone();
let b = barrier.clone();
handles.push(thread::spawn(move || {
b.wait();
drop(r);
}));
}
drop(reserved);
for h in handles {
h.join().unwrap();
}
assert!(
arena.lock().unwrap().reserve().is_some(),
"Slot should be available after all clones are dropped"
);
}
#[test]
fn test_drop_after_arena_dropped() {
let drop_count = Arc::new(std::sync::atomic::AtomicUsize::new(0));
struct Probe(Arc<std::sync::atomic::AtomicUsize>);
impl Drop for Probe {
fn drop(&mut self) {
self.0.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
}
let reserved = {
let mut arena = Arena::<4, MAX_ITEM_SIZE>::new();
let uninit = arena.reserve().unwrap();
uninit.init(Probe(drop_count.clone()))
};
let barrier = Arc::new(Barrier::new(8));
let mut handles = vec![];
for _ in 0..8 {
let r = reserved.clone();
let b = barrier.clone();
handles.push(thread::spawn(move || {
b.wait();
drop(r);
}));
}
drop(reserved);
for h in handles {
h.join().unwrap();
}
assert_eq!(
drop_count.load(std::sync::atomic::Ordering::Relaxed),
1,
"drop_fn must fire exactly once even when arena is dropped first"
);
}
}