1#![allow(unsafe_code)]
12
13use std::alloc::{Layout, alloc, dealloc};
14use std::ptr::NonNull;
15use std::sync::atomic::{AtomicUsize, Ordering};
16
17use parking_lot::RwLock;
18
19use crate::types::EpochId;
20
21const DEFAULT_CHUNK_SIZE: usize = 1024 * 1024;
23
/// A fixed-capacity bump-allocation region backed by a raw heap buffer.
///
/// Allocation is lock-free: concurrent callers race on `offset` with a
/// compare-exchange loop. Individual allocations are never freed; the
/// whole buffer is released when the chunk is dropped.
struct Chunk {
    /// Base pointer of the buffer, allocated with 16-byte alignment.
    ptr: NonNull<u8>,
    /// Total buffer size in bytes.
    capacity: usize,
    /// Bump cursor: bytes handed out so far, including alignment padding.
    offset: AtomicUsize,
}

impl Chunk {
    /// Allocates a `capacity`-byte buffer with 16-byte alignment.
    ///
    /// # Panics
    /// Panics if `capacity` is zero (a zero-sized request is undefined
    /// behavior for the global allocator), if the layout is invalid, or
    /// if the allocation itself fails.
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "Chunk capacity must be non-zero");
        let layout = Layout::from_size_align(capacity, 16).expect("Invalid layout");
        // SAFETY: `layout` has a non-zero size, checked above.
        let ptr = unsafe { alloc(layout) };
        let ptr = NonNull::new(ptr).expect("Allocation failed");

        Self {
            ptr,
            capacity,
            offset: AtomicUsize::new(0),
        }
    }

    /// Bump-allocates `size` bytes aligned to `align`, or returns `None`
    /// when the chunk does not have enough room.
    fn try_alloc(&self, size: usize, align: usize) -> Option<NonNull<u8>> {
        self.try_alloc_with_offset(size, align).map(|(_, ptr)| ptr)
    }

    /// Like [`Self::try_alloc`], but also returns the byte offset of the
    /// allocation from the chunk base (used as a stable handle).
    ///
    /// `align` must be a power of two. Arithmetic is overflow-checked so
    /// a pathological `size`/`align` cannot wrap the cursor and hand out
    /// an undersized region.
    fn try_alloc_with_offset(&self, size: usize, align: usize) -> Option<(u32, NonNull<u8>)> {
        debug_assert!(align.is_power_of_two(), "align must be a power of two");
        loop {
            let current = self.offset.load(Ordering::Relaxed);

            // Round the cursor up to the next multiple of `align`.
            let aligned = current.checked_add(align - 1)? & !(align - 1);
            let new_offset = aligned.checked_add(size)?;

            // Reject allocations that do not fit, and offsets that would
            // not survive the `u32` handle conversion below.
            if new_offset > self.capacity || aligned > u32::MAX as usize {
                return None;
            }

            match self.offset.compare_exchange_weak(
                current,
                new_offset,
                Ordering::AcqRel,
                Ordering::Relaxed,
            ) {
                Ok(_) => {
                    // SAFETY: `aligned + size <= capacity`, so the offset
                    // stays inside the buffer this chunk owns.
                    let ptr = unsafe { self.ptr.as_ptr().add(aligned) };
                    // Cast cannot truncate: `aligned <= u32::MAX` was
                    // verified before the CAS.
                    return Some((aligned as u32, NonNull::new(ptr)?));
                }
                // Another thread moved the cursor; retry with a fresh load.
                Err(_) => continue,
            }
        }
    }

    /// Bytes handed out so far (including alignment padding).
    fn used(&self) -> usize {
        self.offset.load(Ordering::Relaxed)
    }

    /// Bytes still available in this chunk.
    #[allow(dead_code)]
    fn remaining(&self) -> usize {
        self.capacity - self.used()
    }
}

impl Drop for Chunk {
    fn drop(&mut self) {
        // Must mirror the layout used in `Chunk::new` exactly.
        let layout = Layout::from_size_align(self.capacity, 16).expect("Invalid layout");
        // SAFETY: `self.ptr` was allocated in `Chunk::new` with this exact
        // layout and is deallocated exactly once, here.
        unsafe { dealloc(self.ptr.as_ptr(), layout) };
    }
}

// SAFETY: the buffer is exclusively owned by this chunk and the bump
// cursor is atomic, so moving and sharing across threads is sound.
unsafe impl Send for Chunk {}
unsafe impl Sync for Chunk {}
110
/// A bump allocator for a single epoch, built from a growable list of
/// fixed-size chunks.
///
/// Allocation within a chunk is lock-free; the chunk list is guarded by
/// an `RwLock` that is only write-locked when a new chunk is appended.
/// Individual allocations are never freed — memory is reclaimed by
/// dropping the arena as a whole.
pub struct Arena {
    /// Epoch this arena belongs to.
    epoch: EpochId,
    /// Chunks allocated so far; newest last.
    chunks: RwLock<Vec<Chunk>>,
    /// Capacity used for each new chunk (exceeded only for a single
    /// oversized request).
    chunk_size: usize,
    /// Total bytes reserved across all chunks (capacity, not usage).
    total_allocated: AtomicUsize,
}
128
129impl Arena {
130 #[must_use]
132 pub fn new(epoch: EpochId) -> Self {
133 Self::with_chunk_size(epoch, DEFAULT_CHUNK_SIZE)
134 }
135
136 #[must_use]
138 pub fn with_chunk_size(epoch: EpochId, chunk_size: usize) -> Self {
139 let initial_chunk = Chunk::new(chunk_size);
140 Self {
141 epoch,
142 chunks: RwLock::new(vec![initial_chunk]),
143 chunk_size,
144 total_allocated: AtomicUsize::new(chunk_size),
145 }
146 }
147
148 #[must_use]
150 pub fn epoch(&self) -> EpochId {
151 self.epoch
152 }
153
154 pub fn alloc(&self, size: usize, align: usize) -> NonNull<u8> {
160 {
162 let chunks = self.chunks.read();
163 for chunk in chunks.iter().rev() {
164 if let Some(ptr) = chunk.try_alloc(size, align) {
165 return ptr;
166 }
167 }
168 }
169
170 self.alloc_new_chunk(size, align)
172 }
173
174 pub fn alloc_value<T>(&self, value: T) -> &mut T {
176 let ptr = self.alloc(std::mem::size_of::<T>(), std::mem::align_of::<T>());
177 unsafe {
179 let typed_ptr = ptr.as_ptr() as *mut T;
180 typed_ptr.write(value);
181 &mut *typed_ptr
182 }
183 }
184
185 pub fn alloc_slice<T: Copy>(&self, values: &[T]) -> &mut [T] {
187 if values.is_empty() {
188 return &mut [];
189 }
190
191 let size = std::mem::size_of::<T>() * values.len();
192 let align = std::mem::align_of::<T>();
193 let ptr = self.alloc(size, align);
194
195 unsafe {
197 let typed_ptr = ptr.as_ptr() as *mut T;
198 std::ptr::copy_nonoverlapping(values.as_ptr(), typed_ptr, values.len());
199 std::slice::from_raw_parts_mut(typed_ptr, values.len())
200 }
201 }
202
203 #[cfg(feature = "tiered-storage")]
213 pub fn alloc_value_with_offset<T>(&self, value: T) -> (u32, &mut T) {
214 let size = std::mem::size_of::<T>();
215 let align = std::mem::align_of::<T>();
216
217 let chunks = self.chunks.read();
219 let chunk = chunks
220 .first()
221 .expect("Arena should have at least one chunk");
222
223 let (offset, ptr) = chunk
224 .try_alloc_with_offset(size, align)
225 .expect("Allocation would create new chunk - increase chunk size");
226
227 unsafe {
229 let typed_ptr = ptr.as_ptr().cast::<T>();
230 typed_ptr.write(value);
231 (offset, &mut *typed_ptr)
232 }
233 }
234
235 #[cfg(feature = "tiered-storage")]
243 pub unsafe fn read_at<T>(&self, offset: u32) -> &T {
244 let chunks = self.chunks.read();
245 let chunk = chunks
246 .first()
247 .expect("Arena should have at least one chunk");
248
249 debug_assert!(
250 (offset as usize) + std::mem::size_of::<T>() <= chunk.used(),
251 "read_at: offset {} + size_of::<{}>() = {} exceeds chunk used bytes {}",
252 offset,
253 std::any::type_name::<T>(),
254 (offset as usize) + std::mem::size_of::<T>(),
255 chunk.used()
256 );
257 debug_assert!(
258 (offset as usize).is_multiple_of(std::mem::align_of::<T>()),
259 "read_at: offset {} is not aligned for {} (alignment {})",
260 offset,
261 std::any::type_name::<T>(),
262 std::mem::align_of::<T>()
263 );
264
265 unsafe {
267 let ptr = chunk.ptr.as_ptr().add(offset as usize).cast::<T>();
268 &*ptr
269 }
270 }
271
272 #[cfg(feature = "tiered-storage")]
281 pub unsafe fn read_at_mut<T>(&self, offset: u32) -> &mut T {
282 let chunks = self.chunks.read();
283 let chunk = chunks
284 .first()
285 .expect("Arena should have at least one chunk");
286
287 debug_assert!(
288 (offset as usize) + std::mem::size_of::<T>() <= chunk.capacity,
289 "read_at_mut: offset {} + size_of::<{}>() = {} exceeds chunk capacity {}",
290 offset,
291 std::any::type_name::<T>(),
292 (offset as usize) + std::mem::size_of::<T>(),
293 chunk.capacity
294 );
295 debug_assert!(
296 (offset as usize).is_multiple_of(std::mem::align_of::<T>()),
297 "read_at_mut: offset {} is not aligned for {} (alignment {})",
298 offset,
299 std::any::type_name::<T>(),
300 std::mem::align_of::<T>()
301 );
302
303 unsafe {
305 let ptr = chunk.ptr.as_ptr().add(offset as usize).cast::<T>();
306 &mut *ptr
307 }
308 }
309
310 fn alloc_new_chunk(&self, size: usize, align: usize) -> NonNull<u8> {
312 let chunk_size = self.chunk_size.max(size + align);
313 let chunk = Chunk::new(chunk_size);
314
315 self.total_allocated
316 .fetch_add(chunk_size, Ordering::Relaxed);
317
318 let ptr = chunk
319 .try_alloc(size, align)
320 .expect("Fresh chunk should have space");
321
322 let mut chunks = self.chunks.write();
323 chunks.push(chunk);
324
325 ptr
326 }
327
328 #[must_use]
330 pub fn total_allocated(&self) -> usize {
331 self.total_allocated.load(Ordering::Relaxed)
332 }
333
334 #[must_use]
336 pub fn total_used(&self) -> usize {
337 let chunks = self.chunks.read();
338 chunks.iter().map(Chunk::used).sum()
339 }
340
341 #[must_use]
343 pub fn stats(&self) -> ArenaStats {
344 let chunks = self.chunks.read();
345 ArenaStats {
346 epoch: self.epoch,
347 chunk_count: chunks.len(),
348 total_allocated: self.total_allocated.load(Ordering::Relaxed),
349 total_used: chunks.iter().map(Chunk::used).sum(),
350 }
351 }
352}
353
/// Point-in-time usage statistics for a single [`Arena`].
#[derive(Debug, Clone)]
pub struct ArenaStats {
    /// Epoch the arena belongs to.
    pub epoch: EpochId,
    /// Number of chunks currently backing the arena.
    pub chunk_count: usize,
    /// Total bytes reserved across all chunks (capacity, not usage).
    pub total_allocated: usize,
    /// Total bytes handed out, including alignment padding.
    pub total_used: usize,
}
366
/// Owns one [`Arena`] per epoch and tracks which epoch is current.
pub struct ArenaAllocator {
    /// Live arenas keyed by epoch.
    arenas: RwLock<hashbrown::HashMap<EpochId, Arena>>,
    /// Current epoch id, stored as `usize` for atomic access.
    current_epoch: AtomicUsize,
    /// Chunk size applied to every arena this allocator creates.
    chunk_size: usize,
}
379
380impl ArenaAllocator {
381 #[must_use]
383 pub fn new() -> Self {
384 Self::with_chunk_size(DEFAULT_CHUNK_SIZE)
385 }
386
387 #[must_use]
389 pub fn with_chunk_size(chunk_size: usize) -> Self {
390 let allocator = Self {
391 arenas: RwLock::new(hashbrown::HashMap::new()),
392 current_epoch: AtomicUsize::new(0),
393 chunk_size,
394 };
395
396 let epoch = EpochId::INITIAL;
398 allocator
399 .arenas
400 .write()
401 .insert(epoch, Arena::with_chunk_size(epoch, chunk_size));
402
403 allocator
404 }
405
406 #[must_use]
408 pub fn current_epoch(&self) -> EpochId {
409 EpochId::new(self.current_epoch.load(Ordering::Acquire) as u64)
410 }
411
412 pub fn new_epoch(&self) -> EpochId {
414 let new_id = self.current_epoch.fetch_add(1, Ordering::AcqRel) as u64 + 1;
415 let epoch = EpochId::new(new_id);
416
417 let arena = Arena::with_chunk_size(epoch, self.chunk_size);
418 self.arenas.write().insert(epoch, arena);
419
420 epoch
421 }
422
423 pub fn arena(&self, epoch: EpochId) -> impl std::ops::Deref<Target = Arena> + '_ {
429 parking_lot::RwLockReadGuard::map(self.arenas.read(), |arenas| {
430 arenas.get(&epoch).expect("Epoch should exist")
431 })
432 }
433
434 #[cfg(feature = "tiered-storage")]
437 pub fn ensure_epoch(&self, epoch: EpochId) -> bool {
438 {
440 let arenas = self.arenas.read();
441 if arenas.contains_key(&epoch) {
442 return false;
443 }
444 }
445
446 let mut arenas = self.arenas.write();
448 if arenas.contains_key(&epoch) {
450 return false;
451 }
452
453 let arena = Arena::with_chunk_size(epoch, self.chunk_size);
454 arenas.insert(epoch, arena);
455 true
456 }
457
458 #[cfg(feature = "tiered-storage")]
460 pub fn arena_or_create(&self, epoch: EpochId) -> impl std::ops::Deref<Target = Arena> + '_ {
461 self.ensure_epoch(epoch);
462 self.arena(epoch)
463 }
464
465 pub fn alloc(&self, size: usize, align: usize) -> NonNull<u8> {
467 let epoch = self.current_epoch();
468 let arenas = self.arenas.read();
469 arenas
470 .get(&epoch)
471 .expect("Current epoch exists")
472 .alloc(size, align)
473 }
474
475 pub fn drop_epoch(&self, epoch: EpochId) {
479 self.arenas.write().remove(&epoch);
480 }
481
482 #[must_use]
484 pub fn total_allocated(&self) -> usize {
485 self.arenas
486 .read()
487 .values()
488 .map(Arena::total_allocated)
489 .sum()
490 }
491}
492
493impl Default for ArenaAllocator {
494 fn default() -> Self {
495 Self::new()
496 }
497}
498
#[cfg(test)]
mod tests {
    use super::*;

    /// Distinct allocations from one arena must yield distinct pointers.
    #[test]
    fn test_arena_basic_allocation() {
        let arena = Arena::new(EpochId::INITIAL);

        let first = arena.alloc(100, 8);
        let second = arena.alloc(200, 8);

        assert_ne!(first.as_ptr(), second.as_ptr());
    }

    /// A value stored in the arena is readable and writable in place.
    #[test]
    fn test_arena_value_allocation() {
        let arena = Arena::new(EpochId::INITIAL);

        let slot = arena.alloc_value(42u64);
        assert_eq!(*slot, 42);

        *slot = 100;
        assert_eq!(*slot, 100);
    }

    /// A copied slice keeps its contents and supports mutation.
    #[test]
    fn test_arena_slice_allocation() {
        let arena = Arena::new(EpochId::INITIAL);

        let data = arena.alloc_slice(&[1u32, 2, 3, 4, 5]);
        assert_eq!(data, &[1, 2, 3, 4, 5]);

        data[0] = 10;
        assert_eq!(data[0], 10);
    }

    /// A request bigger than the chunk size forces a second chunk.
    #[test]
    fn test_arena_large_allocation() {
        let arena = Arena::with_chunk_size(EpochId::INITIAL, 1024);

        let _oversized = arena.alloc(2048, 8);

        assert!(arena.stats().chunk_count >= 2);
    }

    /// Epoch ids advance by one per `new_epoch` call.
    #[test]
    fn test_arena_allocator_epochs() {
        let allocator = ArenaAllocator::new();

        assert_eq!(allocator.current_epoch(), EpochId::INITIAL);

        let first = allocator.new_epoch();
        let second = allocator.new_epoch();

        assert_eq!(first, EpochId::new(1));
        assert_eq!(second, EpochId::new(2));
        assert_eq!(allocator.current_epoch(), second);
    }

    /// Allocations routed through the current epoch must not alias.
    #[test]
    fn test_arena_allocator_allocation() {
        let allocator = ArenaAllocator::new();

        let a = allocator.alloc(100, 8);
        let b = allocator.alloc(100, 8);

        assert_ne!(a.as_ptr(), b.as_ptr());
    }

    /// Dropping an epoch releases the memory its arena reserved.
    #[test]
    fn test_arena_drop_epoch() {
        let allocator = ArenaAllocator::new();
        let baseline = allocator.total_allocated();

        let epoch = allocator.new_epoch();
        {
            let arena = allocator.arena(epoch);
            arena.alloc(10000, 8);
        }

        let grown = allocator.total_allocated();
        assert!(grown > baseline);

        allocator.drop_epoch(epoch);
        assert!(allocator.total_allocated() < grown);
    }

    /// Stats report the configured chunk and track usage after allocation.
    #[test]
    fn test_arena_stats() {
        let arena = Arena::with_chunk_size(EpochId::new(5), 4096);

        let before = arena.stats();
        assert_eq!(before.epoch, EpochId::new(5));
        assert_eq!(before.chunk_count, 1);
        assert_eq!(before.total_allocated, 4096);
        assert_eq!(before.total_used, 0);

        arena.alloc(100, 8);
        assert!(arena.stats().total_used >= 100);
    }
}
614
#[cfg(all(test, feature = "tiered-storage"))]
mod tiered_storage_tests {
    use super::*;

    // Offsets start at zero and grow; the returned references stay
    // readable and writable after further allocations.
    #[test]
    fn test_alloc_value_with_offset_basic() {
        let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096);

        let (offset1, val1) = arena.alloc_value_with_offset(42u64);
        let (offset2, val2) = arena.alloc_value_with_offset(100u64);

        assert_eq!(offset1, 0);
        assert!(offset2 > offset1);
        assert!(offset2 >= std::mem::size_of::<u64>() as u32);

        assert_eq!(*val1, 42);
        assert_eq!(*val2, 100);

        *val1 = 999;
        assert_eq!(*val1, 999);
    }

    // A stored value can be read back through its offset handle.
    #[test]
    fn test_read_at_basic() {
        let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096);

        let (offset, _) = arena.alloc_value_with_offset(12345u64);

        let value: &u64 = unsafe { arena.read_at(offset) };
        assert_eq!(*value, 12345);
    }

    // Mutation through `read_at_mut` is visible to later `read_at` calls.
    #[test]
    fn test_read_at_mut_basic() {
        let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096);

        let (offset, _) = arena.alloc_value_with_offset(42u64);

        let value: &mut u64 = unsafe { arena.read_at_mut(offset) };
        assert_eq!(*value, 42);
        *value = 100;

        let value: &u64 = unsafe { arena.read_at(offset) };
        assert_eq!(*value, 100);
    }

    // Multi-field structs round-trip through offset storage intact.
    #[test]
    fn test_alloc_value_with_offset_struct() {
        #[derive(Debug, Clone, PartialEq)]
        struct TestNode {
            id: u64,
            name: [u8; 32],
            value: i32,
        }

        let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096);

        let node = TestNode {
            id: 12345,
            name: [b'A'; 32],
            value: -999,
        };

        let (offset, stored) = arena.alloc_value_with_offset(node.clone());
        assert_eq!(stored.id, 12345);
        assert_eq!(stored.value, -999);

        let read: &TestNode = unsafe { arena.read_at(offset) };
        assert_eq!(read.id, node.id);
        assert_eq!(read.name, node.name);
        assert_eq!(read.value, node.value);
    }

    // After a 1-byte allocation, a u64 must land on an 8-byte boundary.
    #[test]
    fn test_alloc_value_with_offset_alignment() {
        let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096);

        let (offset1, _) = arena.alloc_value_with_offset(1u8);
        assert_eq!(offset1, 0);

        let (offset2, val) = arena.alloc_value_with_offset(42u64);

        assert_eq!(offset2 % 8, 0);
        assert_eq!(*val, 42);
    }

    // Offsets are strictly increasing and every value reads back correctly.
    #[test]
    fn test_alloc_value_with_offset_multiple() {
        let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096);

        let mut offsets = Vec::new();
        for i in 0..100u64 {
            let (offset, val) = arena.alloc_value_with_offset(i);
            offsets.push(offset);
            assert_eq!(*val, i);
        }

        for window in offsets.windows(2) {
            assert!(window[0] < window[1]);
        }

        for (i, offset) in offsets.iter().enumerate() {
            let val: &u64 = unsafe { arena.read_at(*offset) };
            assert_eq!(*val, i as u64);
        }
    }

    // Offset allocation also works on an arena obtained via the allocator.
    #[test]
    fn test_arena_allocator_with_offset() {
        let allocator = ArenaAllocator::with_chunk_size(4096);

        let epoch = allocator.current_epoch();
        let arena = allocator.arena(epoch);

        let (offset, val) = arena.alloc_value_with_offset(42u64);
        assert_eq!(*val, 42);

        let read: &u64 = unsafe { arena.read_at(offset) };
        assert_eq!(*read, 42);
    }

    // Reading past the used region must trip the debug bounds assert;
    // the expected string matches `read_at`'s debug_assert message.
    #[test]
    #[cfg(debug_assertions)]
    #[should_panic(expected = "exceeds chunk used bytes")]
    fn test_read_at_out_of_bounds() {
        let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096);
        let (_offset, _) = arena.alloc_value_with_offset(42u64);

        unsafe {
            let _: &u64 = arena.read_at(4000);
        }
    }

    // Reading a u64 at an odd offset must trip the debug alignment assert.
    #[test]
    #[cfg(debug_assertions)]
    #[should_panic(expected = "is not aligned")]
    fn test_read_at_misaligned() {
        let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096);
        let (_offset, _) = arena.alloc_value_with_offset(0xFFu8);
        let _ = arena.alloc_value_with_offset(0u64);

        unsafe {
            let _: &u64 = arena.read_at(1);
        }
    }

    // Single-threaded writes followed by concurrent reads from multiple
    // threads; skipped under Miri.
    #[test]
    #[cfg(not(miri))] fn test_concurrent_read_stress() {
        use std::sync::Arc;

        let arena = Arc::new(Arena::with_chunk_size(EpochId::INITIAL, 1024 * 1024));
        let num_threads = 8;
        let values_per_thread = 1000;

        // Phase 1: populate the arena and record every offset.
        let mut all_offsets = Vec::new();
        for t in 0..num_threads {
            let base = (t * values_per_thread) as u64;
            let mut offsets = Vec::with_capacity(values_per_thread);
            for i in 0..values_per_thread as u64 {
                let (offset, _) = arena.alloc_value_with_offset(base + i);
                offsets.push(offset);
            }
            all_offsets.push(offsets);
        }

        // Phase 2: each thread verifies its own slice of offsets.
        let mut handles = Vec::new();
        for (t, offsets) in all_offsets.into_iter().enumerate() {
            let arena = Arc::clone(&arena);
            let base = (t * values_per_thread) as u64;
            handles.push(std::thread::spawn(move || {
                for (i, offset) in offsets.iter().enumerate() {
                    let val: &u64 = unsafe { arena.read_at(*offset) };
                    assert_eq!(*val, base + i as u64);
                }
            }));
        }

        for handle in handles {
            handle.join().expect("Thread panicked");
        }
    }

    // Values of different sizes and alignments interleave without
    // clobbering each other.
    #[test]
    fn test_multi_type_interleaved() {
        #[derive(Debug, Clone, PartialEq)]
        #[repr(C)]
        struct Record {
            id: u64,
            flags: u32,
            weight: f32,
        }

        let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096);

        let (off_u8, _) = arena.alloc_value_with_offset(0xAAu8);
        let (off_u32, _) = arena.alloc_value_with_offset(0xBBBBu32);
        let (off_u64, _) = arena.alloc_value_with_offset(0xCCCCCCCCu64);
        let (off_rec, _) = arena.alloc_value_with_offset(Record {
            id: 42,
            flags: 0xFF,
            weight: 3.14,
        });

        unsafe {
            assert_eq!(*arena.read_at::<u8>(off_u8), 0xAA);
            assert_eq!(*arena.read_at::<u32>(off_u32), 0xBBBB);
            assert_eq!(*arena.read_at::<u64>(off_u64), 0xCCCCCCCC);

            let rec: &Record = arena.read_at(off_rec);
            assert_eq!(rec.id, 42);
            assert_eq!(rec.flags, 0xFF);
            assert!((rec.weight - 3.14).abs() < 0.001);
        }
    }
}