#![allow(unsafe_code)]
use std::alloc::{Layout, alloc, dealloc};
use std::fmt;
use std::ptr::NonNull;
use std::sync::atomic::{AtomicUsize, Ordering};
use parking_lot::RwLock;
use crate::types::EpochId;
const DEFAULT_CHUNK_SIZE: usize = 1024 * 1024;
#[derive(Debug, Clone)]
pub enum AllocError {
OutOfMemory,
EpochNotFound(EpochId),
InsufficientSpace,
}
impl fmt::Display for AllocError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::OutOfMemory => write!(f, "arena allocation failed: out of memory"),
Self::EpochNotFound(id) => write!(f, "epoch {id} not found in arena allocator"),
Self::InsufficientSpace => {
write!(f, "arena chunk has insufficient space for allocation")
}
}
}
}
impl std::error::Error for AllocError {}
impl From<AllocError> for crate::Error {
fn from(e: AllocError) -> Self {
match e {
AllocError::OutOfMemory | AllocError::InsufficientSpace => {
crate::Error::Storage(crate::utils::error::StorageError::Full)
}
AllocError::EpochNotFound(id) => {
crate::Error::Internal(format!("epoch {id} not found in arena allocator"))
}
}
}
}
struct Chunk {
ptr: NonNull<u8>,
capacity: usize,
offset: AtomicUsize,
}
impl Chunk {
fn new(capacity: usize) -> Result<Self, AllocError> {
let layout = Layout::from_size_align(capacity, 16).map_err(|_| AllocError::OutOfMemory)?;
let ptr = unsafe { alloc(layout) };
let ptr = NonNull::new(ptr).ok_or(AllocError::OutOfMemory)?;
Ok(Self {
ptr,
capacity,
offset: AtomicUsize::new(0),
})
}
fn try_alloc(&self, size: usize, align: usize) -> Option<NonNull<u8>> {
self.try_alloc_with_offset(size, align).map(|(_, ptr)| ptr)
}
fn try_alloc_with_offset(&self, size: usize, align: usize) -> Option<(u32, NonNull<u8>)> {
loop {
let current = self.offset.load(Ordering::Relaxed);
let aligned = (current + align - 1) & !(align - 1);
let new_offset = aligned + size;
if new_offset > self.capacity {
return None;
}
match self.offset.compare_exchange_weak(
current,
new_offset,
Ordering::AcqRel,
Ordering::Relaxed,
) {
Ok(_) => {
let ptr = unsafe { self.ptr.as_ptr().add(aligned) };
return Some((aligned as u32, NonNull::new(ptr)?));
}
Err(_) => continue, }
}
}
fn used(&self) -> usize {
self.offset.load(Ordering::Relaxed)
}
}
impl Drop for Chunk {
fn drop(&mut self) {
let layout = Layout::from_size_align(self.capacity, 16).expect("Invalid layout");
unsafe { dealloc(self.ptr.as_ptr(), layout) };
}
}
unsafe impl Send for Chunk {}
unsafe impl Sync for Chunk {}
pub struct Arena {
epoch: EpochId,
chunks: RwLock<Vec<Chunk>>,
chunk_size: usize,
total_allocated: AtomicUsize,
}
impl Arena {
pub fn new(epoch: EpochId) -> Result<Self, AllocError> {
Self::with_chunk_size(epoch, DEFAULT_CHUNK_SIZE)
}
pub fn with_chunk_size(epoch: EpochId, chunk_size: usize) -> Result<Self, AllocError> {
let initial_chunk = Chunk::new(chunk_size)?;
Ok(Self {
epoch,
chunks: RwLock::new(vec![initial_chunk]),
chunk_size,
total_allocated: AtomicUsize::new(chunk_size),
})
}
#[must_use]
pub fn epoch(&self) -> EpochId {
self.epoch
}
pub fn alloc(&self, size: usize, align: usize) -> Result<NonNull<u8>, AllocError> {
{
let chunks = self.chunks.read();
for chunk in chunks.iter().rev() {
if let Some(ptr) = chunk.try_alloc(size, align) {
return Ok(ptr);
}
}
}
self.alloc_new_chunk(size, align)
}
pub fn alloc_value<T>(&self, value: T) -> Result<&mut T, AllocError> {
let ptr = self.alloc(std::mem::size_of::<T>(), std::mem::align_of::<T>())?;
Ok(unsafe {
let typed_ptr = ptr.as_ptr() as *mut T;
typed_ptr.write(value);
&mut *typed_ptr
})
}
pub fn alloc_slice<T: Copy>(&self, values: &[T]) -> Result<&mut [T], AllocError> {
if values.is_empty() {
return Ok(&mut []);
}
let size = std::mem::size_of::<T>() * values.len();
let align = std::mem::align_of::<T>();
let ptr = self.alloc(size, align)?;
Ok(unsafe {
let typed_ptr = ptr.as_ptr() as *mut T;
std::ptr::copy_nonoverlapping(values.as_ptr(), typed_ptr, values.len());
std::slice::from_raw_parts_mut(typed_ptr, values.len())
})
}
#[cfg(feature = "tiered-storage")]
pub fn alloc_value_with_offset<T>(&self, value: T) -> Result<(u32, &mut T), AllocError> {
let size = std::mem::size_of::<T>();
let align = std::mem::align_of::<T>();
let chunks = self.chunks.read();
let chunk = chunks
.first()
.expect("Arena should have at least one chunk");
let (offset, ptr) = chunk
.try_alloc_with_offset(size, align)
.ok_or(AllocError::InsufficientSpace)?;
Ok(unsafe {
let typed_ptr = ptr.as_ptr().cast::<T>();
typed_ptr.write(value);
(offset, &mut *typed_ptr)
})
}
#[cfg(feature = "tiered-storage")]
pub unsafe fn read_at<T>(&self, offset: u32) -> &T {
let chunks = self.chunks.read();
let chunk = chunks
.first()
.expect("Arena should have at least one chunk");
debug_assert!(
(offset as usize) + std::mem::size_of::<T>() <= chunk.used(),
"read_at: offset {} + size_of::<{}>() = {} exceeds chunk used bytes {}",
offset,
std::any::type_name::<T>(),
(offset as usize) + std::mem::size_of::<T>(),
chunk.used()
);
debug_assert!(
(offset as usize).is_multiple_of(std::mem::align_of::<T>()),
"read_at: offset {} is not aligned for {} (alignment {})",
offset,
std::any::type_name::<T>(),
std::mem::align_of::<T>()
);
unsafe {
let ptr = chunk.ptr.as_ptr().add(offset as usize).cast::<T>();
&*ptr
}
}
#[cfg(feature = "tiered-storage")]
pub unsafe fn read_at_mut<T>(&self, offset: u32) -> &mut T {
let chunks = self.chunks.read();
let chunk = chunks
.first()
.expect("Arena should have at least one chunk");
debug_assert!(
(offset as usize) + std::mem::size_of::<T>() <= chunk.capacity,
"read_at_mut: offset {} + size_of::<{}>() = {} exceeds chunk capacity {}",
offset,
std::any::type_name::<T>(),
(offset as usize) + std::mem::size_of::<T>(),
chunk.capacity
);
debug_assert!(
(offset as usize).is_multiple_of(std::mem::align_of::<T>()),
"read_at_mut: offset {} is not aligned for {} (alignment {})",
offset,
std::any::type_name::<T>(),
std::mem::align_of::<T>()
);
unsafe {
let ptr = chunk.ptr.as_ptr().add(offset as usize).cast::<T>();
&mut *ptr
}
}
fn alloc_new_chunk(&self, size: usize, align: usize) -> Result<NonNull<u8>, AllocError> {
let chunk_size = self.chunk_size.max(size + align);
let chunk = Chunk::new(chunk_size)?;
self.total_allocated
.fetch_add(chunk_size, Ordering::Relaxed);
let ptr = chunk
.try_alloc(size, align)
.expect("fresh chunk sized to fit");
let mut chunks = self.chunks.write();
chunks.push(chunk);
Ok(ptr)
}
#[must_use]
pub fn total_allocated(&self) -> usize {
self.total_allocated.load(Ordering::Relaxed)
}
#[must_use]
pub fn total_used(&self) -> usize {
let chunks = self.chunks.read();
chunks.iter().map(Chunk::used).sum()
}
#[must_use]
pub fn stats(&self) -> ArenaStats {
let chunks = self.chunks.read();
ArenaStats {
epoch: self.epoch,
chunk_count: chunks.len(),
total_allocated: self.total_allocated.load(Ordering::Relaxed),
total_used: chunks.iter().map(Chunk::used).sum(),
}
}
}
#[derive(Debug, Clone)]
pub struct ArenaStats {
pub epoch: EpochId,
pub chunk_count: usize,
pub total_allocated: usize,
pub total_used: usize,
}
pub struct ArenaAllocator {
arenas: RwLock<hashbrown::HashMap<EpochId, Arena>>,
current_epoch: AtomicUsize,
chunk_size: usize,
}
impl ArenaAllocator {
pub fn new() -> Result<Self, AllocError> {
Self::with_chunk_size(DEFAULT_CHUNK_SIZE)
}
pub fn with_chunk_size(chunk_size: usize) -> Result<Self, AllocError> {
let allocator = Self {
arenas: RwLock::new(hashbrown::HashMap::new()),
current_epoch: AtomicUsize::new(0),
chunk_size,
};
let epoch = EpochId::INITIAL;
allocator
.arenas
.write()
.insert(epoch, Arena::with_chunk_size(epoch, chunk_size)?);
Ok(allocator)
}
#[must_use]
pub fn current_epoch(&self) -> EpochId {
EpochId::new(self.current_epoch.load(Ordering::Acquire) as u64)
}
pub fn new_epoch(&self) -> Result<EpochId, AllocError> {
let new_id = self.current_epoch.fetch_add(1, Ordering::AcqRel) as u64 + 1;
let epoch = EpochId::new(new_id);
let arena = Arena::with_chunk_size(epoch, self.chunk_size)?;
self.arenas.write().insert(epoch, arena);
Ok(epoch)
}
pub fn arena(
&self,
epoch: EpochId,
) -> Result<impl std::ops::Deref<Target = Arena> + '_, AllocError> {
let arenas = self.arenas.read();
if !arenas.contains_key(&epoch) {
return Err(AllocError::EpochNotFound(epoch));
}
Ok(parking_lot::RwLockReadGuard::map(arenas, |arenas| {
&arenas[&epoch]
}))
}
#[cfg(feature = "tiered-storage")]
pub fn ensure_epoch(&self, epoch: EpochId) -> Result<bool, AllocError> {
{
let arenas = self.arenas.read();
if arenas.contains_key(&epoch) {
return Ok(false);
}
}
let mut arenas = self.arenas.write();
if arenas.contains_key(&epoch) {
return Ok(false);
}
let arena = Arena::with_chunk_size(epoch, self.chunk_size)?;
arenas.insert(epoch, arena);
Ok(true)
}
#[cfg(feature = "tiered-storage")]
pub fn arena_or_create(
&self,
epoch: EpochId,
) -> Result<impl std::ops::Deref<Target = Arena> + '_, AllocError> {
self.ensure_epoch(epoch)?;
self.arena(epoch)
}
pub fn alloc(&self, size: usize, align: usize) -> Result<NonNull<u8>, AllocError> {
let epoch = self.current_epoch();
let arenas = self.arenas.read();
arenas
.get(&epoch)
.expect("current epoch always exists")
.alloc(size, align)
}
pub fn drop_epoch(&self, epoch: EpochId) {
self.arenas.write().remove(&epoch);
}
#[must_use]
pub fn total_allocated(&self) -> usize {
self.arenas
.read()
.values()
.map(Arena::total_allocated)
.sum()
}
}
impl Default for ArenaAllocator {
fn default() -> Self {
Self::new().expect("failed to allocate default arena")
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_arena_basic_allocation() {
let arena = Arena::new(EpochId::INITIAL).unwrap();
let ptr1 = arena.alloc(100, 8).unwrap();
let ptr2 = arena.alloc(200, 8).unwrap();
assert_ne!(ptr1.as_ptr(), ptr2.as_ptr());
}
#[test]
fn test_arena_value_allocation() {
let arena = Arena::new(EpochId::INITIAL).unwrap();
let value = arena.alloc_value(42u64).unwrap();
assert_eq!(*value, 42);
*value = 100;
assert_eq!(*value, 100);
}
#[test]
fn test_arena_slice_allocation() {
let arena = Arena::new(EpochId::INITIAL).unwrap();
let slice = arena.alloc_slice(&[1u32, 2, 3, 4, 5]).unwrap();
assert_eq!(slice, &[1, 2, 3, 4, 5]);
slice[0] = 10;
assert_eq!(slice[0], 10);
}
#[test]
fn test_arena_large_allocation() {
let arena = Arena::with_chunk_size(EpochId::INITIAL, 1024).unwrap();
let _ptr = arena.alloc(2048, 8).unwrap();
assert!(arena.stats().chunk_count >= 2);
}
#[test]
fn test_arena_allocator_epochs() {
let allocator = ArenaAllocator::new().unwrap();
let epoch0 = allocator.current_epoch();
assert_eq!(epoch0, EpochId::INITIAL);
let epoch1 = allocator.new_epoch().unwrap();
assert_eq!(epoch1, EpochId::new(1));
let epoch2 = allocator.new_epoch().unwrap();
assert_eq!(epoch2, EpochId::new(2));
assert_eq!(allocator.current_epoch(), epoch2);
}
#[test]
fn test_arena_allocator_allocation() {
let allocator = ArenaAllocator::new().unwrap();
let ptr1 = allocator.alloc(100, 8).unwrap();
let ptr2 = allocator.alloc(100, 8).unwrap();
assert_ne!(ptr1.as_ptr(), ptr2.as_ptr());
}
#[test]
fn test_arena_drop_epoch() {
let allocator = ArenaAllocator::new().unwrap();
let initial_mem = allocator.total_allocated();
let epoch1 = allocator.new_epoch().unwrap();
{
let arena = allocator.arena(epoch1).unwrap();
arena.alloc(10000, 8).unwrap();
}
let after_alloc = allocator.total_allocated();
assert!(after_alloc > initial_mem);
allocator.drop_epoch(epoch1);
let after_drop = allocator.total_allocated();
assert!(after_drop < after_alloc);
}
#[test]
fn test_arena_stats() {
let arena = Arena::with_chunk_size(EpochId::new(5), 4096).unwrap();
let stats = arena.stats();
assert_eq!(stats.epoch, EpochId::new(5));
assert_eq!(stats.chunk_count, 1);
assert_eq!(stats.total_allocated, 4096);
assert_eq!(stats.total_used, 0);
arena.alloc(100, 8).unwrap();
let stats = arena.stats();
assert!(stats.total_used >= 100);
}
}
#[cfg(all(test, feature = "tiered-storage"))]
mod tiered_storage_tests {
use super::*;
#[test]
fn test_alloc_value_with_offset_basic() {
let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096).unwrap();
let (offset1, val1) = arena.alloc_value_with_offset(42u64).unwrap();
let (offset2, val2) = arena.alloc_value_with_offset(100u64).unwrap();
assert_eq!(offset1, 0);
assert!(offset2 > offset1);
assert!(offset2 >= std::mem::size_of::<u64>() as u32);
assert_eq!(*val1, 42);
assert_eq!(*val2, 100);
*val1 = 999;
assert_eq!(*val1, 999);
}
#[test]
fn test_read_at_basic() {
let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096).unwrap();
let (offset, _) = arena.alloc_value_with_offset(12345u64).unwrap();
let value: &u64 = unsafe { arena.read_at(offset) };
assert_eq!(*value, 12345);
}
#[test]
fn test_read_at_mut_basic() {
let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096).unwrap();
let (offset, _) = arena.alloc_value_with_offset(42u64).unwrap();
let value: &mut u64 = unsafe { arena.read_at_mut(offset) };
assert_eq!(*value, 42);
*value = 100;
let value: &u64 = unsafe { arena.read_at(offset) };
assert_eq!(*value, 100);
}
#[test]
fn test_alloc_value_with_offset_struct() {
#[derive(Debug, Clone, PartialEq)]
struct TestNode {
id: u64,
name: [u8; 32],
value: i32,
}
let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096).unwrap();
let node = TestNode {
id: 12345,
name: [b'A'; 32],
value: -999,
};
let (offset, stored) = arena.alloc_value_with_offset(node.clone()).unwrap();
assert_eq!(stored.id, 12345);
assert_eq!(stored.value, -999);
let read: &TestNode = unsafe { arena.read_at(offset) };
assert_eq!(read.id, node.id);
assert_eq!(read.name, node.name);
assert_eq!(read.value, node.value);
}
#[test]
fn test_alloc_value_with_offset_alignment() {
let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096).unwrap();
let (offset1, _) = arena.alloc_value_with_offset(1u8).unwrap();
assert_eq!(offset1, 0);
let (offset2, val) = arena.alloc_value_with_offset(42u64).unwrap();
assert_eq!(offset2 % 8, 0);
assert_eq!(*val, 42);
}
#[test]
fn test_alloc_value_with_offset_multiple() {
let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096).unwrap();
let mut offsets = Vec::new();
for i in 0..100u64 {
let (offset, val) = arena.alloc_value_with_offset(i).unwrap();
offsets.push(offset);
assert_eq!(*val, i);
}
for window in offsets.windows(2) {
assert!(window[0] < window[1]);
}
for (i, offset) in offsets.iter().enumerate() {
let val: &u64 = unsafe { arena.read_at(*offset) };
assert_eq!(*val, i as u64);
}
}
#[test]
fn test_arena_allocator_with_offset() {
let allocator = ArenaAllocator::with_chunk_size(4096).unwrap();
let epoch = allocator.current_epoch();
let arena = allocator.arena(epoch).unwrap();
let (offset, val) = arena.alloc_value_with_offset(42u64).unwrap();
assert_eq!(*val, 42);
let read: &u64 = unsafe { arena.read_at(offset) };
assert_eq!(*read, 42);
}
#[test]
#[cfg(debug_assertions)]
#[should_panic(expected = "exceeds chunk used bytes")]
fn test_read_at_out_of_bounds() {
let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096).unwrap();
let (_offset, _) = arena.alloc_value_with_offset(42u64).unwrap();
unsafe {
let _: &u64 = arena.read_at(4000);
}
}
#[test]
#[cfg(debug_assertions)]
#[should_panic(expected = "is not aligned")]
fn test_read_at_misaligned() {
let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096).unwrap();
let (_offset, _) = arena.alloc_value_with_offset(0xFFu8).unwrap();
let _ = arena.alloc_value_with_offset(0u64).unwrap();
unsafe {
let _: &u64 = arena.read_at(1);
}
}
#[test]
#[cfg(not(miri))] fn test_concurrent_read_stress() {
use std::sync::Arc;
let arena = Arc::new(Arena::with_chunk_size(EpochId::INITIAL, 1024 * 1024).unwrap());
let num_threads = 8;
let values_per_thread = 1000;
let mut all_offsets = Vec::new();
for t in 0..num_threads {
let base = (t * values_per_thread) as u64;
let mut offsets = Vec::with_capacity(values_per_thread);
for i in 0..values_per_thread as u64 {
let (offset, _) = arena.alloc_value_with_offset(base + i).unwrap();
offsets.push(offset);
}
all_offsets.push(offsets);
}
let mut handles = Vec::new();
for (t, offsets) in all_offsets.into_iter().enumerate() {
let arena = Arc::clone(&arena);
let base = (t * values_per_thread) as u64;
handles.push(std::thread::spawn(move || {
for (i, offset) in offsets.iter().enumerate() {
let val: &u64 = unsafe { arena.read_at(*offset) };
assert_eq!(*val, base + i as u64);
}
}));
}
for handle in handles {
handle.join().expect("Thread panicked");
}
}
#[test]
fn test_alloc_value_with_offset_insufficient_space() {
let arena = Arena::with_chunk_size(EpochId::INITIAL, 64).unwrap();
let _ = arena.alloc_value_with_offset([0u8; 48]).unwrap();
let result = arena.alloc_value_with_offset([0u8; 32]);
assert!(result.is_err());
}
#[test]
fn test_multi_type_interleaved() {
#[derive(Debug, Clone, PartialEq)]
#[repr(C)]
struct Record {
id: u64,
flags: u32,
weight: f32,
}
let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096).unwrap();
let (off_u8, _) = arena.alloc_value_with_offset(0xAAu8).unwrap();
let (off_u32, _) = arena.alloc_value_with_offset(0xBBBBu32).unwrap();
let (off_u64, _) = arena.alloc_value_with_offset(0xCCCCCCCCu64).unwrap();
let (off_rec, _) = arena
.alloc_value_with_offset(Record {
id: 42,
flags: 0xFF,
weight: std::f32::consts::PI,
})
.unwrap();
unsafe {
assert_eq!(*arena.read_at::<u8>(off_u8), 0xAA);
assert_eq!(*arena.read_at::<u32>(off_u32), 0xBBBB);
assert_eq!(*arena.read_at::<u64>(off_u64), 0xCCCCCCCC);
let rec: &Record = arena.read_at(off_rec);
assert_eq!(rec.id, 42);
assert_eq!(rec.flags, 0xFF);
assert!((rec.weight - std::f32::consts::PI).abs() < 0.001);
}
}
}