1#![allow(unsafe_code)]
12
13use std::alloc::{Layout, alloc, dealloc};
14use std::ptr::NonNull;
15use std::sync::atomic::{AtomicUsize, Ordering};
16
17use parking_lot::RwLock;
18
19use crate::types::EpochId;
20
21const DEFAULT_CHUNK_SIZE: usize = 1024 * 1024;
23
/// A fixed-capacity heap block carved up by lock-free bump allocation.
///
/// Individual allocations are never freed; the whole block is released when
/// the chunk is dropped.
struct Chunk {
    /// Base pointer of the backing allocation (16-byte aligned, see `new`).
    ptr: NonNull<u8>,
    /// Total size of the backing allocation in bytes.
    capacity: usize,
    /// Next free byte offset; advanced atomically by `try_alloc_with_offset`.
    offset: AtomicUsize,
}
33
34impl Chunk {
35 fn new(capacity: usize) -> Self {
37 let layout = Layout::from_size_align(capacity, 16).expect("Invalid layout");
38 let ptr = unsafe { alloc(layout) };
40 let ptr = NonNull::new(ptr).expect("Allocation failed");
41
42 Self {
43 ptr,
44 capacity,
45 offset: AtomicUsize::new(0),
46 }
47 }
48
49 fn try_alloc(&self, size: usize, align: usize) -> Option<NonNull<u8>> {
52 self.try_alloc_with_offset(size, align).map(|(_, ptr)| ptr)
53 }
54
55 fn try_alloc_with_offset(&self, size: usize, align: usize) -> Option<(u32, NonNull<u8>)> {
59 loop {
60 let current = self.offset.load(Ordering::Relaxed);
61
62 let aligned = (current + align - 1) & !(align - 1);
64 let new_offset = aligned + size;
65
66 if new_offset > self.capacity {
67 return None;
68 }
69
70 match self.offset.compare_exchange_weak(
72 current,
73 new_offset,
74 Ordering::AcqRel,
75 Ordering::Relaxed,
76 ) {
77 Ok(_) => {
78 let ptr = unsafe { self.ptr.as_ptr().add(aligned) };
80 return Some((aligned as u32, NonNull::new(ptr)?));
81 }
82 Err(_) => continue, }
84 }
85 }
86
87 fn used(&self) -> usize {
89 self.offset.load(Ordering::Relaxed)
90 }
91}
92
impl Drop for Chunk {
    /// Releases the chunk's backing memory in one shot.
    fn drop(&mut self) {
        // Must match the layout used in `Chunk::new`: same size, align 16.
        let layout = Layout::from_size_align(self.capacity, 16).expect("Invalid layout");
        // SAFETY: `self.ptr` was returned by `alloc` with this exact layout
        // and is freed exactly once, here.
        unsafe { dealloc(self.ptr.as_ptr(), layout) };
    }
}
100
// SAFETY: a `Chunk` exclusively owns its heap allocation; the raw pointer is
// what suppresses the auto-impls. Moving the chunk to another thread is sound.
unsafe impl Send for Chunk {}
// SAFETY: all mutation of the bump cursor goes through the atomic `offset`,
// and concurrent `try_alloc` calls hand out non-overlapping byte ranges.
unsafe impl Sync for Chunk {}
104
/// An epoch-scoped bump allocator backed by a growable list of chunks.
///
/// Individual allocations are never freed; all memory is released at once
/// when the arena is dropped.
pub struct Arena {
    /// Epoch this arena belongs to.
    epoch: EpochId,
    /// Chunks in creation order; `alloc` tries the newest chunk first.
    chunks: RwLock<Vec<Chunk>>,
    /// Size for new chunks (oversized requests get a larger one-off chunk).
    chunk_size: usize,
    /// Total bytes reserved from the system allocator across all chunks.
    total_allocated: AtomicUsize,
}
122
impl Arena {
    /// Creates an arena for `epoch` using the default 1 MiB chunk size.
    #[must_use]
    pub fn new(epoch: EpochId) -> Self {
        Self::with_chunk_size(epoch, DEFAULT_CHUNK_SIZE)
    }

    /// Creates an arena for `epoch` whose chunks are `chunk_size` bytes.
    ///
    /// One chunk is allocated eagerly, so `total_allocated` starts at
    /// `chunk_size` even before the first allocation.
    #[must_use]
    pub fn with_chunk_size(epoch: EpochId, chunk_size: usize) -> Self {
        let initial_chunk = Chunk::new(chunk_size);
        Self {
            epoch,
            chunks: RwLock::new(vec![initial_chunk]),
            chunk_size,
            total_allocated: AtomicUsize::new(chunk_size),
        }
    }

    /// Epoch this arena belongs to.
    #[must_use]
    pub fn epoch(&self) -> EpochId {
        self.epoch
    }

    /// Allocates `size` bytes aligned to `align` and returns the pointer.
    ///
    /// Existing chunks are tried newest-first under the read lock; if none
    /// has room, a fresh chunk is created.
    ///
    /// # Panics
    /// Panics if the system allocator fails while creating a new chunk.
    pub fn alloc(&self, size: usize, align: usize) -> NonNull<u8> {
        {
            let chunks = self.chunks.read();
            // Newest chunk is the most likely to have free space.
            for chunk in chunks.iter().rev() {
                if let Some(ptr) = chunk.try_alloc(size, align) {
                    return ptr;
                }
            }
        } // read lock dropped here; `alloc_new_chunk` takes the write lock

        self.alloc_new_chunk(size, align)
    }

    /// Moves `value` into the arena and returns a mutable reference to it.
    ///
    /// NOTE(review): the value's destructor will never run — chunks only
    /// deallocate raw bytes on drop. Prefer `Copy`/trivially-droppable types.
    pub fn alloc_value<T>(&self, value: T) -> &mut T {
        let ptr = self.alloc(std::mem::size_of::<T>(), std::mem::align_of::<T>());
        // SAFETY: `ptr` is freshly allocated with `T`'s size and alignment,
        // and `write` initializes it before the reference is created.
        unsafe {
            let typed_ptr = ptr.as_ptr() as *mut T;
            typed_ptr.write(value);
            &mut *typed_ptr
        }
    }

    /// Copies `values` into the arena and returns the arena-backed slice.
    pub fn alloc_slice<T: Copy>(&self, values: &[T]) -> &mut [T] {
        // Zero-size allocations are not meaningful for a bump allocator.
        if values.is_empty() {
            return &mut [];
        }

        let size = std::mem::size_of::<T>() * values.len();
        let align = std::mem::align_of::<T>();
        let ptr = self.alloc(size, align);

        // SAFETY: the allocation is `values.len() * size_of::<T>()` bytes
        // with `T`'s alignment; source and destination cannot overlap
        // because the destination was just allocated.
        unsafe {
            let typed_ptr = ptr.as_ptr() as *mut T;
            std::ptr::copy_nonoverlapping(values.as_ptr(), typed_ptr, values.len());
            std::slice::from_raw_parts_mut(typed_ptr, values.len())
        }
    }

    /// Like [`Self::alloc_value`], but also returns the byte offset of the
    /// value within the arena's *first* chunk, for offset-based re-reads via
    /// [`Self::read_at`] / [`Self::read_at_mut`].
    ///
    /// # Panics
    /// Panics if the first chunk is full — offsets are only meaningful
    /// relative to a single chunk, so spilling into a new chunk is an error.
    #[cfg(feature = "tiered-storage")]
    pub fn alloc_value_with_offset<T>(&self, value: T) -> (u32, &mut T) {
        let size = std::mem::size_of::<T>();
        let align = std::mem::align_of::<T>();

        // Only the first chunk is used so that offsets stay stable.
        let chunks = self.chunks.read();
        let chunk = chunks
            .first()
            .expect("Arena should have at least one chunk");

        let (offset, ptr) = chunk
            .try_alloc_with_offset(size, align)
            .expect("Allocation would create new chunk - increase chunk size");

        // SAFETY: freshly allocated, correctly sized/aligned, initialized
        // by `write` before the reference is formed.
        unsafe {
            let typed_ptr = ptr.as_ptr().cast::<T>();
            typed_ptr.write(value);
            (offset, &mut *typed_ptr)
        }
    }

    /// Reads a `&T` at `offset` in the first chunk.
    ///
    /// # Safety
    /// `offset` must come from a prior `alloc_value_with_offset::<T>` call
    /// on this arena: the bytes must hold a valid, initialized `T` at an
    /// address aligned for `T`. The debug assertions below catch obvious
    /// misuse but are compiled out in release builds.
    #[cfg(feature = "tiered-storage")]
    pub unsafe fn read_at<T>(&self, offset: u32) -> &T {
        let chunks = self.chunks.read();
        let chunk = chunks
            .first()
            .expect("Arena should have at least one chunk");

        // Bounds check against bytes actually handed out, not raw capacity.
        debug_assert!(
            (offset as usize) + std::mem::size_of::<T>() <= chunk.used(),
            "read_at: offset {} + size_of::<{}>() = {} exceeds chunk used bytes {}",
            offset,
            std::any::type_name::<T>(),
            (offset as usize) + std::mem::size_of::<T>(),
            chunk.used()
        );
        debug_assert!(
            (offset as usize).is_multiple_of(std::mem::align_of::<T>()),
            "read_at: offset {} is not aligned for {} (alignment {})",
            offset,
            std::any::type_name::<T>(),
            std::mem::align_of::<T>()
        );

        // SAFETY: caller guarantees an initialized, aligned `T` at `offset`
        // within the first chunk's allocation.
        unsafe {
            let ptr = chunk.ptr.as_ptr().add(offset as usize).cast::<T>();
            &*ptr
        }
    }

    /// Reads a `&mut T` at `offset` in the first chunk.
    ///
    /// # Safety
    /// Same contract as [`Self::read_at`], plus the caller must guarantee no
    /// other live reference (shared or exclusive) aliases this `T` — this
    /// method takes `&self`, so the compiler cannot enforce that.
    ///
    /// NOTE(review): this bound-checks against `chunk.capacity`, while
    /// `read_at` checks against `chunk.used()` — confirm whether writing to
    /// reserved-but-unallocated space is intended; otherwise unify on `used()`.
    #[cfg(feature = "tiered-storage")]
    pub unsafe fn read_at_mut<T>(&self, offset: u32) -> &mut T {
        let chunks = self.chunks.read();
        let chunk = chunks
            .first()
            .expect("Arena should have at least one chunk");

        debug_assert!(
            (offset as usize) + std::mem::size_of::<T>() <= chunk.capacity,
            "read_at_mut: offset {} + size_of::<{}>() = {} exceeds chunk capacity {}",
            offset,
            std::any::type_name::<T>(),
            (offset as usize) + std::mem::size_of::<T>(),
            chunk.capacity
        );
        debug_assert!(
            (offset as usize).is_multiple_of(std::mem::align_of::<T>()),
            "read_at_mut: offset {} is not aligned for {} (alignment {})",
            offset,
            std::any::type_name::<T>(),
            std::mem::align_of::<T>()
        );

        // SAFETY: caller guarantees an initialized, aligned, unaliased `T`
        // at `offset` within the first chunk's allocation.
        unsafe {
            let ptr = chunk.ptr.as_ptr().add(offset as usize).cast::<T>();
            &mut *ptr
        }
    }

    /// Slow path of [`Self::alloc`]: creates a chunk big enough for the
    /// request, allocates from it, then publishes it under the write lock.
    ///
    /// NOTE(review): two threads racing here each create a chunk (wasted
    /// space, not incorrect). `size + align` could overflow on pathological
    /// inputs — confirm upstream callers bound these values.
    fn alloc_new_chunk(&self, size: usize, align: usize) -> NonNull<u8> {
        // `size + align` guarantees room for the worst-case alignment pad.
        let chunk_size = self.chunk_size.max(size + align);
        let chunk = Chunk::new(chunk_size);

        self.total_allocated
            .fetch_add(chunk_size, Ordering::Relaxed);

        // Allocate before publishing so this cannot fail under the lock.
        let ptr = chunk
            .try_alloc(size, align)
            .expect("Fresh chunk should have space");

        let mut chunks = self.chunks.write();
        chunks.push(chunk);

        ptr
    }

    /// Total bytes reserved from the system allocator.
    #[must_use]
    pub fn total_allocated(&self) -> usize {
        self.total_allocated.load(Ordering::Relaxed)
    }

    /// Total bytes actually handed out to callers across all chunks.
    #[must_use]
    pub fn total_used(&self) -> usize {
        let chunks = self.chunks.read();
        chunks.iter().map(Chunk::used).sum()
    }

    /// Point-in-time usage snapshot (chunk count, reserved, used bytes).
    #[must_use]
    pub fn stats(&self) -> ArenaStats {
        let chunks = self.chunks.read();
        ArenaStats {
            epoch: self.epoch,
            chunk_count: chunks.len(),
            total_allocated: self.total_allocated.load(Ordering::Relaxed),
            total_used: chunks.iter().map(Chunk::used).sum(),
        }
    }
}
347
/// Point-in-time usage snapshot of an [`Arena`], as returned by `Arena::stats`.
#[derive(Debug, Clone)]
pub struct ArenaStats {
    /// Epoch the arena belongs to.
    pub epoch: EpochId,
    /// Number of chunks currently backing the arena.
    pub chunk_count: usize,
    /// Bytes reserved from the system allocator.
    pub total_allocated: usize,
    /// Bytes actually handed out to callers.
    pub total_used: usize,
}
360
/// Manages one [`Arena`] per epoch so that a whole epoch's memory can be
/// released at once via `drop_epoch`.
pub struct ArenaAllocator {
    /// Live arenas, keyed by epoch.
    arenas: RwLock<hashbrown::HashMap<EpochId, Arena>>,
    /// Current epoch id (stored as `usize`, exposed as `u64` via `EpochId`).
    current_epoch: AtomicUsize,
    /// Chunk size handed to every newly created arena.
    chunk_size: usize,
}
373
374impl ArenaAllocator {
375 #[must_use]
377 pub fn new() -> Self {
378 Self::with_chunk_size(DEFAULT_CHUNK_SIZE)
379 }
380
381 #[must_use]
383 pub fn with_chunk_size(chunk_size: usize) -> Self {
384 let allocator = Self {
385 arenas: RwLock::new(hashbrown::HashMap::new()),
386 current_epoch: AtomicUsize::new(0),
387 chunk_size,
388 };
389
390 let epoch = EpochId::INITIAL;
392 allocator
393 .arenas
394 .write()
395 .insert(epoch, Arena::with_chunk_size(epoch, chunk_size));
396
397 allocator
398 }
399
400 #[must_use]
402 pub fn current_epoch(&self) -> EpochId {
403 EpochId::new(self.current_epoch.load(Ordering::Acquire) as u64)
404 }
405
406 pub fn new_epoch(&self) -> EpochId {
408 let new_id = self.current_epoch.fetch_add(1, Ordering::AcqRel) as u64 + 1;
409 let epoch = EpochId::new(new_id);
410
411 let arena = Arena::with_chunk_size(epoch, self.chunk_size);
412 self.arenas.write().insert(epoch, arena);
413
414 epoch
415 }
416
417 pub fn arena(&self, epoch: EpochId) -> impl std::ops::Deref<Target = Arena> + '_ {
423 parking_lot::RwLockReadGuard::map(self.arenas.read(), |arenas| {
424 arenas.get(&epoch).expect("Epoch should exist")
425 })
426 }
427
428 #[cfg(feature = "tiered-storage")]
431 pub fn ensure_epoch(&self, epoch: EpochId) -> bool {
432 {
434 let arenas = self.arenas.read();
435 if arenas.contains_key(&epoch) {
436 return false;
437 }
438 }
439
440 let mut arenas = self.arenas.write();
442 if arenas.contains_key(&epoch) {
444 return false;
445 }
446
447 let arena = Arena::with_chunk_size(epoch, self.chunk_size);
448 arenas.insert(epoch, arena);
449 true
450 }
451
452 #[cfg(feature = "tiered-storage")]
454 pub fn arena_or_create(&self, epoch: EpochId) -> impl std::ops::Deref<Target = Arena> + '_ {
455 self.ensure_epoch(epoch);
456 self.arena(epoch)
457 }
458
459 pub fn alloc(&self, size: usize, align: usize) -> NonNull<u8> {
461 let epoch = self.current_epoch();
462 let arenas = self.arenas.read();
463 arenas
464 .get(&epoch)
465 .expect("Current epoch exists")
466 .alloc(size, align)
467 }
468
469 pub fn drop_epoch(&self, epoch: EpochId) {
473 self.arenas.write().remove(&epoch);
474 }
475
476 #[must_use]
478 pub fn total_allocated(&self) -> usize {
479 self.arenas
480 .read()
481 .values()
482 .map(Arena::total_allocated)
483 .sum()
484 }
485}
486
487impl Default for ArenaAllocator {
488 fn default() -> Self {
489 Self::new()
490 }
491}
492
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_arena_basic_allocation() {
        let arena = Arena::new(EpochId::INITIAL);

        let first = arena.alloc(100, 8);
        let second = arena.alloc(200, 8);

        // Distinct allocations must never alias.
        assert_ne!(first.as_ptr(), second.as_ptr());
    }

    #[test]
    fn test_arena_value_allocation() {
        let arena = Arena::new(EpochId::INITIAL);

        let slot = arena.alloc_value(42u64);
        assert_eq!(*slot, 42);

        // The returned reference is writable.
        *slot = 100;
        assert_eq!(*slot, 100);
    }

    #[test]
    fn test_arena_slice_allocation() {
        let arena = Arena::new(EpochId::INITIAL);

        let copied = arena.alloc_slice(&[1u32, 2, 3, 4, 5]);
        assert_eq!(copied, &[1, 2, 3, 4, 5]);

        // The arena-backed slice is mutable.
        copied[0] = 10;
        assert_eq!(copied[0], 10);
    }

    #[test]
    fn test_arena_large_allocation() {
        // A request larger than the chunk size must force a second chunk.
        let arena = Arena::with_chunk_size(EpochId::INITIAL, 1024);
        let _ptr = arena.alloc(2048, 8);
        assert!(arena.stats().chunk_count >= 2);
    }

    #[test]
    fn test_arena_allocator_epochs() {
        let allocator = ArenaAllocator::new();

        // Epoch ids advance monotonically from the initial epoch.
        assert_eq!(allocator.current_epoch(), EpochId::INITIAL);
        assert_eq!(allocator.new_epoch(), EpochId::new(1));

        let latest = allocator.new_epoch();
        assert_eq!(latest, EpochId::new(2));
        assert_eq!(allocator.current_epoch(), latest);
    }

    #[test]
    fn test_arena_allocator_allocation() {
        let allocator = ArenaAllocator::new();

        let a = allocator.alloc(100, 8);
        let b = allocator.alloc(100, 8);
        assert_ne!(a.as_ptr(), b.as_ptr());
    }

    #[test]
    fn test_arena_drop_epoch() {
        let allocator = ArenaAllocator::new();
        let baseline = allocator.total_allocated();

        let epoch = allocator.new_epoch();
        // Guard is dropped at the end of the statement.
        allocator.arena(epoch).alloc(10000, 8);

        let grown = allocator.total_allocated();
        assert!(grown > baseline);

        // Dropping the epoch must release its arena's memory.
        allocator.drop_epoch(epoch);
        assert!(allocator.total_allocated() < grown);
    }

    #[test]
    fn test_arena_stats() {
        let arena = Arena::with_chunk_size(EpochId::new(5), 4096);

        let before = arena.stats();
        assert_eq!(before.epoch, EpochId::new(5));
        assert_eq!(before.chunk_count, 1);
        assert_eq!(before.total_allocated, 4096);
        assert_eq!(before.total_used, 0);

        arena.alloc(100, 8);
        assert!(arena.stats().total_used >= 100);
    }
}
608
// Tests for the offset-based (tiered-storage) API: `alloc_value_with_offset`,
// `read_at`, and `read_at_mut`. Several tests pin exact debug_assert panic
// messages, so they are gated on `debug_assertions`.
#[cfg(all(test, feature = "tiered-storage"))]
mod tiered_storage_tests {
    use super::*;

    #[test]
    fn test_alloc_value_with_offset_basic() {
        let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096);

        let (offset1, val1) = arena.alloc_value_with_offset(42u64);
        let (offset2, val2) = arena.alloc_value_with_offset(100u64);

        // First allocation lands at the chunk base; offsets grow from there.
        assert_eq!(offset1, 0);
        assert!(offset2 > offset1);
        assert!(offset2 >= std::mem::size_of::<u64>() as u32);

        assert_eq!(*val1, 42);
        assert_eq!(*val2, 100);

        // The returned reference is writable.
        *val1 = 999;
        assert_eq!(*val1, 999);
    }

    #[test]
    fn test_read_at_basic() {
        let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096);

        let (offset, _) = arena.alloc_value_with_offset(12345u64);

        // Re-reading via the recorded offset yields the stored value.
        let value: &u64 = unsafe { arena.read_at(offset) };
        assert_eq!(*value, 12345);
    }

    #[test]
    fn test_read_at_mut_basic() {
        let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096);

        let (offset, _) = arena.alloc_value_with_offset(42u64);

        // Mutation through `read_at_mut` must be visible to `read_at`.
        let value: &mut u64 = unsafe { arena.read_at_mut(offset) };
        assert_eq!(*value, 42);
        *value = 100;

        let value: &u64 = unsafe { arena.read_at(offset) };
        assert_eq!(*value, 100);
    }

    #[test]
    fn test_alloc_value_with_offset_struct() {
        // A non-trivial struct round-trips through offset-based storage.
        #[derive(Debug, Clone, PartialEq)]
        struct TestNode {
            id: u64,
            name: [u8; 32],
            value: i32,
        }

        let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096);

        let node = TestNode {
            id: 12345,
            name: [b'A'; 32],
            value: -999,
        };

        let (offset, stored) = arena.alloc_value_with_offset(node.clone());
        assert_eq!(stored.id, 12345);
        assert_eq!(stored.value, -999);

        let read: &TestNode = unsafe { arena.read_at(offset) };
        assert_eq!(read.id, node.id);
        assert_eq!(read.name, node.name);
        assert_eq!(read.value, node.value);
    }

    #[test]
    fn test_alloc_value_with_offset_alignment() {
        let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096);

        // A 1-byte value leaves the cursor misaligned for u64...
        let (offset1, _) = arena.alloc_value_with_offset(1u8);
        assert_eq!(offset1, 0);

        let (offset2, val) = arena.alloc_value_with_offset(42u64);

        // ...so the next allocation must be padded up to 8-byte alignment.
        assert_eq!(offset2 % 8, 0);
        assert_eq!(*val, 42);
    }

    #[test]
    fn test_alloc_value_with_offset_multiple() {
        let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096);

        let mut offsets = Vec::new();
        for i in 0..100u64 {
            let (offset, val) = arena.alloc_value_with_offset(i);
            offsets.push(offset);
            assert_eq!(*val, i);
        }

        // Offsets are strictly increasing (bump allocation never reuses).
        for window in offsets.windows(2) {
            assert!(window[0] < window[1]);
        }

        // Every stored value is recoverable by its offset.
        for (i, offset) in offsets.iter().enumerate() {
            let val: &u64 = unsafe { arena.read_at(*offset) };
            assert_eq!(*val, i as u64);
        }
    }

    #[test]
    fn test_arena_allocator_with_offset() {
        // Offset API also works through the allocator's mapped arena guard.
        let allocator = ArenaAllocator::with_chunk_size(4096);

        let epoch = allocator.current_epoch();
        let arena = allocator.arena(epoch);

        let (offset, val) = arena.alloc_value_with_offset(42u64);
        assert_eq!(*val, 42);

        let read: &u64 = unsafe { arena.read_at(offset) };
        assert_eq!(*read, 42);
    }

    #[test]
    #[cfg(debug_assertions)]
    #[should_panic(expected = "exceeds chunk used bytes")]
    fn test_read_at_out_of_bounds() {
        // Reading past the used region must trip the debug bounds assert.
        let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096);
        let (_offset, _) = arena.alloc_value_with_offset(42u64);

        unsafe {
            let _: &u64 = arena.read_at(4000);
        }
    }

    #[test]
    #[cfg(debug_assertions)]
    #[should_panic(expected = "is not aligned")]
    fn test_read_at_misaligned() {
        // Offset 1 is inside used bytes but misaligned for u64 — must panic.
        let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096);
        let (_offset, _) = arena.alloc_value_with_offset(0xFFu8);
        let _ = arena.alloc_value_with_offset(0u64);

        unsafe {
            let _: &u64 = arena.read_at(1);
        }
    }

    #[test]
    #[cfg(not(miri))] // stress test; too slow under miri
    fn test_concurrent_read_stress() {
        use std::sync::Arc;

        let arena = Arc::new(Arena::with_chunk_size(EpochId::INITIAL, 1024 * 1024));
        let num_threads = 8;
        let values_per_thread = 1000;

        // Populate single-threaded, recording each value's offset.
        let mut all_offsets = Vec::new();
        for t in 0..num_threads {
            let base = (t * values_per_thread) as u64;
            let mut offsets = Vec::with_capacity(values_per_thread);
            for i in 0..values_per_thread as u64 {
                let (offset, _) = arena.alloc_value_with_offset(base + i);
                offsets.push(offset);
            }
            all_offsets.push(offsets);
        }

        // Read back from many threads concurrently; values must be intact.
        let mut handles = Vec::new();
        for (t, offsets) in all_offsets.into_iter().enumerate() {
            let arena = Arc::clone(&arena);
            let base = (t * values_per_thread) as u64;
            handles.push(std::thread::spawn(move || {
                for (i, offset) in offsets.iter().enumerate() {
                    let val: &u64 = unsafe { arena.read_at(*offset) };
                    assert_eq!(*val, base + i as u64);
                }
            }));
        }

        for handle in handles {
            handle.join().expect("Thread panicked");
        }
    }

    #[test]
    fn test_multi_type_interleaved() {
        // Interleaving types of different sizes/alignments must not corrupt
        // neighboring allocations.
        #[derive(Debug, Clone, PartialEq)]
        #[repr(C)]
        struct Record {
            id: u64,
            flags: u32,
            weight: f32,
        }

        let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096);

        let (off_u8, _) = arena.alloc_value_with_offset(0xAAu8);
        let (off_u32, _) = arena.alloc_value_with_offset(0xBBBBu32);
        let (off_u64, _) = arena.alloc_value_with_offset(0xCCCCCCCCu64);
        let (off_rec, _) = arena.alloc_value_with_offset(Record {
            id: 42,
            flags: 0xFF,
            weight: std::f32::consts::PI,
        });

        unsafe {
            assert_eq!(*arena.read_at::<u8>(off_u8), 0xAA);
            assert_eq!(*arena.read_at::<u32>(off_u32), 0xBBBB);
            assert_eq!(*arena.read_at::<u64>(off_u64), 0xCCCCCCCC);

            let rec: &Record = arena.read_at(off_rec);
            assert_eq!(rec.id, 42);
            assert_eq!(rec.flags, 0xFF);
            assert!((rec.weight - std::f32::consts::PI).abs() < 0.001);
        }
    }
}
845}