#![allow(unsafe_code)]

//! Epoch-scoped arena allocation: each [`Arena`] bump-allocates from
//! fixed-size chunks and belongs to a single epoch, so an entire epoch's
//! memory can be released at once by dropping its arena.

use std::alloc::{Layout, alloc, dealloc};
use std::ptr::NonNull;
use std::sync::atomic::{AtomicUsize, Ordering};

use parking_lot::RwLock;

use crate::types::EpochId;

/// Default capacity of each arena chunk (1 MiB).
const DEFAULT_CHUNK_SIZE: usize = 1024 * 1024;

/// A fixed-capacity block of memory that hands out space via an atomic bump
/// pointer. Individual allocations are never freed; the whole chunk is
/// released when it is dropped.
struct Chunk {
    /// Base pointer of the 16-byte-aligned backing allocation.
    ptr: NonNull<u8>,
    /// Size of the backing allocation in bytes.
    capacity: usize,
    /// Current bump offset into the chunk; it only ever grows.
    offset: AtomicUsize,
}

impl Chunk {
    fn new(capacity: usize) -> Self {
        // `std::alloc::alloc` requires a non-zero-size layout.
        assert!(capacity > 0, "chunk capacity must be non-zero");

        let layout = Layout::from_size_align(capacity, 16).expect("Invalid layout");
        let ptr = unsafe { alloc(layout) };
        let ptr = NonNull::new(ptr).expect("Allocation failed");

        Self {
            ptr,
            capacity,
            offset: AtomicUsize::new(0),
        }
    }

    fn try_alloc(&self, size: usize, align: usize) -> Option<NonNull<u8>> {
        self.try_alloc_with_offset(size, align).map(|(_, ptr)| ptr)
    }

    /// Bumps the offset by `size` bytes (rounded up to `align`) and returns
    /// the chunk-relative offset together with the pointer, or `None` if the
    /// chunk is full. Lock-free: contended threads simply retry the CAS.
    fn try_alloc_with_offset(&self, size: usize, align: usize) -> Option<(u32, NonNull<u8>)> {
        loop {
            let current = self.offset.load(Ordering::Relaxed);

            // Round up to `align`; assumes `align` is a power of two
            // (as produced by `align_of`).
            let aligned = (current + align - 1) & !(align - 1);
            let new_offset = aligned + size;

            if new_offset > self.capacity {
                return None;
            }

            match self.offset.compare_exchange_weak(
                current,
                new_offset,
                Ordering::AcqRel,
                Ordering::Relaxed,
            ) {
                Ok(_) => {
                    // The offset is returned as `u32`; chunk capacities are
                    // assumed to stay well below 4 GiB.
                    let ptr = unsafe { self.ptr.as_ptr().add(aligned) };
                    return Some((aligned as u32, NonNull::new(ptr)?));
                }
                Err(_) => continue,
            }
        }
    }

    fn used(&self) -> usize {
        self.offset.load(Ordering::Relaxed)
    }

    #[allow(dead_code)]
    fn remaining(&self) -> usize {
        self.capacity - self.used()
    }
}

impl Drop for Chunk {
    fn drop(&mut self) {
        let layout = Layout::from_size_align(self.capacity, 16).expect("Invalid layout");
        unsafe { dealloc(self.ptr.as_ptr(), layout) };
    }
}

// SAFETY: `Chunk` owns its allocation exclusively and all offset updates go
// through atomics, so it can be sent and shared across threads.
unsafe impl Send for Chunk {}
unsafe impl Sync for Chunk {}

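/// A per-epoch bump allocator. Allocations are pointer bumps into the newest
/// chunk; memory is reclaimed only when the whole arena is dropped, and
/// destructors of allocated values are never run.
///
/// A minimal usage sketch (mirroring the unit tests below; marked `ignore`
/// since the crate path is not spelled out here):
///
/// ```ignore
/// let arena = Arena::new(EpochId::INITIAL);
///
/// // Move a value into the arena and mutate it in place.
/// let n = arena.alloc_value(42u64);
/// *n += 1;
/// assert_eq!(*n, 43);
///
/// // Copy a slice into the arena.
/// let xs = arena.alloc_slice(&[1u32, 2, 3]);
/// assert_eq!(xs, &[1, 2, 3]);
/// ```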
pub struct Arena {
    /// Epoch this arena belongs to.
    epoch: EpochId,
    /// Chunks in allocation order; the newest chunk is tried first.
    chunks: RwLock<Vec<Chunk>>,
    /// Capacity used for each newly created chunk.
    chunk_size: usize,
    /// Total bytes reserved across all chunks (not bytes in use).
    total_allocated: AtomicUsize,
}

impl Arena {
    #[must_use]
    pub fn new(epoch: EpochId) -> Self {
        Self::with_chunk_size(epoch, DEFAULT_CHUNK_SIZE)
    }

    #[must_use]
    pub fn with_chunk_size(epoch: EpochId, chunk_size: usize) -> Self {
        let initial_chunk = Chunk::new(chunk_size);
        Self {
            epoch,
            chunks: RwLock::new(vec![initial_chunk]),
            chunk_size,
            total_allocated: AtomicUsize::new(chunk_size),
        }
    }

    #[must_use]
    pub fn epoch(&self) -> EpochId {
        self.epoch
    }

    /// Allocates `size` bytes with the requested alignment, growing the arena
    /// with a fresh chunk if no existing chunk has room.
    pub fn alloc(&self, size: usize, align: usize) -> NonNull<u8> {
        // Fast path: try existing chunks under the read lock, newest first.
        {
            let chunks = self.chunks.read();
            for chunk in chunks.iter().rev() {
                if let Some(ptr) = chunk.try_alloc(size, align) {
                    return ptr;
                }
            }
        }

        // Slow path: allocate a new chunk and satisfy the request from it.
        self.alloc_new_chunk(size, align)
    }

    /// Moves `value` into the arena and returns a mutable reference to it.
    /// Note that the value's destructor will never run.
    pub fn alloc_value<T>(&self, value: T) -> &mut T {
        let ptr = self.alloc(std::mem::size_of::<T>(), std::mem::align_of::<T>());
        unsafe {
            let typed_ptr = ptr.as_ptr() as *mut T;
            typed_ptr.write(value);
            &mut *typed_ptr
        }
    }

    /// Copies `values` into the arena and returns the arena-backed slice.
    pub fn alloc_slice<T: Copy>(&self, values: &[T]) -> &mut [T] {
        if values.is_empty() {
            return &mut [];
        }

        let size = std::mem::size_of::<T>() * values.len();
        let align = std::mem::align_of::<T>();
        let ptr = self.alloc(size, align);

        unsafe {
            let typed_ptr = ptr.as_ptr() as *mut T;
            std::ptr::copy_nonoverlapping(values.as_ptr(), typed_ptr, values.len());
            std::slice::from_raw_parts_mut(typed_ptr, values.len())
        }
    }

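    /// Allocates `value` in the arena's first chunk and returns its stable,
    /// chunk-relative byte offset alongside the reference, so the value can be
    /// located again via [`Arena::read_at`]. Panics if the first chunk cannot
    /// satisfy the allocation.
    ///
    /// A minimal round-trip sketch (mirroring the tiered-storage tests below;
    /// marked `ignore` since the crate path is not spelled out here):
    ///
    /// ```ignore
    /// let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096);
    /// let (offset, stored) = arena.alloc_value_with_offset(42u64);
    /// assert_eq!(*stored, 42);
    ///
    /// // SAFETY: `offset` came from `alloc_value_with_offset::<u64>` on this arena.
    /// let read: &u64 = unsafe { arena.read_at(offset) };
    /// assert_eq!(*read, 42);
    /// ```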
    #[cfg(feature = "tiered-storage")]
    pub fn alloc_value_with_offset<T>(&self, value: T) -> (u32, &mut T) {
        let size = std::mem::size_of::<T>();
        let align = std::mem::align_of::<T>();

        let chunks = self.chunks.read();
        let chunk = chunks
            .first()
            .expect("Arena should have at least one chunk");

        let (offset, ptr) = chunk
            .try_alloc_with_offset(size, align)
            .expect("Allocation would create new chunk - increase chunk size");

        unsafe {
            let typed_ptr = ptr.as_ptr().cast::<T>();
            typed_ptr.write(value);
            (offset, &mut *typed_ptr)
        }
    }

    /// Reads a value previously stored with [`Arena::alloc_value_with_offset`].
    ///
    /// # Safety
    ///
    /// `offset` must have been returned by `alloc_value_with_offset::<T>` on
    /// this arena, and the value stored there must still be a valid `T`.
    #[cfg(feature = "tiered-storage")]
    pub unsafe fn read_at<T>(&self, offset: u32) -> &T {
        let chunks = self.chunks.read();
        let chunk = chunks
            .first()
            .expect("Arena should have at least one chunk");
        unsafe {
            let ptr = chunk.ptr.as_ptr().add(offset as usize).cast::<T>();
            &*ptr
        }
    }

    /// Mutable variant of [`Arena::read_at`].
    ///
    /// # Safety
    ///
    /// Same requirements as [`Arena::read_at`]; in addition the caller must
    /// ensure no other reference to the same value is alive.
    #[cfg(feature = "tiered-storage")]
    pub unsafe fn read_at_mut<T>(&self, offset: u32) -> &mut T {
        let chunks = self.chunks.read();
        let chunk = chunks
            .first()
            .expect("Arena should have at least one chunk");
        unsafe {
            let ptr = chunk.ptr.as_ptr().add(offset as usize).cast::<T>();
            &mut *ptr
        }
    }

    fn alloc_new_chunk(&self, size: usize, align: usize) -> NonNull<u8> {
        // Size the new chunk so this request fits even after alignment.
        // Concurrent callers may each create a chunk here, which wastes some
        // memory but stays correct.
        let chunk_size = self.chunk_size.max(size + align);
        let chunk = Chunk::new(chunk_size);

        self.total_allocated
            .fetch_add(chunk_size, Ordering::Relaxed);

        let ptr = chunk
            .try_alloc(size, align)
            .expect("Fresh chunk should have space");

        let mut chunks = self.chunks.write();
        chunks.push(chunk);

        ptr
    }

    #[must_use]
    pub fn total_allocated(&self) -> usize {
        self.total_allocated.load(Ordering::Relaxed)
    }

    #[must_use]
    pub fn total_used(&self) -> usize {
        let chunks = self.chunks.read();
        chunks.iter().map(Chunk::used).sum()
    }

    #[must_use]
    pub fn stats(&self) -> ArenaStats {
        let chunks = self.chunks.read();
        ArenaStats {
            epoch: self.epoch,
            chunk_count: chunks.len(),
            total_allocated: self.total_allocated.load(Ordering::Relaxed),
            total_used: chunks.iter().map(Chunk::used).sum(),
        }
    }
}

/// Point-in-time snapshot of an arena's memory usage.
#[derive(Debug, Clone)]
pub struct ArenaStats {
    /// Epoch the arena belongs to.
    pub epoch: EpochId,
    /// Number of chunks currently backing the arena.
    pub chunk_count: usize,
    /// Total bytes reserved across all chunks.
    pub total_allocated: usize,
    /// Total bytes handed out by the bump pointers.
    pub total_used: usize,
}

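/// Owns one [`Arena`] per epoch and tracks the current epoch. Dropping an
/// epoch releases all memory allocated in it at once.
///
/// A minimal usage sketch (mirroring the unit tests below; marked `ignore`
/// since the crate path is not spelled out here):
///
/// ```ignore
/// let allocator = ArenaAllocator::new();
///
/// // Open a new epoch and allocate from its arena.
/// let epoch = allocator.new_epoch();
/// allocator.arena(epoch).alloc_value(7u32);
///
/// // Reclaim everything allocated in that epoch in one call.
/// allocator.drop_epoch(epoch);
/// ```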
pub struct ArenaAllocator {
    /// One arena per live epoch.
    arenas: RwLock<hashbrown::HashMap<EpochId, Arena>>,
    /// Identifier of the current epoch, stored as a raw counter.
    current_epoch: AtomicUsize,
    /// Chunk size used for every arena created by this allocator.
    chunk_size: usize,
}

impl ArenaAllocator {
    #[must_use]
    pub fn new() -> Self {
        Self::with_chunk_size(DEFAULT_CHUNK_SIZE)
    }

    #[must_use]
    pub fn with_chunk_size(chunk_size: usize) -> Self {
        let allocator = Self {
            arenas: RwLock::new(hashbrown::HashMap::new()),
            current_epoch: AtomicUsize::new(0),
            chunk_size,
        };

        let epoch = EpochId::INITIAL;
        allocator
            .arenas
            .write()
            .insert(epoch, Arena::with_chunk_size(epoch, chunk_size));

        allocator
    }

    /// Returns the identifier of the current epoch.
    #[must_use]
    pub fn current_epoch(&self) -> EpochId {
        EpochId::new(self.current_epoch.load(Ordering::Acquire) as u64)
    }

    /// Advances to a new epoch, creates a fresh arena for it, and returns the
    /// new epoch's identifier.
    pub fn new_epoch(&self) -> EpochId {
        let new_id = self.current_epoch.fetch_add(1, Ordering::AcqRel) as u64 + 1;
        let epoch = EpochId::new(new_id);

        let arena = Arena::with_chunk_size(epoch, self.chunk_size);
        self.arenas.write().insert(epoch, arena);

        epoch
    }

    /// Returns a read guard that dereferences to the arena for `epoch`.
    /// Panics if no arena exists for that epoch.
    pub fn arena(&self, epoch: EpochId) -> impl std::ops::Deref<Target = Arena> + '_ {
        // Map the read guard so callers can use the arena directly while the
        // map stays locked for reading.
        parking_lot::RwLockReadGuard::map(self.arenas.read(), |arenas| {
            arenas.get(&epoch).expect("Epoch should exist")
        })
    }

    /// Creates an arena for `epoch` if one does not already exist. Returns
    /// `true` if a new arena was created.
    #[cfg(feature = "tiered-storage")]
    pub fn ensure_epoch(&self, epoch: EpochId) -> bool {
        // Fast path: the epoch already has an arena.
        {
            let arenas = self.arenas.read();
            if arenas.contains_key(&epoch) {
                return false;
            }
        }

        // Re-check under the write lock in case another thread created it.
        let mut arenas = self.arenas.write();
        if arenas.contains_key(&epoch) {
            return false;
        }

        let arena = Arena::with_chunk_size(epoch, self.chunk_size);
        arenas.insert(epoch, arena);
        true
    }

    /// Like [`ArenaAllocator::arena`], but creates the arena first if needed.
    #[cfg(feature = "tiered-storage")]
    pub fn arena_or_create(&self, epoch: EpochId) -> impl std::ops::Deref<Target = Arena> + '_ {
        self.ensure_epoch(epoch);
        self.arena(epoch)
    }

    /// Allocates `size` bytes with the given alignment from the current
    /// epoch's arena.
    pub fn alloc(&self, size: usize, align: usize) -> NonNull<u8> {
        let epoch = self.current_epoch();
        let arenas = self.arenas.read();
        arenas
            .get(&epoch)
            .expect("Current epoch exists")
            .alloc(size, align)
    }

    /// Drops the arena for `epoch`, releasing all memory allocated in it.
    pub fn drop_epoch(&self, epoch: EpochId) {
        self.arenas.write().remove(&epoch);
    }

    #[must_use]
    pub fn total_allocated(&self) -> usize {
        self.arenas
            .read()
            .values()
            .map(Arena::total_allocated)
            .sum()
    }
}

impl Default for ArenaAllocator {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_arena_basic_allocation() {
        let arena = Arena::new(EpochId::INITIAL);

        let ptr1 = arena.alloc(100, 8);
        let ptr2 = arena.alloc(200, 8);

        assert_ne!(ptr1.as_ptr(), ptr2.as_ptr());
    }

    #[test]
    fn test_arena_value_allocation() {
        let arena = Arena::new(EpochId::INITIAL);

        let value = arena.alloc_value(42u64);
        assert_eq!(*value, 42);

        *value = 100;
        assert_eq!(*value, 100);
    }

    #[test]
    fn test_arena_slice_allocation() {
        let arena = Arena::new(EpochId::INITIAL);

        let slice = arena.alloc_slice(&[1u32, 2, 3, 4, 5]);
        assert_eq!(slice, &[1, 2, 3, 4, 5]);

        slice[0] = 10;
        assert_eq!(slice[0], 10);
    }

    #[test]
    fn test_arena_large_allocation() {
        let arena = Arena::with_chunk_size(EpochId::INITIAL, 1024);

        let _ptr = arena.alloc(2048, 8);

        assert!(arena.stats().chunk_count >= 2);
    }

    #[test]
    fn test_arena_allocator_epochs() {
        let allocator = ArenaAllocator::new();

        let epoch0 = allocator.current_epoch();
        assert_eq!(epoch0, EpochId::INITIAL);

        let epoch1 = allocator.new_epoch();
        assert_eq!(epoch1, EpochId::new(1));

        let epoch2 = allocator.new_epoch();
        assert_eq!(epoch2, EpochId::new(2));

        assert_eq!(allocator.current_epoch(), epoch2);
    }

    #[test]
    fn test_arena_allocator_allocation() {
        let allocator = ArenaAllocator::new();

        let ptr1 = allocator.alloc(100, 8);
        let ptr2 = allocator.alloc(100, 8);

        assert_ne!(ptr1.as_ptr(), ptr2.as_ptr());
    }

    #[test]
    fn test_arena_drop_epoch() {
        let allocator = ArenaAllocator::new();

        let initial_mem = allocator.total_allocated();

        let epoch1 = allocator.new_epoch();
        {
            let arena = allocator.arena(epoch1);
            arena.alloc(10000, 8);
        }

        let after_alloc = allocator.total_allocated();
        assert!(after_alloc > initial_mem);

        allocator.drop_epoch(epoch1);

        let after_drop = allocator.total_allocated();
        assert!(after_drop < after_alloc);
    }

    #[test]
    fn test_arena_stats() {
        let arena = Arena::with_chunk_size(EpochId::new(5), 4096);

        let stats = arena.stats();
        assert_eq!(stats.epoch, EpochId::new(5));
        assert_eq!(stats.chunk_count, 1);
        assert_eq!(stats.total_allocated, 4096);
        assert_eq!(stats.total_used, 0);

        arena.alloc(100, 8);
        let stats = arena.stats();
        assert!(stats.total_used >= 100);
    }
}

#[cfg(all(test, feature = "tiered-storage"))]
mod tiered_storage_tests {
    use super::*;

    #[test]
    fn test_alloc_value_with_offset_basic() {
        let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096);

        let (offset1, val1) = arena.alloc_value_with_offset(42u64);
        let (offset2, val2) = arena.alloc_value_with_offset(100u64);

        assert_eq!(offset1, 0);
        assert!(offset2 > offset1);
        assert!(offset2 >= std::mem::size_of::<u64>() as u32);

        assert_eq!(*val1, 42);
        assert_eq!(*val2, 100);

        *val1 = 999;
        assert_eq!(*val1, 999);
    }

    #[test]
    fn test_read_at_basic() {
        let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096);

        let (offset, _) = arena.alloc_value_with_offset(12345u64);

        let value: &u64 = unsafe { arena.read_at(offset) };
        assert_eq!(*value, 12345);
    }

    #[test]
    fn test_read_at_mut_basic() {
        let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096);

        let (offset, _) = arena.alloc_value_with_offset(42u64);

        let value: &mut u64 = unsafe { arena.read_at_mut(offset) };
        assert_eq!(*value, 42);
        *value = 100;

        let value: &u64 = unsafe { arena.read_at(offset) };
        assert_eq!(*value, 100);
    }

    #[test]
    fn test_alloc_value_with_offset_struct() {
        #[derive(Debug, Clone, PartialEq)]
        struct TestNode {
            id: u64,
            name: [u8; 32],
            value: i32,
        }

        let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096);

        let node = TestNode {
            id: 12345,
            name: [b'A'; 32],
            value: -999,
        };

        let (offset, stored) = arena.alloc_value_with_offset(node.clone());
        assert_eq!(stored.id, 12345);
        assert_eq!(stored.value, -999);

        let read: &TestNode = unsafe { arena.read_at(offset) };
        assert_eq!(read.id, node.id);
        assert_eq!(read.name, node.name);
        assert_eq!(read.value, node.value);
    }

    #[test]
    fn test_alloc_value_with_offset_alignment() {
        let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096);

        let (offset1, _) = arena.alloc_value_with_offset(1u8);
        assert_eq!(offset1, 0);

        let (offset2, val) = arena.alloc_value_with_offset(42u64);

        assert_eq!(offset2 % 8, 0);
        assert_eq!(*val, 42);
    }

    #[test]
    fn test_alloc_value_with_offset_multiple() {
        let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096);

        let mut offsets = Vec::new();
        for i in 0..100u64 {
            let (offset, val) = arena.alloc_value_with_offset(i);
            offsets.push(offset);
            assert_eq!(*val, i);
        }

        for window in offsets.windows(2) {
            assert!(window[0] < window[1]);
        }

        for (i, offset) in offsets.iter().enumerate() {
            let val: &u64 = unsafe { arena.read_at(*offset) };
            assert_eq!(*val, i as u64);
        }
    }

    #[test]
    fn test_arena_allocator_with_offset() {
        let allocator = ArenaAllocator::with_chunk_size(4096);

        let epoch = allocator.current_epoch();
        let arena = allocator.arena(epoch);

        let (offset, val) = arena.alloc_value_with_offset(42u64);
        assert_eq!(*val, 42);

        let read: &u64 = unsafe { arena.read_at(offset) };
        assert_eq!(*read, 42);
    }
}