// grafeo_common/memory/arena.rs
//! Epoch-based arena allocator for MVCC.
//!
//! This is how Grafeo manages memory for versioned data. Each epoch gets its
//! own arena, and when all readers from an old epoch finish, we free the whole
//! thing at once. Much faster than tracking individual allocations.
//!
//! Use [`ArenaAllocator`] to manage multiple epochs, or [`Arena`] directly
//! if you're working with a single epoch.

// Arena allocators require unsafe code for memory management
#![allow(unsafe_code)]

use std::alloc::{Layout, alloc, dealloc};
use std::ptr::NonNull;
use std::sync::atomic::{AtomicUsize, Ordering};

use parking_lot::RwLock;

use crate::types::EpochId;

/// Default chunk size for arena allocations (1 MB).
const DEFAULT_CHUNK_SIZE: usize = 1024 * 1024;

/// A memory chunk in the arena.
///
/// Allocation within a chunk is lock-free: `offset` is bumped with a CAS
/// loop, so multiple threads can carve ranges out of the same chunk
/// concurrently.
struct Chunk {
    /// Pointer to the start of the chunk (owned; freed in `Drop`).
    ptr: NonNull<u8>,
    /// Total capacity of the chunk in bytes.
    capacity: usize,
    /// Current allocation offset in bytes; only ever grows.
    offset: AtomicUsize,
}

34impl Chunk {
35    /// Creates a new chunk with the given capacity.
36    fn new(capacity: usize) -> Self {
37        let layout = Layout::from_size_align(capacity, 16).expect("Invalid layout");
38        // SAFETY: We're allocating a valid layout
39        let ptr = unsafe { alloc(layout) };
40        let ptr = NonNull::new(ptr).expect("Allocation failed");
41
42        Self {
43            ptr,
44            capacity,
45            offset: AtomicUsize::new(0),
46        }
47    }
48
49    /// Tries to allocate `size` bytes with the given alignment.
50    /// Returns None if there's not enough space.
51    fn try_alloc(&self, size: usize, align: usize) -> Option<NonNull<u8>> {
52        self.try_alloc_with_offset(size, align).map(|(_, ptr)| ptr)
53    }
54
55    /// Tries to allocate `size` bytes with the given alignment.
56    /// Returns (offset, ptr) where offset is the aligned offset within this chunk.
57    /// Returns None if there's not enough space.
58    fn try_alloc_with_offset(&self, size: usize, align: usize) -> Option<(u32, NonNull<u8>)> {
59        loop {
60            let current = self.offset.load(Ordering::Relaxed);
61
62            // Calculate aligned offset
63            let aligned = (current + align - 1) & !(align - 1);
64            let new_offset = aligned + size;
65
66            if new_offset > self.capacity {
67                return None;
68            }
69
70            // Try to reserve the space
71            match self.offset.compare_exchange_weak(
72                current,
73                new_offset,
74                Ordering::AcqRel,
75                Ordering::Relaxed,
76            ) {
77                Ok(_) => {
78                    // SAFETY: We've reserved this range exclusively
79                    let ptr = unsafe { self.ptr.as_ptr().add(aligned) };
80                    return Some((aligned as u32, NonNull::new(ptr)?));
81                }
82                Err(_) => continue, // Retry
83            }
84        }
85    }
86
87    /// Returns the amount of memory used in this chunk.
88    fn used(&self) -> usize {
89        self.offset.load(Ordering::Relaxed)
90    }
91
92    /// Returns the remaining capacity in this chunk.
93    #[allow(dead_code)]
94    fn remaining(&self) -> usize {
95        self.capacity - self.used()
96    }
97}
98
impl Drop for Chunk {
    fn drop(&mut self) {
        // Must recreate the exact layout used in `Chunk::new` (same size,
        // same 16-byte alignment) for `dealloc` to be sound.
        let layout = Layout::from_size_align(self.capacity, 16).expect("Invalid layout");
        // SAFETY: We allocated this memory with the same layout
        unsafe { dealloc(self.ptr.as_ptr(), layout) };
    }
}

// SAFETY: Chunk uses atomic operations for thread-safe allocation
unsafe impl Send for Chunk {}
unsafe impl Sync for Chunk {}

/// A single epoch's memory arena.
///
/// Allocates by bumping a pointer forward - extremely fast. You can't free
/// individual allocations; instead, drop the whole arena when the epoch
/// is no longer needed.
///
/// Thread-safe: multiple threads can allocate concurrently using atomics.
pub struct Arena {
    /// The epoch this arena belongs to.
    epoch: EpochId,
    /// List of memory chunks. The lock guards the Vec itself; allocation
    /// inside a chunk is lock-free.
    chunks: RwLock<Vec<Chunk>>,
    /// Default chunk size for new allocations.
    chunk_size: usize,
    /// Total bytes allocated (sum of chunk capacities, not bytes handed out).
    total_allocated: AtomicUsize,
}

129impl Arena {
130    /// Creates a new arena for the given epoch.
131    #[must_use]
132    pub fn new(epoch: EpochId) -> Self {
133        Self::with_chunk_size(epoch, DEFAULT_CHUNK_SIZE)
134    }
135
136    /// Creates a new arena with a custom chunk size.
137    #[must_use]
138    pub fn with_chunk_size(epoch: EpochId, chunk_size: usize) -> Self {
139        let initial_chunk = Chunk::new(chunk_size);
140        Self {
141            epoch,
142            chunks: RwLock::new(vec![initial_chunk]),
143            chunk_size,
144            total_allocated: AtomicUsize::new(chunk_size),
145        }
146    }
147
148    /// Returns the epoch this arena belongs to.
149    #[must_use]
150    pub fn epoch(&self) -> EpochId {
151        self.epoch
152    }
153
154    /// Allocates `size` bytes with the given alignment.
155    ///
156    /// # Panics
157    ///
158    /// Panics if allocation fails (out of memory).
159    pub fn alloc(&self, size: usize, align: usize) -> NonNull<u8> {
160        // First try to allocate from existing chunks
161        {
162            let chunks = self.chunks.read();
163            for chunk in chunks.iter().rev() {
164                if let Some(ptr) = chunk.try_alloc(size, align) {
165                    return ptr;
166                }
167            }
168        }
169
170        // Need a new chunk
171        self.alloc_new_chunk(size, align)
172    }
173
174    /// Allocates a value of type T.
175    pub fn alloc_value<T>(&self, value: T) -> &mut T {
176        let ptr = self.alloc(std::mem::size_of::<T>(), std::mem::align_of::<T>());
177        // SAFETY: We've allocated the correct size and alignment
178        unsafe {
179            let typed_ptr = ptr.as_ptr() as *mut T;
180            typed_ptr.write(value);
181            &mut *typed_ptr
182        }
183    }
184
185    /// Allocates a slice of values.
186    pub fn alloc_slice<T: Copy>(&self, values: &[T]) -> &mut [T] {
187        if values.is_empty() {
188            return &mut [];
189        }
190
191        let size = std::mem::size_of::<T>() * values.len();
192        let align = std::mem::align_of::<T>();
193        let ptr = self.alloc(size, align);
194
195        // SAFETY: We've allocated the correct size and alignment
196        unsafe {
197            let typed_ptr = ptr.as_ptr() as *mut T;
198            std::ptr::copy_nonoverlapping(values.as_ptr(), typed_ptr, values.len());
199            std::slice::from_raw_parts_mut(typed_ptr, values.len())
200        }
201    }
202
203    /// Allocates a value and returns its offset within the primary chunk.
204    ///
205    /// This is used by tiered storage to store values in the arena and track
206    /// their locations via compact u32 offsets in `HotVersionRef`.
207    ///
208    /// # Panics
209    ///
210    /// Panics if allocation would require a new chunk. Ensure chunk size is
211    /// large enough for your use case.
212    #[cfg(feature = "tiered-storage")]
213    pub fn alloc_value_with_offset<T>(&self, value: T) -> (u32, &mut T) {
214        let size = std::mem::size_of::<T>();
215        let align = std::mem::align_of::<T>();
216
217        // Try to allocate in the first chunk to get a stable offset
218        let chunks = self.chunks.read();
219        let chunk = chunks
220            .first()
221            .expect("Arena should have at least one chunk");
222
223        let (offset, ptr) = chunk
224            .try_alloc_with_offset(size, align)
225            .expect("Allocation would create new chunk - increase chunk size");
226
227        // SAFETY: We've allocated the correct size and alignment
228        unsafe {
229            let typed_ptr = ptr.as_ptr().cast::<T>();
230            typed_ptr.write(value);
231            (offset, &mut *typed_ptr)
232        }
233    }
234
235    /// Reads a value at the given offset in the primary chunk.
236    ///
237    /// # Safety
238    ///
239    /// - The offset must have been returned by a previous `alloc_value_with_offset` call
240    /// - The type T must match what was stored at that offset
241    /// - The arena must not have been dropped
242    #[cfg(feature = "tiered-storage")]
243    pub unsafe fn read_at<T>(&self, offset: u32) -> &T {
244        let chunks = self.chunks.read();
245        let chunk = chunks
246            .first()
247            .expect("Arena should have at least one chunk");
248
249        debug_assert!(
250            (offset as usize) + std::mem::size_of::<T>() <= chunk.used(),
251            "read_at: offset {} + size_of::<{}>() = {} exceeds chunk used bytes {}",
252            offset,
253            std::any::type_name::<T>(),
254            (offset as usize) + std::mem::size_of::<T>(),
255            chunk.used()
256        );
257        debug_assert!(
258            (offset as usize).is_multiple_of(std::mem::align_of::<T>()),
259            "read_at: offset {} is not aligned for {} (alignment {})",
260            offset,
261            std::any::type_name::<T>(),
262            std::mem::align_of::<T>()
263        );
264
265        // SAFETY: Caller guarantees offset is valid and T matches stored type
266        unsafe {
267            let ptr = chunk.ptr.as_ptr().add(offset as usize).cast::<T>();
268            &*ptr
269        }
270    }
271
272    /// Reads a value mutably at the given offset in the primary chunk.
273    ///
274    /// # Safety
275    ///
276    /// - The offset must have been returned by a previous `alloc_value_with_offset` call
277    /// - The type T must match what was stored at that offset
278    /// - The arena must not have been dropped
279    /// - No other references to this value may exist
280    #[cfg(feature = "tiered-storage")]
281    pub unsafe fn read_at_mut<T>(&self, offset: u32) -> &mut T {
282        let chunks = self.chunks.read();
283        let chunk = chunks
284            .first()
285            .expect("Arena should have at least one chunk");
286
287        debug_assert!(
288            (offset as usize) + std::mem::size_of::<T>() <= chunk.capacity,
289            "read_at_mut: offset {} + size_of::<{}>() = {} exceeds chunk capacity {}",
290            offset,
291            std::any::type_name::<T>(),
292            (offset as usize) + std::mem::size_of::<T>(),
293            chunk.capacity
294        );
295        debug_assert!(
296            (offset as usize).is_multiple_of(std::mem::align_of::<T>()),
297            "read_at_mut: offset {} is not aligned for {} (alignment {})",
298            offset,
299            std::any::type_name::<T>(),
300            std::mem::align_of::<T>()
301        );
302
303        // SAFETY: Caller guarantees offset is valid, T matches, and no aliasing
304        unsafe {
305            let ptr = chunk.ptr.as_ptr().add(offset as usize).cast::<T>();
306            &mut *ptr
307        }
308    }
309
310    /// Allocates a new chunk and performs the allocation.
311    fn alloc_new_chunk(&self, size: usize, align: usize) -> NonNull<u8> {
312        let chunk_size = self.chunk_size.max(size + align);
313        let chunk = Chunk::new(chunk_size);
314
315        self.total_allocated
316            .fetch_add(chunk_size, Ordering::Relaxed);
317
318        let ptr = chunk
319            .try_alloc(size, align)
320            .expect("Fresh chunk should have space");
321
322        let mut chunks = self.chunks.write();
323        chunks.push(chunk);
324
325        ptr
326    }
327
328    /// Returns the total memory allocated by this arena.
329    #[must_use]
330    pub fn total_allocated(&self) -> usize {
331        self.total_allocated.load(Ordering::Relaxed)
332    }
333
334    /// Returns the total memory used (not just allocated capacity).
335    #[must_use]
336    pub fn total_used(&self) -> usize {
337        let chunks = self.chunks.read();
338        chunks.iter().map(Chunk::used).sum()
339    }
340
341    /// Returns statistics about this arena.
342    #[must_use]
343    pub fn stats(&self) -> ArenaStats {
344        let chunks = self.chunks.read();
345        ArenaStats {
346            epoch: self.epoch,
347            chunk_count: chunks.len(),
348            total_allocated: self.total_allocated.load(Ordering::Relaxed),
349            total_used: chunks.iter().map(Chunk::used).sum(),
350        }
351    }
352}
353
/// Statistics about an arena.
///
/// A point-in-time snapshot taken by [`Arena::stats`]; values may be stale
/// as soon as they are returned if other threads keep allocating.
#[derive(Debug, Clone)]
pub struct ArenaStats {
    /// The epoch this arena belongs to.
    pub epoch: EpochId,
    /// Number of chunks allocated.
    pub chunk_count: usize,
    /// Total bytes allocated (sum of chunk capacities).
    pub total_allocated: usize,
    /// Total bytes actually handed out to callers.
    pub total_used: usize,
}

/// Manages arenas across multiple epochs.
///
/// Use this to create new epochs, allocate in the current epoch, and
/// clean up old epochs when they're no longer needed.
pub struct ArenaAllocator {
    /// Map of epochs to arenas.
    arenas: RwLock<hashbrown::HashMap<EpochId, Arena>>,
    /// Current epoch id (stored as usize so it can live in an atomic).
    current_epoch: AtomicUsize,
    /// Default chunk size for newly created arenas.
    chunk_size: usize,
}

380impl ArenaAllocator {
381    /// Creates a new arena allocator.
382    #[must_use]
383    pub fn new() -> Self {
384        Self::with_chunk_size(DEFAULT_CHUNK_SIZE)
385    }
386
387    /// Creates a new arena allocator with a custom chunk size.
388    #[must_use]
389    pub fn with_chunk_size(chunk_size: usize) -> Self {
390        let allocator = Self {
391            arenas: RwLock::new(hashbrown::HashMap::new()),
392            current_epoch: AtomicUsize::new(0),
393            chunk_size,
394        };
395
396        // Create the initial epoch
397        let epoch = EpochId::INITIAL;
398        allocator
399            .arenas
400            .write()
401            .insert(epoch, Arena::with_chunk_size(epoch, chunk_size));
402
403        allocator
404    }
405
406    /// Returns the current epoch.
407    #[must_use]
408    pub fn current_epoch(&self) -> EpochId {
409        EpochId::new(self.current_epoch.load(Ordering::Acquire) as u64)
410    }
411
412    /// Creates a new epoch and returns its ID.
413    pub fn new_epoch(&self) -> EpochId {
414        let new_id = self.current_epoch.fetch_add(1, Ordering::AcqRel) as u64 + 1;
415        let epoch = EpochId::new(new_id);
416
417        let arena = Arena::with_chunk_size(epoch, self.chunk_size);
418        self.arenas.write().insert(epoch, arena);
419
420        epoch
421    }
422
423    /// Gets the arena for a specific epoch.
424    ///
425    /// # Panics
426    ///
427    /// Panics if the epoch doesn't exist.
428    pub fn arena(&self, epoch: EpochId) -> impl std::ops::Deref<Target = Arena> + '_ {
429        parking_lot::RwLockReadGuard::map(self.arenas.read(), |arenas| {
430            arenas.get(&epoch).expect("Epoch should exist")
431        })
432    }
433
434    /// Ensures an arena exists for the given epoch, creating it if necessary.
435    /// Returns whether a new arena was created.
436    #[cfg(feature = "tiered-storage")]
437    pub fn ensure_epoch(&self, epoch: EpochId) -> bool {
438        // Fast path: check if epoch already exists
439        {
440            let arenas = self.arenas.read();
441            if arenas.contains_key(&epoch) {
442                return false;
443            }
444        }
445
446        // Slow path: create the epoch
447        let mut arenas = self.arenas.write();
448        // Double-check after acquiring write lock
449        if arenas.contains_key(&epoch) {
450            return false;
451        }
452
453        let arena = Arena::with_chunk_size(epoch, self.chunk_size);
454        arenas.insert(epoch, arena);
455        true
456    }
457
458    /// Gets or creates an arena for a specific epoch.
459    #[cfg(feature = "tiered-storage")]
460    pub fn arena_or_create(&self, epoch: EpochId) -> impl std::ops::Deref<Target = Arena> + '_ {
461        self.ensure_epoch(epoch);
462        self.arena(epoch)
463    }
464
465    /// Allocates in the current epoch.
466    pub fn alloc(&self, size: usize, align: usize) -> NonNull<u8> {
467        let epoch = self.current_epoch();
468        let arenas = self.arenas.read();
469        arenas
470            .get(&epoch)
471            .expect("Current epoch exists")
472            .alloc(size, align)
473    }
474
475    /// Drops an epoch, freeing all its memory.
476    ///
477    /// This should only be called when no readers are using this epoch.
478    pub fn drop_epoch(&self, epoch: EpochId) {
479        self.arenas.write().remove(&epoch);
480    }
481
482    /// Returns total memory allocated across all epochs.
483    #[must_use]
484    pub fn total_allocated(&self) -> usize {
485        self.arenas
486            .read()
487            .values()
488            .map(Arena::total_allocated)
489            .sum()
490    }
491}
492
impl Default for ArenaAllocator {
    /// Equivalent to [`ArenaAllocator::new`] (default chunk size).
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    //! Unit tests for `Arena` and `ArenaAllocator` (core, non-feature-gated API).

    use super::*;

    #[test]
    fn test_arena_basic_allocation() {
        let arena = Arena::new(EpochId::INITIAL);

        // Allocate some bytes
        let ptr1 = arena.alloc(100, 8);
        let ptr2 = arena.alloc(200, 8);

        // Pointers should be different
        assert_ne!(ptr1.as_ptr(), ptr2.as_ptr());
    }

    #[test]
    fn test_arena_value_allocation() {
        let arena = Arena::new(EpochId::INITIAL);

        let value = arena.alloc_value(42u64);
        assert_eq!(*value, 42);

        // The returned reference is mutable and backed by arena memory.
        *value = 100;
        assert_eq!(*value, 100);
    }

    #[test]
    fn test_arena_slice_allocation() {
        let arena = Arena::new(EpochId::INITIAL);

        let slice = arena.alloc_slice(&[1u32, 2, 3, 4, 5]);
        assert_eq!(slice, &[1, 2, 3, 4, 5]);

        slice[0] = 10;
        assert_eq!(slice[0], 10);
    }

    #[test]
    fn test_arena_large_allocation() {
        let arena = Arena::with_chunk_size(EpochId::INITIAL, 1024);

        // Allocate something larger than the chunk size
        let _ptr = arena.alloc(2048, 8);

        // Should have created a new chunk
        assert!(arena.stats().chunk_count >= 2);
    }

    #[test]
    fn test_arena_allocator_epochs() {
        let allocator = ArenaAllocator::new();

        let epoch0 = allocator.current_epoch();
        assert_eq!(epoch0, EpochId::INITIAL);

        let epoch1 = allocator.new_epoch();
        assert_eq!(epoch1, EpochId::new(1));

        let epoch2 = allocator.new_epoch();
        assert_eq!(epoch2, EpochId::new(2));

        // Current epoch should be the latest
        assert_eq!(allocator.current_epoch(), epoch2);
    }

    #[test]
    fn test_arena_allocator_allocation() {
        let allocator = ArenaAllocator::new();

        let ptr1 = allocator.alloc(100, 8);
        let ptr2 = allocator.alloc(100, 8);

        assert_ne!(ptr1.as_ptr(), ptr2.as_ptr());
    }

    #[test]
    fn test_arena_drop_epoch() {
        let allocator = ArenaAllocator::new();

        let initial_mem = allocator.total_allocated();

        let epoch1 = allocator.new_epoch();
        // Allocate some memory in the new epoch
        {
            let arena = allocator.arena(epoch1);
            arena.alloc(10000, 8);
        }

        let after_alloc = allocator.total_allocated();
        assert!(after_alloc > initial_mem);

        // Drop the epoch
        allocator.drop_epoch(epoch1);

        // Memory should decrease
        let after_drop = allocator.total_allocated();
        assert!(after_drop < after_alloc);
    }

    #[test]
    fn test_arena_stats() {
        let arena = Arena::with_chunk_size(EpochId::new(5), 4096);

        // A fresh arena: one chunk allocated, nothing handed out yet.
        let stats = arena.stats();
        assert_eq!(stats.epoch, EpochId::new(5));
        assert_eq!(stats.chunk_count, 1);
        assert_eq!(stats.total_allocated, 4096);
        assert_eq!(stats.total_used, 0);

        arena.alloc(100, 8);
        let stats = arena.stats();
        assert!(stats.total_used >= 100);
    }
}

#[cfg(all(test, feature = "tiered-storage"))]
mod tiered_storage_tests {
    //! Tests for the offset-based API (`alloc_value_with_offset`,
    //! `read_at`, `read_at_mut`) used by tiered storage.

    use super::*;

    #[test]
    fn test_alloc_value_with_offset_basic() {
        let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096);

        let (offset1, val1) = arena.alloc_value_with_offset(42u64);
        let (offset2, val2) = arena.alloc_value_with_offset(100u64);

        // First allocation should be at offset 0 (aligned)
        assert_eq!(offset1, 0);
        // Second allocation should be after the first
        assert!(offset2 > offset1);
        assert!(offset2 >= std::mem::size_of::<u64>() as u32);

        // Values should be correct
        assert_eq!(*val1, 42);
        assert_eq!(*val2, 100);

        // Mutation should work
        *val1 = 999;
        assert_eq!(*val1, 999);
    }

    #[test]
    fn test_read_at_basic() {
        let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096);

        let (offset, _) = arena.alloc_value_with_offset(12345u64);

        // Read it back
        let value: &u64 = unsafe { arena.read_at(offset) };
        assert_eq!(*value, 12345);
    }

    #[test]
    fn test_read_at_mut_basic() {
        let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096);

        let (offset, _) = arena.alloc_value_with_offset(42u64);

        // Read and modify
        let value: &mut u64 = unsafe { arena.read_at_mut(offset) };
        assert_eq!(*value, 42);
        *value = 100;

        // Verify modification persisted
        let value: &u64 = unsafe { arena.read_at(offset) };
        assert_eq!(*value, 100);
    }

    #[test]
    fn test_alloc_value_with_offset_struct() {
        // A plain-old-data struct to exercise non-primitive round-trips.
        #[derive(Debug, Clone, PartialEq)]
        struct TestNode {
            id: u64,
            name: [u8; 32],
            value: i32,
        }

        let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096);

        let node = TestNode {
            id: 12345,
            name: [b'A'; 32],
            value: -999,
        };

        let (offset, stored) = arena.alloc_value_with_offset(node.clone());
        assert_eq!(stored.id, 12345);
        assert_eq!(stored.value, -999);

        // Read it back
        let read: &TestNode = unsafe { arena.read_at(offset) };
        assert_eq!(read.id, node.id);
        assert_eq!(read.name, node.name);
        assert_eq!(read.value, node.value);
    }

    #[test]
    fn test_alloc_value_with_offset_alignment() {
        let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096);

        // Allocate a byte first to potentially misalign
        let (offset1, _) = arena.alloc_value_with_offset(1u8);
        assert_eq!(offset1, 0);

        // Now allocate a u64 which requires 8-byte alignment
        let (offset2, val) = arena.alloc_value_with_offset(42u64);

        // offset2 should be 8-byte aligned
        assert_eq!(offset2 % 8, 0);
        assert_eq!(*val, 42);
    }

    #[test]
    fn test_alloc_value_with_offset_multiple() {
        let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096);

        let mut offsets = Vec::new();
        for i in 0..100u64 {
            let (offset, val) = arena.alloc_value_with_offset(i);
            offsets.push(offset);
            assert_eq!(*val, i);
        }

        // All offsets should be unique and in ascending order
        for window in offsets.windows(2) {
            assert!(window[0] < window[1]);
        }

        // Read all values back
        for (i, offset) in offsets.iter().enumerate() {
            let val: &u64 = unsafe { arena.read_at(*offset) };
            assert_eq!(*val, i as u64);
        }
    }

    #[test]
    fn test_arena_allocator_with_offset() {
        let allocator = ArenaAllocator::with_chunk_size(4096);

        let epoch = allocator.current_epoch();
        let arena = allocator.arena(epoch);

        let (offset, val) = arena.alloc_value_with_offset(42u64);
        assert_eq!(*val, 42);

        let read: &u64 = unsafe { arena.read_at(offset) };
        assert_eq!(*read, 42);
    }

    #[test]
    #[cfg(debug_assertions)]
    #[should_panic(expected = "exceeds chunk used bytes")]
    fn test_read_at_out_of_bounds() {
        let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096);
        let (_offset, _) = arena.alloc_value_with_offset(42u64);

        // Read way past the allocated region — should panic in debug
        unsafe {
            let _: &u64 = arena.read_at(4000);
        }
    }

    #[test]
    #[cfg(debug_assertions)]
    #[should_panic(expected = "is not aligned")]
    fn test_read_at_misaligned() {
        let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096);
        // Allocate a u8 at offset 0
        let (_offset, _) = arena.alloc_value_with_offset(0xFFu8);
        // Also allocate some bytes so offset 1 is within used range
        let _ = arena.alloc_value_with_offset(0u64);

        // Try to read a u64 at offset 1 (misaligned for u64)
        unsafe {
            let _: &u64 = arena.read_at(1);
        }
    }

    #[test]
    #[cfg(not(miri))] // parking_lot uses integer-to-pointer casts incompatible with Miri strict provenance
    fn test_concurrent_read_stress() {
        use std::sync::Arc;

        let arena = Arc::new(Arena::with_chunk_size(EpochId::INITIAL, 1024 * 1024));
        let num_threads = 8;
        let values_per_thread = 1000;

        // Each thread allocates values and records offsets
        let mut all_offsets = Vec::new();
        for t in 0..num_threads {
            let base = (t * values_per_thread) as u64;
            let mut offsets = Vec::with_capacity(values_per_thread);
            for i in 0..values_per_thread as u64 {
                let (offset, _) = arena.alloc_value_with_offset(base + i);
                offsets.push(offset);
            }
            all_offsets.push(offsets);
        }

        // Now read all values back concurrently from multiple threads
        let mut handles = Vec::new();
        for (t, offsets) in all_offsets.into_iter().enumerate() {
            let arena = Arc::clone(&arena);
            let base = (t * values_per_thread) as u64;
            handles.push(std::thread::spawn(move || {
                for (i, offset) in offsets.iter().enumerate() {
                    let val: &u64 = unsafe { arena.read_at(*offset) };
                    assert_eq!(*val, base + i as u64);
                }
            }));
        }

        for handle in handles {
            handle.join().expect("Thread panicked");
        }
    }

    #[test]
    fn test_multi_type_interleaved() {
        // repr(C) keeps the layout predictable across allocations.
        #[derive(Debug, Clone, PartialEq)]
        #[repr(C)]
        struct Record {
            id: u64,
            flags: u32,
            weight: f32,
        }

        let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096);

        // Interleave different types
        let (off_u8, _) = arena.alloc_value_with_offset(0xAAu8);
        let (off_u32, _) = arena.alloc_value_with_offset(0xBBBBu32);
        let (off_u64, _) = arena.alloc_value_with_offset(0xCCCCCCCCu64);
        let (off_rec, _) = arena.alloc_value_with_offset(Record {
            id: 42,
            flags: 0xFF,
            weight: 3.14,
        });

        // Read them all back
        unsafe {
            assert_eq!(*arena.read_at::<u8>(off_u8), 0xAA);
            assert_eq!(*arena.read_at::<u32>(off_u32), 0xBBBB);
            assert_eq!(*arena.read_at::<u64>(off_u64), 0xCCCCCCCC);

            let rec: &Record = arena.read_at(off_rec);
            assert_eq!(rec.id, 42);
            assert_eq!(rec.flags, 0xFF);
            assert!((rec.weight - 3.14).abs() < 0.001);
        }
    }
}