
grafeo_common/memory/arena.rs

//! Epoch-based arena allocator for MVCC.
//!
//! This is how Grafeo manages memory for versioned data. Each epoch gets its
//! own arena, and when all readers from an old epoch finish, we free the whole
//! thing at once. Much faster than tracking individual allocations.
//!
//! Use [`ArenaAllocator`] to manage multiple epochs, or [`Arena`] directly
//! if you're working with a single epoch.
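//!
//! # Example
//!
//! A minimal sketch of the epoch lifecycle, using only the APIs defined in
//! this module (paths assumed from the file location; error handling elided):
//!
//! ```ignore
//! use grafeo_common::memory::arena::ArenaAllocator;
//!
//! let allocator = ArenaAllocator::new()?;
//!
//! // Allocate 64 bytes with 8-byte alignment in the current epoch.
//! let ptr = allocator.alloc(64, 8)?;
//!
//! // Start a new epoch; subsequent allocations land in its arena.
//! let old = allocator.current_epoch();
//! let next = allocator.new_epoch()?;
//!
//! // Once no readers reference the old epoch, free it all at once.
//! allocator.drop_epoch(old);
//! ```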

// Arena allocators require unsafe code for memory management
#![allow(unsafe_code)]

use std::alloc::{Layout, alloc, dealloc};
use std::fmt;
use std::ptr::NonNull;
use std::sync::atomic::{AtomicUsize, Ordering};

use parking_lot::RwLock;

use crate::types::EpochId;

/// Default chunk size for arena allocations (1 MB).
const DEFAULT_CHUNK_SIZE: usize = 1024 * 1024;

/// Errors from arena allocation operations.
#[derive(Debug, Clone)]
pub enum AllocError {
    /// The system allocator returned null (out of memory).
    OutOfMemory,
    /// The requested epoch does not exist.
    EpochNotFound(EpochId),
    /// Arena chunk has insufficient space for the allocation.
    InsufficientSpace,
}

impl fmt::Display for AllocError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::OutOfMemory => write!(f, "arena allocation failed: out of memory"),
            Self::EpochNotFound(id) => write!(f, "epoch {id} not found in arena allocator"),
            Self::InsufficientSpace => {
                write!(f, "arena chunk has insufficient space for allocation")
            }
        }
    }
}

impl std::error::Error for AllocError {}

impl From<AllocError> for crate::Error {
    fn from(e: AllocError) -> Self {
        match e {
            AllocError::OutOfMemory | AllocError::InsufficientSpace => {
                crate::Error::Storage(crate::utils::error::StorageError::Full)
            }
            AllocError::EpochNotFound(id) => {
                crate::Error::Internal(format!("epoch {id} not found in arena allocator"))
            }
        }
    }
}

/// A memory chunk in the arena.
struct Chunk {
    /// Pointer to the start of the chunk.
    ptr: NonNull<u8>,
    /// Total capacity of the chunk.
    capacity: usize,
    /// Current allocation offset.
    offset: AtomicUsize,
}

impl Chunk {
    /// Creates a new chunk with the given capacity.
    ///
    /// `capacity` must be non-zero: calling the global allocator with a
    /// zero-sized layout is undefined behavior.
    ///
    /// # Errors
    ///
    /// Returns `AllocError::OutOfMemory` if the system allocator fails.
    fn new(capacity: usize) -> Result<Self, AllocError> {
        debug_assert!(capacity > 0, "chunk capacity must be non-zero");
        let layout = Layout::from_size_align(capacity, 16).map_err(|_| AllocError::OutOfMemory)?;
        // SAFETY: `layout` is valid and non-zero-sized (see the assert above)
        let ptr = unsafe { alloc(layout) };
        let ptr = NonNull::new(ptr).ok_or(AllocError::OutOfMemory)?;

        Ok(Self {
            ptr,
            capacity,
            offset: AtomicUsize::new(0),
        })
    }

    /// Tries to allocate `size` bytes with the given alignment.
    /// Returns None if there's not enough space.
    fn try_alloc(&self, size: usize, align: usize) -> Option<NonNull<u8>> {
        self.try_alloc_with_offset(size, align).map(|(_, ptr)| ptr)
    }

    /// Tries to allocate `size` bytes with the given alignment.
    /// Returns (offset, ptr) where offset is the aligned offset within this chunk.
    /// Returns None if there's not enough space.
    fn try_alloc_with_offset(&self, size: usize, align: usize) -> Option<(u32, NonNull<u8>)> {
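        // Lock-free bump allocation: read the current offset, compute the
        // next aligned slot, and CAS the bump pointer forward. On contention
        // we retry; the winner owns [aligned, aligned + size) exclusively.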
        loop {
            let current = self.offset.load(Ordering::Relaxed);

            // Round `current` up to the requested alignment. This mask trick
            // requires `align` to be a power of two, as `Layout` and
            // `align_of` guarantee.
            let aligned = (current + align - 1) & !(align - 1);
            let new_offset = aligned + size;

            if new_offset > self.capacity {
                return None;
            }

            // Try to reserve the space
            match self.offset.compare_exchange_weak(
                current,
                new_offset,
                Ordering::AcqRel,
                Ordering::Relaxed,
            ) {
                Ok(_) => {
                    // SAFETY: We've reserved this range exclusively
                    let ptr = unsafe { self.ptr.as_ptr().add(aligned) };
                    debug_assert!(
                        u32::try_from(aligned).is_ok(),
                        "chunk offset exceeds u32 range"
                    );
                    return Some((aligned as u32, NonNull::new(ptr)?));
                }
                Err(_) => continue, // Retry
            }
        }
    }

    /// Returns the amount of memory used in this chunk.
    fn used(&self) -> usize {
        self.offset.load(Ordering::Relaxed)
    }
}

impl Drop for Chunk {
    fn drop(&mut self) {
        let layout = Layout::from_size_align(self.capacity, 16).expect("Invalid layout");
        // SAFETY: We allocated this memory with the same layout
        unsafe { dealloc(self.ptr.as_ptr(), layout) };
    }
}

// SAFETY: Chunk uses atomic operations for thread-safe allocation
unsafe impl Send for Chunk {}
unsafe impl Sync for Chunk {}

/// A single epoch's memory arena.
///
/// Allocates by bumping a pointer forward, which is extremely fast. You can't
/// free individual allocations; instead, drop the whole arena when the epoch
/// is no longer needed.
///
/// Thread-safe: multiple threads can allocate concurrently using atomics.
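///
/// # Example
///
/// A minimal sketch using only this module's APIs (error handling elided):
///
/// ```ignore
/// let arena = Arena::new(EpochId::INITIAL)?;
/// let n = arena.alloc_value(7u64)?;            // typed bump allocation
/// let xs = arena.alloc_slice(&[1u32, 2, 3])?;  // copies the slice in
/// *n += 1;
/// // No per-allocation free: everything is reclaimed when `arena` drops.
/// ```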
pub struct Arena {
    /// The epoch this arena belongs to.
    epoch: EpochId,
    /// List of memory chunks.
    chunks: RwLock<Vec<Chunk>>,
    /// Default chunk size for new allocations.
    chunk_size: usize,
    /// Total bytes allocated.
    total_allocated: AtomicUsize,
}

impl Arena {
    /// Creates a new arena for the given epoch.
    ///
    /// # Errors
    ///
    /// Returns `AllocError::OutOfMemory` if the initial chunk allocation fails.
    pub fn new(epoch: EpochId) -> Result<Self, AllocError> {
        Self::with_chunk_size(epoch, DEFAULT_CHUNK_SIZE)
    }

    /// Creates a new arena with a custom chunk size.
    ///
    /// # Errors
    ///
    /// Returns `AllocError::OutOfMemory` if the initial chunk allocation fails.
    pub fn with_chunk_size(epoch: EpochId, chunk_size: usize) -> Result<Self, AllocError> {
        let initial_chunk = Chunk::new(chunk_size)?;
        Ok(Self {
            epoch,
            chunks: RwLock::new(vec![initial_chunk]),
            chunk_size,
            total_allocated: AtomicUsize::new(chunk_size),
        })
    }

    /// Returns the epoch this arena belongs to.
    #[must_use]
    pub fn epoch(&self) -> EpochId {
        self.epoch
    }

    /// Allocates `size` bytes with the given alignment.
    ///
    /// `align` must be a power of two (the bump pointer is rounded up with a
    /// power-of-two mask, the same requirement [`Layout`] imposes).
    ///
    /// # Errors
    ///
    /// Returns `AllocError::OutOfMemory` if a new chunk is needed and
    /// the system allocator fails.
    pub fn alloc(&self, size: usize, align: usize) -> Result<NonNull<u8>, AllocError> {
        // First try to allocate from existing chunks
        {
            let chunks = self.chunks.read();
            for chunk in chunks.iter().rev() {
                if let Some(ptr) = chunk.try_alloc(size, align) {
                    return Ok(ptr);
                }
            }
        }

        // Need a new chunk
        self.alloc_new_chunk(size, align)
    }

    /// Allocates a value of type T.
    ///
    /// # Errors
    ///
    /// Returns `AllocError::OutOfMemory` if allocation fails.
    pub fn alloc_value<T>(&self, value: T) -> Result<&mut T, AllocError> {
        let ptr = self.alloc(std::mem::size_of::<T>(), std::mem::align_of::<T>())?;
        // SAFETY: We've allocated the correct size and alignment
        Ok(unsafe {
            let typed_ptr = ptr.as_ptr() as *mut T;
            typed_ptr.write(value);
            &mut *typed_ptr
        })
    }

    /// Allocates a slice of values.
    ///
    /// # Errors
    ///
    /// Returns `AllocError::OutOfMemory` if allocation fails.
    pub fn alloc_slice<T: Copy>(&self, values: &[T]) -> Result<&mut [T], AllocError> {
        if values.is_empty() {
            return Ok(&mut []);
        }

        let size = std::mem::size_of::<T>() * values.len();
        let align = std::mem::align_of::<T>();
        let ptr = self.alloc(size, align)?;

        // SAFETY: We've allocated the correct size and alignment
        Ok(unsafe {
            let typed_ptr = ptr.as_ptr() as *mut T;
            std::ptr::copy_nonoverlapping(values.as_ptr(), typed_ptr, values.len());
            std::slice::from_raw_parts_mut(typed_ptr, values.len())
        })
    }

    /// Allocates a value and returns its offset within the primary chunk.
    ///
    /// This is used by tiered storage to store values in the arena and track
    /// their locations via compact u32 offsets in `HotVersionRef`.
    ///
    /// # Errors
    ///
    /// Returns `AllocError::InsufficientSpace` if the primary chunk does not
    /// have enough room. If you hit this, construct the arena with a larger
    /// chunk size.
    ///
    /// # Panics
    ///
    /// Panics if the arena has no chunks (should never happen in normal use).
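    ///
    /// # Example
    ///
    /// A sketch of the offset round-trip (the `HotVersionRef` wiring is
    /// omitted; error handling elided):
    ///
    /// ```ignore
    /// let arena = Arena::with_chunk_size(epoch, 4096)?;
    /// let (off, _) = arena.alloc_value_with_offset(42u64)?;
    /// // SAFETY: `off` came from `alloc_value_with_offset::<u64>` on this arena.
    /// let v: &u64 = unsafe { arena.read_at(off) };
    /// assert_eq!(*v, 42);
    /// ```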
    #[cfg(feature = "tiered-storage")]
    pub fn alloc_value_with_offset<T>(&self, value: T) -> Result<(u32, &mut T), AllocError> {
        let size = std::mem::size_of::<T>();
        let align = std::mem::align_of::<T>();

        // Try to allocate in the first chunk to get a stable offset
        let chunks = self.chunks.read();
        let chunk = chunks
            .first()
            .expect("Arena should have at least one chunk");

        let (offset, ptr) = chunk
            .try_alloc_with_offset(size, align)
            .ok_or(AllocError::InsufficientSpace)?;

        // SAFETY: We've allocated the correct size and alignment
        Ok(unsafe {
            let typed_ptr = ptr.as_ptr().cast::<T>();
            typed_ptr.write(value);
            (offset, &mut *typed_ptr)
        })
    }

    /// Reads a value at the given offset in the primary chunk.
    ///
    /// # Safety
    ///
    /// - The offset must have been returned by a previous `alloc_value_with_offset` call
    /// - The type T must match what was stored at that offset
    /// - The arena must not have been dropped
    ///
    /// # Panics
    ///
    /// Panics if the arena has no chunks (should never happen in normal use).
    #[cfg(feature = "tiered-storage")]
    pub unsafe fn read_at<T>(&self, offset: u32) -> &T {
        let chunks = self.chunks.read();
        let chunk = chunks
            .first()
            .expect("Arena should have at least one chunk");

        debug_assert!(
            (offset as usize) + std::mem::size_of::<T>() <= chunk.used(),
            "read_at: offset {} + size_of::<{}>() = {} exceeds chunk used bytes {}",
            offset,
            std::any::type_name::<T>(),
            (offset as usize) + std::mem::size_of::<T>(),
            chunk.used()
        );
        debug_assert!(
            (offset as usize).is_multiple_of(std::mem::align_of::<T>()),
            "read_at: offset {} is not aligned for {} (alignment {})",
            offset,
            std::any::type_name::<T>(),
            std::mem::align_of::<T>()
        );

        // SAFETY: Caller guarantees offset is valid and T matches stored type
        unsafe {
            let ptr = chunk.ptr.as_ptr().add(offset as usize).cast::<T>();
            &*ptr
        }
    }

    /// Reads a value mutably at the given offset in the primary chunk.
    ///
    /// # Safety
    ///
    /// - The offset must have been returned by a previous `alloc_value_with_offset` call
    /// - The type T must match what was stored at that offset
    /// - The arena must not have been dropped
    /// - No other references to this value may exist
    ///
    /// # Panics
    ///
    /// Panics if the arena has no chunks (should never happen in normal use).
    #[cfg(feature = "tiered-storage")]
    pub unsafe fn read_at_mut<T>(&self, offset: u32) -> &mut T {
        let chunks = self.chunks.read();
        let chunk = chunks
            .first()
            .expect("Arena should have at least one chunk");

        // Bound against the used high-water mark (matching `read_at`): an
        // offset past `used()` cannot have come from a prior allocation.
        debug_assert!(
            (offset as usize) + std::mem::size_of::<T>() <= chunk.used(),
            "read_at_mut: offset {} + size_of::<{}>() = {} exceeds chunk used bytes {}",
            offset,
            std::any::type_name::<T>(),
            (offset as usize) + std::mem::size_of::<T>(),
            chunk.used()
        );
        debug_assert!(
            (offset as usize).is_multiple_of(std::mem::align_of::<T>()),
            "read_at_mut: offset {} is not aligned for {} (alignment {})",
            offset,
            std::any::type_name::<T>(),
            std::mem::align_of::<T>()
        );

        // SAFETY: Caller guarantees offset is valid, T matches, and no aliasing
        unsafe {
            let ptr = chunk.ptr.as_ptr().add(offset as usize).cast::<T>();
            &mut *ptr
        }
    }

    /// Allocates a new chunk and performs the allocation.
    fn alloc_new_chunk(&self, size: usize, align: usize) -> Result<NonNull<u8>, AllocError> {
        let chunk_size = self.chunk_size.max(size + align);
        let chunk = Chunk::new(chunk_size)?;

        self.total_allocated
            .fetch_add(chunk_size, Ordering::Relaxed);

        // The chunk was sized to fit this allocation, so this cannot fail.
        let ptr = chunk
            .try_alloc(size, align)
            .expect("fresh chunk sized to fit");

        let mut chunks = self.chunks.write();
        chunks.push(chunk);

        Ok(ptr)
    }

    /// Returns the total memory allocated by this arena.
    #[must_use]
    pub fn total_allocated(&self) -> usize {
        self.total_allocated.load(Ordering::Relaxed)
    }

    /// Returns the total memory used (not just allocated capacity).
    #[must_use]
    pub fn total_used(&self) -> usize {
        let chunks = self.chunks.read();
        chunks.iter().map(Chunk::used).sum()
    }

    /// Returns statistics about this arena.
    #[must_use]
    pub fn stats(&self) -> ArenaStats {
        let chunks = self.chunks.read();
        ArenaStats {
            epoch: self.epoch,
            chunk_count: chunks.len(),
            total_allocated: self.total_allocated.load(Ordering::Relaxed),
            total_used: chunks.iter().map(Chunk::used).sum(),
        }
    }
}

/// Statistics about an arena.
#[derive(Debug, Clone)]
pub struct ArenaStats {
    /// The epoch this arena belongs to.
    pub epoch: EpochId,
    /// Number of chunks allocated.
    pub chunk_count: usize,
    /// Total bytes allocated.
    pub total_allocated: usize,
    /// Total bytes used.
    pub total_used: usize,
}

/// Manages arenas across multiple epochs.
///
/// Use this to create new epochs, allocate in the current epoch, and
/// clean up old epochs when they're no longer needed.
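///
/// # Example
///
/// A sketch of per-epoch access (error handling elided):
///
/// ```ignore
/// let allocator = ArenaAllocator::new()?;
/// let epoch = allocator.new_epoch()?;
/// let arena = allocator.arena(epoch)?; // read guard that derefs to `Arena`
/// let v = arena.alloc_value(1u32)?;
/// ```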
pub struct ArenaAllocator {
    /// Map of epochs to arenas.
    arenas: RwLock<hashbrown::HashMap<EpochId, Arena>>,
    /// Current epoch.
    current_epoch: AtomicUsize,
    /// Default chunk size.
    chunk_size: usize,
}

impl ArenaAllocator {
    /// Creates a new arena allocator.
    ///
    /// # Errors
    ///
    /// Returns `AllocError::OutOfMemory` if the initial arena allocation fails.
    pub fn new() -> Result<Self, AllocError> {
        Self::with_chunk_size(DEFAULT_CHUNK_SIZE)
    }

    /// Creates a new arena allocator with a custom chunk size.
    ///
    /// # Errors
    ///
    /// Returns `AllocError::OutOfMemory` if the initial arena allocation fails.
    pub fn with_chunk_size(chunk_size: usize) -> Result<Self, AllocError> {
        let allocator = Self {
            arenas: RwLock::new(hashbrown::HashMap::new()),
            current_epoch: AtomicUsize::new(0),
            chunk_size,
        };

        // Create the initial epoch
        let epoch = EpochId::INITIAL;
        allocator
            .arenas
            .write()
            .insert(epoch, Arena::with_chunk_size(epoch, chunk_size)?);

        Ok(allocator)
    }

    /// Returns the current epoch.
    #[must_use]
    pub fn current_epoch(&self) -> EpochId {
        EpochId::new(self.current_epoch.load(Ordering::Acquire) as u64)
    }

    /// Creates a new epoch and returns its ID.
    ///
    /// # Errors
    ///
    /// Returns `AllocError::OutOfMemory` if the arena allocation fails.
    pub fn new_epoch(&self) -> Result<EpochId, AllocError> {
        let new_id = self.current_epoch.fetch_add(1, Ordering::AcqRel) as u64 + 1;
        let epoch = EpochId::new(new_id);

        let arena = Arena::with_chunk_size(epoch, self.chunk_size)?;
        self.arenas.write().insert(epoch, arena);

        Ok(epoch)
    }

    /// Gets the arena for a specific epoch.
    ///
    /// # Errors
    ///
    /// Returns `AllocError::EpochNotFound` if the epoch doesn't exist.
    pub fn arena(
        &self,
        epoch: EpochId,
    ) -> Result<impl std::ops::Deref<Target = Arena> + '_, AllocError> {
        let arenas = self.arenas.read();
        if !arenas.contains_key(&epoch) {
            return Err(AllocError::EpochNotFound(epoch));
        }
        Ok(parking_lot::RwLockReadGuard::map(arenas, |arenas| {
            &arenas[&epoch]
        }))
    }

    /// Ensures an arena exists for the given epoch, creating it if necessary.
    /// Returns whether a new arena was created.
    ///
    /// # Errors
    ///
    /// Returns `AllocError::OutOfMemory` if a new arena allocation fails.
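    ///
    /// # Example
    ///
    /// A sketch of idempotent usage (the epoch value is illustrative):
    ///
    /// ```ignore
    /// let created = allocator.ensure_epoch(EpochId::new(7))?;
    /// let arena = allocator.arena(EpochId::new(7))?; // guaranteed to exist now
    /// ```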
    #[cfg(feature = "tiered-storage")]
    pub fn ensure_epoch(&self, epoch: EpochId) -> Result<bool, AllocError> {
        // Fast path: check if epoch already exists
        {
            let arenas = self.arenas.read();
            if arenas.contains_key(&epoch) {
                return Ok(false);
            }
        }

        // Slow path: create the epoch
        let mut arenas = self.arenas.write();
        // Double-check after acquiring write lock
        if arenas.contains_key(&epoch) {
            return Ok(false);
        }

        let arena = Arena::with_chunk_size(epoch, self.chunk_size)?;
        arenas.insert(epoch, arena);
        Ok(true)
    }

    /// Gets or creates an arena for a specific epoch.
    ///
    /// # Errors
    ///
    /// Returns `AllocError` if the arena allocation fails.
    #[cfg(feature = "tiered-storage")]
    pub fn arena_or_create(
        &self,
        epoch: EpochId,
    ) -> Result<impl std::ops::Deref<Target = Arena> + '_, AllocError> {
        self.ensure_epoch(epoch)?;
        self.arena(epoch)
    }

    /// Allocates in the current epoch.
    ///
    /// # Errors
    ///
    /// Returns `AllocError` if allocation fails.
    ///
    /// # Panics
    ///
    /// Panics if the current epoch has no arena (should never happen in normal use).
    pub fn alloc(&self, size: usize, align: usize) -> Result<NonNull<u8>, AllocError> {
        let epoch = self.current_epoch();
        let arenas = self.arenas.read();
        arenas
            .get(&epoch)
            .expect("current epoch always exists")
            .alloc(size, align)
    }

    /// Drops an epoch, freeing all its memory.
    ///
    /// This should only be called when no readers are using this epoch.
    pub fn drop_epoch(&self, epoch: EpochId) {
        self.arenas.write().remove(&epoch);
    }

    /// Returns total memory allocated across all epochs.
    #[must_use]
    pub fn total_allocated(&self) -> usize {
        self.arenas
            .read()
            .values()
            .map(Arena::total_allocated)
            .sum()
    }
}

impl Default for ArenaAllocator {
    /// Creates a default arena allocator.
    ///
    /// # Panics
    ///
    /// Panics if the initial arena allocation fails (out of memory).
    fn default() -> Self {
        Self::new().expect("failed to allocate default arena")
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_arena_basic_allocation() {
        let arena = Arena::new(EpochId::INITIAL).unwrap();

        // Allocate some bytes
        let ptr1 = arena.alloc(100, 8).unwrap();
        let ptr2 = arena.alloc(200, 8).unwrap();

        // Pointers should be different
        assert_ne!(ptr1.as_ptr(), ptr2.as_ptr());
    }

    #[test]
    fn test_arena_value_allocation() {
        let arena = Arena::new(EpochId::INITIAL).unwrap();

        let value = arena.alloc_value(42u64).unwrap();
        assert_eq!(*value, 42);

        *value = 100;
        assert_eq!(*value, 100);
    }

    #[test]
    fn test_arena_slice_allocation() {
        let arena = Arena::new(EpochId::INITIAL).unwrap();

        let slice = arena.alloc_slice(&[1u32, 2, 3, 4, 5]).unwrap();
        assert_eq!(slice, &[1, 2, 3, 4, 5]);

        slice[0] = 10;
        assert_eq!(slice[0], 10);
    }

    #[test]
    fn test_arena_large_allocation() {
        let arena = Arena::with_chunk_size(EpochId::INITIAL, 1024).unwrap();

        // Allocate something larger than the chunk size
        let _ptr = arena.alloc(2048, 8).unwrap();

        // Should have created a new chunk
        assert!(arena.stats().chunk_count >= 2);
    }

    #[test]
    fn test_arena_allocator_epochs() {
        let allocator = ArenaAllocator::new().unwrap();

        let epoch0 = allocator.current_epoch();
        assert_eq!(epoch0, EpochId::INITIAL);

        let epoch1 = allocator.new_epoch().unwrap();
        assert_eq!(epoch1, EpochId::new(1));

        let epoch2 = allocator.new_epoch().unwrap();
        assert_eq!(epoch2, EpochId::new(2));

        // Current epoch should be the latest
        assert_eq!(allocator.current_epoch(), epoch2);
    }

    #[test]
    fn test_arena_allocator_allocation() {
        let allocator = ArenaAllocator::new().unwrap();

        let ptr1 = allocator.alloc(100, 8).unwrap();
        let ptr2 = allocator.alloc(100, 8).unwrap();

        assert_ne!(ptr1.as_ptr(), ptr2.as_ptr());
    }

    #[test]
    fn test_arena_drop_epoch() {
        let allocator = ArenaAllocator::new().unwrap();

        let initial_mem = allocator.total_allocated();

        let epoch1 = allocator.new_epoch().unwrap();
        // Allocate some memory in the new epoch
        {
            let arena = allocator.arena(epoch1).unwrap();
            arena.alloc(10000, 8).unwrap();
        }

        let after_alloc = allocator.total_allocated();
        assert!(after_alloc > initial_mem);

        // Drop the epoch
        allocator.drop_epoch(epoch1);

        // Memory should decrease
        let after_drop = allocator.total_allocated();
        assert!(after_drop < after_alloc);
    }

    #[test]
    fn test_arena_stats() {
        let arena = Arena::with_chunk_size(EpochId::new(5), 4096).unwrap();

        let stats = arena.stats();
        assert_eq!(stats.epoch, EpochId::new(5));
        assert_eq!(stats.chunk_count, 1);
        assert_eq!(stats.total_allocated, 4096);
        assert_eq!(stats.total_used, 0);

        arena.alloc(100, 8).unwrap();
        let stats = arena.stats();
        assert!(stats.total_used >= 100);
    }
}

#[cfg(all(test, feature = "tiered-storage"))]
mod tiered_storage_tests {
    use super::*;

    #[test]
    fn test_alloc_value_with_offset_basic() {
        let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096).unwrap();

        let (offset1, val1) = arena.alloc_value_with_offset(42u64).unwrap();
        let (offset2, val2) = arena.alloc_value_with_offset(100u64).unwrap();

        // First allocation should be at offset 0 (aligned)
        assert_eq!(offset1, 0);
        // Second allocation should be after the first
        assert!(offset2 > offset1);
        assert!(offset2 >= std::mem::size_of::<u64>() as u32);

        // Values should be correct
        assert_eq!(*val1, 42);
        assert_eq!(*val2, 100);

        // Mutation should work
        *val1 = 999;
        assert_eq!(*val1, 999);
    }

    #[test]
    fn test_read_at_basic() {
        let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096).unwrap();

        let (offset, _) = arena.alloc_value_with_offset(12345u64).unwrap();

        // Read it back
        // SAFETY: offset was returned by alloc_value_with_offset for the same type and arena
        let value: &u64 = unsafe { arena.read_at(offset) };
        assert_eq!(*value, 12345);
    }

    #[test]
    fn test_read_at_mut_basic() {
        let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096).unwrap();

        let (offset, _) = arena.alloc_value_with_offset(42u64).unwrap();

        // Read and modify
        // SAFETY: offset was returned by alloc_value_with_offset for the same type and arena
        let value: &mut u64 = unsafe { arena.read_at_mut(offset) };
        assert_eq!(*value, 42);
        *value = 100;

        // Verify modification persisted
        // SAFETY: offset was returned by alloc_value_with_offset for the same type and arena
        let value: &u64 = unsafe { arena.read_at(offset) };
        assert_eq!(*value, 100);
    }

    #[test]
    fn test_alloc_value_with_offset_struct() {
        #[derive(Debug, Clone, PartialEq)]
        struct TestNode {
            id: u64,
            name: [u8; 32],
            value: i32,
        }

        let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096).unwrap();

        let node = TestNode {
            id: 12345,
            name: [b'A'; 32],
            value: -999,
        };

        let (offset, stored) = arena.alloc_value_with_offset(node.clone()).unwrap();
        assert_eq!(stored.id, 12345);
        assert_eq!(stored.value, -999);

        // Read it back
        // SAFETY: offset was returned by alloc_value_with_offset for the same type and arena
        let read: &TestNode = unsafe { arena.read_at(offset) };
        assert_eq!(read.id, node.id);
        assert_eq!(read.name, node.name);
        assert_eq!(read.value, node.value);
    }

    #[test]
    fn test_alloc_value_with_offset_alignment() {
        let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096).unwrap();

        // Allocate a byte first to potentially misalign
        let (offset1, _) = arena.alloc_value_with_offset(1u8).unwrap();
        assert_eq!(offset1, 0);

        // Now allocate a u64 which requires 8-byte alignment
        let (offset2, val) = arena.alloc_value_with_offset(42u64).unwrap();

        // offset2 should be 8-byte aligned
        assert_eq!(offset2 % 8, 0);
        assert_eq!(*val, 42);
    }

    #[test]
    fn test_alloc_value_with_offset_multiple() {
        let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096).unwrap();

        let mut offsets = Vec::new();
        for i in 0..100u64 {
            let (offset, val) = arena.alloc_value_with_offset(i).unwrap();
            offsets.push(offset);
            assert_eq!(*val, i);
        }

        // All offsets should be unique and in ascending order
        for window in offsets.windows(2) {
            assert!(window[0] < window[1]);
        }

        // Read all values back
        for (i, offset) in offsets.iter().enumerate() {
            // SAFETY: offset was returned by alloc_value_with_offset for the same type and arena
            let val: &u64 = unsafe { arena.read_at(*offset) };
            assert_eq!(*val, i as u64);
        }
    }

    #[test]
    fn test_arena_allocator_with_offset() {
        let allocator = ArenaAllocator::with_chunk_size(4096).unwrap();

        let epoch = allocator.current_epoch();
        let arena = allocator.arena(epoch).unwrap();

        let (offset, val) = arena.alloc_value_with_offset(42u64).unwrap();
        assert_eq!(*val, 42);

        // SAFETY: offset was returned by alloc_value_with_offset for the same type and arena
        let read: &u64 = unsafe { arena.read_at(offset) };
        assert_eq!(*read, 42);
    }

    #[test]
    #[cfg(debug_assertions)]
    #[should_panic(expected = "exceeds chunk used bytes")]
    fn test_read_at_out_of_bounds() {
        let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096).unwrap();
        let (_offset, _) = arena.alloc_value_with_offset(42u64).unwrap();

        // Read way past the allocated region: should panic in debug
        // SAFETY: intentionally invalid offset to test debug assertion
        unsafe {
            let _: &u64 = arena.read_at(4000);
        }
    }

    #[test]
    #[cfg(debug_assertions)]
    #[should_panic(expected = "is not aligned")]
    fn test_read_at_misaligned() {
        let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096).unwrap();
        // Allocate a u8 at offset 0
        let (_offset, _) = arena.alloc_value_with_offset(0xFFu8).unwrap();
        // Also allocate some bytes so offset 1 is within used range
        let _ = arena.alloc_value_with_offset(0u64).unwrap();

        // Try to read a u64 at offset 1 (misaligned for u64)
        // SAFETY: intentionally misaligned offset to test debug assertion
        unsafe {
            let _: &u64 = arena.read_at(1);
        }
    }

    #[test]
    #[cfg(not(miri))] // parking_lot uses integer-to-pointer casts incompatible with Miri strict provenance
    fn test_concurrent_read_stress() {
        use std::sync::Arc;

        let arena = Arc::new(Arena::with_chunk_size(EpochId::INITIAL, 1024 * 1024).unwrap());
        let num_threads = 8;
        let values_per_thread = 1000;

        // Each thread allocates values and records offsets
        let mut all_offsets = Vec::new();
        for t in 0..num_threads {
            let base = (t * values_per_thread) as u64;
            let mut offsets = Vec::with_capacity(values_per_thread);
            for i in 0..values_per_thread as u64 {
                let (offset, _) = arena.alloc_value_with_offset(base + i).unwrap();
                offsets.push(offset);
            }
            all_offsets.push(offsets);
        }

        // Now read all values back concurrently from multiple threads
        let mut handles = Vec::new();
        for (t, offsets) in all_offsets.into_iter().enumerate() {
            let arena = Arc::clone(&arena);
            let base = (t * values_per_thread) as u64;
            handles.push(std::thread::spawn(move || {
                for (i, offset) in offsets.iter().enumerate() {
                    // SAFETY: offset was returned by alloc_value_with_offset for the same type and arena
                    let val: &u64 = unsafe { arena.read_at(*offset) };
                    assert_eq!(*val, base + i as u64);
                }
            }));
        }

        for handle in handles {
            handle.join().expect("Thread panicked");
        }
    }

    #[test]
    fn test_alloc_value_with_offset_insufficient_space() {
        // Create a tiny arena where a large allocation will fail
        let arena = Arena::with_chunk_size(EpochId::INITIAL, 64).unwrap();

        // Fill up the chunk
        let _ = arena.alloc_value_with_offset([0u8; 48]).unwrap();

        // This should return InsufficientSpace, not panic
        let result = arena.alloc_value_with_offset([0u8; 32]);
        assert!(result.is_err());
    }

    #[test]
    fn test_multi_type_interleaved() {
        #[derive(Debug, Clone, PartialEq)]
        #[repr(C)]
        struct Record {
            id: u64,
            flags: u32,
            weight: f32,
        }

        let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096).unwrap();

        // Interleave different types
        let (off_u8, _) = arena.alloc_value_with_offset(0xAAu8).unwrap();
        let (off_u32, _) = arena.alloc_value_with_offset(0xBBBBu32).unwrap();
        let (off_u64, _) = arena.alloc_value_with_offset(0xCCCCCCCCu64).unwrap();
        let (off_rec, _) = arena
            .alloc_value_with_offset(Record {
                id: 42,
                flags: 0xFF,
                weight: std::f32::consts::PI,
            })
            .unwrap();

        // Read them all back
        // SAFETY: all offsets were returned by alloc_value_with_offset for matching types and arena
        unsafe {
            assert_eq!(*arena.read_at::<u8>(off_u8), 0xAA);
            assert_eq!(*arena.read_at::<u32>(off_u32), 0xBBBB);
            assert_eq!(*arena.read_at::<u64>(off_u64), 0xCCCCCCCC);

            let rec: &Record = arena.read_at(off_rec);
            assert_eq!(rec.id, 42);
            assert_eq!(rec.flags, 0xFF);
            assert!((rec.weight - std::f32::consts::PI).abs() < 0.001);
        }
    }
}