graphos_common/memory/arena.rs

//! Epoch-based arena allocator for structural sharing.
//!
//! The arena allocator is the core memory management strategy for Graphos.
//! It provides:
//! - Fast bump-pointer allocation
//! - Epoch-based versioning for MVCC
//! - Bulk deallocation when epochs become unreachable
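//!
//! # Example
//!
//! A minimal usage sketch. The `use` paths assume this module is publicly
//! reachable as `graphos_common::memory::arena`, so the block is marked
//! `ignore`; adjust the paths to the crate's actual re-exports.
//!
//! ```ignore
//! use graphos_common::memory::arena::Arena;
//! use graphos_common::types::EpochId;
//!
//! // Bump-allocate values and slices inside one epoch's arena.
//! let arena = Arena::new(EpochId::INITIAL);
//! let n = arena.alloc_value(42u64);
//! let xs = arena.alloc_slice(&[1u32, 2, 3]);
//! assert_eq!(*n, 42);
//! assert_eq!(xs, &[1, 2, 3]);
//! // The arena's chunks are all freed at once when it is dropped.
//! ```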
// Arena allocators require unsafe code for memory management
#![allow(unsafe_code)]

use std::alloc::{Layout, alloc, dealloc};
use std::ptr::NonNull;
use std::sync::atomic::{AtomicUsize, Ordering};

use parking_lot::RwLock;

use crate::types::EpochId;

/// Default chunk size for arena allocations (1 MB).
const DEFAULT_CHUNK_SIZE: usize = 1024 * 1024;

/// A memory chunk in the arena.
struct Chunk {
    /// Pointer to the start of the chunk.
    ptr: NonNull<u8>,
    /// Total capacity of the chunk.
    capacity: usize,
    /// Current allocation offset.
    offset: AtomicUsize,
}

impl Chunk {
    /// Creates a new chunk with the given capacity.
    fn new(capacity: usize) -> Self {
        // A zero-sized layout must not be passed to `alloc`; the chunk base is
        // aligned to 16 bytes, which bounds the alignments `try_alloc` can honor.
        assert!(capacity > 0, "chunk capacity must be non-zero");
        let layout = Layout::from_size_align(capacity, 16).expect("Invalid layout");
        // SAFETY: `layout` has non-zero size (asserted above) and a valid alignment.
        let ptr = unsafe { alloc(layout) };
        let ptr = NonNull::new(ptr).expect("Allocation failed");

        Self {
            ptr,
            capacity,
            offset: AtomicUsize::new(0),
        }
    }

    /// Tries to allocate `size` bytes with the given alignment.
    /// Returns `None` if there is not enough space.
    ///
    /// `align` must be a power of two no larger than the chunk base alignment (16).
    fn try_alloc(&self, size: usize, align: usize) -> Option<NonNull<u8>> {
        debug_assert!(align.is_power_of_two(), "alignment must be a power of two");
        loop {
            let current = self.offset.load(Ordering::Relaxed);

            // Round `current` up to the next multiple of `align`; the bit trick is
            // valid because `align` is a power of two.
            let aligned = (current + align - 1) & !(align - 1);
            let new_offset = aligned + size;

            if new_offset > self.capacity {
                return None;
            }

            // Try to reserve the space
            match self.offset.compare_exchange_weak(
                current,
                new_offset,
                Ordering::AcqRel,
                Ordering::Relaxed,
            ) {
                Ok(_) => {
                    // SAFETY: We've reserved this range exclusively
                    let ptr = unsafe { self.ptr.as_ptr().add(aligned) };
                    return NonNull::new(ptr);
                }
                Err(_) => continue, // Retry
            }
        }
    }

    /// Returns the amount of memory used in this chunk.
    fn used(&self) -> usize {
        self.offset.load(Ordering::Relaxed)
    }

    /// Returns the remaining capacity in this chunk.
    #[allow(dead_code)]
    fn remaining(&self) -> usize {
        self.capacity - self.used()
    }
}

impl Drop for Chunk {
    fn drop(&mut self) {
        let layout = Layout::from_size_align(self.capacity, 16).expect("Invalid layout");
        // SAFETY: We allocated this memory with the same layout
        unsafe { dealloc(self.ptr.as_ptr(), layout) };
    }
}

// SAFETY: Chunk uses atomic operations for thread-safe allocation
unsafe impl Send for Chunk {}
unsafe impl Sync for Chunk {}

/// An epoch-aware arena allocator.
///
/// The arena allocates memory in large chunks and hands it out with fast
/// bump-pointer allocation. Each arena is associated with an epoch, enabling
/// structural sharing for MVCC.
pub struct Arena {
    /// The epoch this arena belongs to.
    epoch: EpochId,
    /// List of memory chunks.
    chunks: RwLock<Vec<Chunk>>,
    /// Default chunk size for new allocations.
    chunk_size: usize,
    /// Total bytes allocated.
    total_allocated: AtomicUsize,
}

impl Arena {
    /// Creates a new arena for the given epoch.
    #[must_use]
    pub fn new(epoch: EpochId) -> Self {
        Self::with_chunk_size(epoch, DEFAULT_CHUNK_SIZE)
    }

    /// Creates a new arena with a custom chunk size.
    #[must_use]
    pub fn with_chunk_size(epoch: EpochId, chunk_size: usize) -> Self {
        let initial_chunk = Chunk::new(chunk_size);
        Self {
            epoch,
            chunks: RwLock::new(vec![initial_chunk]),
            chunk_size,
            total_allocated: AtomicUsize::new(chunk_size),
        }
    }

    /// Returns the epoch this arena belongs to.
    #[must_use]
    pub fn epoch(&self) -> EpochId {
        self.epoch
    }

    /// Allocates `size` bytes with the given alignment.
    ///
    /// `align` must be a power of two no larger than 16.
    ///
    /// # Panics
    ///
    /// Panics if allocation fails (out of memory).
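    ///
    /// # Example
    ///
    /// A sketch of raw usage; paths are assumed as in the module-level example,
    /// so the block is marked `ignore`.
    ///
    /// ```ignore
    /// use graphos_common::memory::arena::Arena;
    /// use graphos_common::types::EpochId;
    ///
    /// let arena = Arena::new(EpochId::INITIAL);
    /// // Reserve 64 bytes aligned to 8; the pointer stays valid for as long
    /// // as the arena is alive.
    /// let ptr = arena.alloc(64, 8);
    /// assert_eq!(ptr.as_ptr() as usize % 8, 0);
    /// ```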
    pub fn alloc(&self, size: usize, align: usize) -> NonNull<u8> {
        // First try to allocate from existing chunks
        {
            let chunks = self.chunks.read();
            for chunk in chunks.iter().rev() {
                if let Some(ptr) = chunk.try_alloc(size, align) {
                    return ptr;
                }
            }
        }

        // Need a new chunk
        self.alloc_new_chunk(size, align)
    }

    /// Allocates a value of type T.
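    ///
    /// The value is moved into arena memory and a mutable reference to it is
    /// returned. A sketch, with paths assumed as in the module-level example:
    ///
    /// ```ignore
    /// let arena = Arena::new(EpochId::INITIAL);
    /// let counter = arena.alloc_value(0u32);
    /// *counter += 1;
    /// assert_eq!(*counter, 1);
    /// ```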
    pub fn alloc_value<T>(&self, value: T) -> &mut T {
        let ptr = self.alloc(std::mem::size_of::<T>(), std::mem::align_of::<T>());
        // SAFETY: We've allocated the correct size and alignment
        unsafe {
            let typed_ptr = ptr.as_ptr() as *mut T;
            typed_ptr.write(value);
            &mut *typed_ptr
        }
    }

    /// Allocates a slice of values.
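    ///
    /// The input slice is copied into arena memory, which is why `T: Copy` is
    /// required. A sketch, with paths assumed as in the module-level example:
    ///
    /// ```ignore
    /// let arena = Arena::new(EpochId::INITIAL);
    /// let digits = arena.alloc_slice(&[1u8, 2, 3]);
    /// digits[0] = 9;
    /// assert_eq!(digits, &[9, 2, 3]);
    /// ```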
    pub fn alloc_slice<T: Copy>(&self, values: &[T]) -> &mut [T] {
        if values.is_empty() {
            return &mut [];
        }

        let size = std::mem::size_of::<T>() * values.len();
        let align = std::mem::align_of::<T>();
        let ptr = self.alloc(size, align);

        // SAFETY: We've allocated the correct size and alignment
        unsafe {
            let typed_ptr = ptr.as_ptr() as *mut T;
            std::ptr::copy_nonoverlapping(values.as_ptr(), typed_ptr, values.len());
            std::slice::from_raw_parts_mut(typed_ptr, values.len())
        }
    }

    /// Allocates a new chunk and performs the allocation.
    fn alloc_new_chunk(&self, size: usize, align: usize) -> NonNull<u8> {
        let chunk_size = self.chunk_size.max(size + align);
        let chunk = Chunk::new(chunk_size);

        self.total_allocated
            .fetch_add(chunk_size, Ordering::Relaxed);

        let ptr = chunk
            .try_alloc(size, align)
            .expect("Fresh chunk should have space");

        let mut chunks = self.chunks.write();
        chunks.push(chunk);

        ptr
    }

    /// Returns the total memory allocated by this arena.
    #[must_use]
    pub fn total_allocated(&self) -> usize {
        self.total_allocated.load(Ordering::Relaxed)
    }

    /// Returns the total memory used (not just allocated capacity).
    #[must_use]
    pub fn total_used(&self) -> usize {
        let chunks = self.chunks.read();
        chunks.iter().map(Chunk::used).sum()
    }

    /// Returns statistics about this arena.
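    ///
    /// A sketch mirroring the unit tests below, with paths assumed as in the
    /// module-level example:
    ///
    /// ```ignore
    /// let arena = Arena::with_chunk_size(EpochId::INITIAL, 4096);
    /// arena.alloc(100, 8);
    /// let stats = arena.stats();
    /// assert_eq!(stats.chunk_count, 1);
    /// assert_eq!(stats.total_allocated, 4096);
    /// assert!(stats.total_used >= 100);
    /// ```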
    #[must_use]
    pub fn stats(&self) -> ArenaStats {
        let chunks = self.chunks.read();
        ArenaStats {
            epoch: self.epoch,
            chunk_count: chunks.len(),
            total_allocated: self.total_allocated.load(Ordering::Relaxed),
            total_used: chunks.iter().map(Chunk::used).sum(),
        }
    }
}

/// Statistics about an arena.
#[derive(Debug, Clone)]
pub struct ArenaStats {
    /// The epoch this arena belongs to.
    pub epoch: EpochId,
    /// Number of chunks allocated.
    pub chunk_count: usize,
    /// Total bytes allocated.
    pub total_allocated: usize,
    /// Total bytes used.
    pub total_used: usize,
}

/// The global arena allocator managing multiple epochs.
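///
/// # Example
///
/// A sketch of the epoch lifecycle; paths are assumed as in the module-level
/// example, so the block is marked `ignore`.
///
/// ```ignore
/// use graphos_common::memory::arena::ArenaAllocator;
///
/// let allocator = ArenaAllocator::new();
///
/// // Writers allocate into a fresh epoch...
/// let epoch = allocator.new_epoch();
/// allocator.arena(epoch).alloc(256, 8);
///
/// // ...and once no reader can still observe that epoch, its memory is
/// // released in bulk.
/// allocator.drop_epoch(epoch);
/// ```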
pub struct ArenaAllocator {
    /// Map of epochs to arenas.
    arenas: RwLock<hashbrown::HashMap<EpochId, Arena>>,
    /// Current epoch.
    current_epoch: AtomicUsize,
    /// Default chunk size.
    chunk_size: usize,
}

impl ArenaAllocator {
    /// Creates a new arena allocator.
    #[must_use]
    pub fn new() -> Self {
        Self::with_chunk_size(DEFAULT_CHUNK_SIZE)
    }

    /// Creates a new arena allocator with a custom chunk size.
    #[must_use]
    pub fn with_chunk_size(chunk_size: usize) -> Self {
        let allocator = Self {
            arenas: RwLock::new(hashbrown::HashMap::new()),
            current_epoch: AtomicUsize::new(0),
            chunk_size,
        };

        // Create the initial epoch
        let epoch = EpochId::INITIAL;
        allocator
            .arenas
            .write()
            .insert(epoch, Arena::with_chunk_size(epoch, chunk_size));

        allocator
    }

    /// Returns the current epoch.
    #[must_use]
    pub fn current_epoch(&self) -> EpochId {
        EpochId::new(self.current_epoch.load(Ordering::Acquire) as u64)
    }

    /// Creates a new epoch and returns its ID.
    pub fn new_epoch(&self) -> EpochId {
        // Hold the write lock across the counter bump and the insert so that a
        // concurrent `alloc` observing the new epoch id always finds its arena.
        let mut arenas = self.arenas.write();
        let new_id = self.current_epoch.fetch_add(1, Ordering::AcqRel) as u64 + 1;
        let epoch = EpochId::new(new_id);

        let arena = Arena::with_chunk_size(epoch, self.chunk_size);
        arenas.insert(epoch, arena);

        epoch
    }

    /// Gets the arena for a specific epoch.
    ///
    /// # Panics
    ///
    /// Panics if the epoch doesn't exist.
    pub fn arena(&self, epoch: EpochId) -> impl std::ops::Deref<Target = Arena> + '_ {
        parking_lot::RwLockReadGuard::map(self.arenas.read(), |arenas| {
            arenas.get(&epoch).expect("Epoch should exist")
        })
    }

    /// Allocates in the current epoch.
    pub fn alloc(&self, size: usize, align: usize) -> NonNull<u8> {
        let epoch = self.current_epoch();
        let arenas = self.arenas.read();
        arenas
            .get(&epoch)
            .expect("Current epoch exists")
            .alloc(size, align)
    }

    /// Drops an epoch, freeing all its memory.
    ///
    /// This should only be called when no readers are using this epoch.
    pub fn drop_epoch(&self, epoch: EpochId) {
        self.arenas.write().remove(&epoch);
    }

    /// Returns total memory allocated across all epochs.
    #[must_use]
    pub fn total_allocated(&self) -> usize {
        self.arenas
            .read()
            .values()
            .map(Arena::total_allocated)
            .sum()
    }
}

impl Default for ArenaAllocator {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_arena_basic_allocation() {
        let arena = Arena::new(EpochId::INITIAL);

        // Allocate some bytes
        let ptr1 = arena.alloc(100, 8);
        let ptr2 = arena.alloc(200, 8);

        // Pointers should be different
        assert_ne!(ptr1.as_ptr(), ptr2.as_ptr());
    }

    #[test]
    fn test_arena_value_allocation() {
        let arena = Arena::new(EpochId::INITIAL);

        let value = arena.alloc_value(42u64);
        assert_eq!(*value, 42);

        *value = 100;
        assert_eq!(*value, 100);
    }

    #[test]
    fn test_arena_slice_allocation() {
        let arena = Arena::new(EpochId::INITIAL);

        let slice = arena.alloc_slice(&[1u32, 2, 3, 4, 5]);
        assert_eq!(slice, &[1, 2, 3, 4, 5]);

        slice[0] = 10;
        assert_eq!(slice[0], 10);
    }

    #[test]
    fn test_arena_large_allocation() {
        let arena = Arena::with_chunk_size(EpochId::INITIAL, 1024);

        // Allocate something larger than the chunk size
        let _ptr = arena.alloc(2048, 8);

        // Should have created a new chunk
        assert!(arena.stats().chunk_count >= 2);
    }

    #[test]
    fn test_arena_allocator_epochs() {
        let allocator = ArenaAllocator::new();

        let epoch0 = allocator.current_epoch();
        assert_eq!(epoch0, EpochId::INITIAL);

        let epoch1 = allocator.new_epoch();
        assert_eq!(epoch1, EpochId::new(1));

        let epoch2 = allocator.new_epoch();
        assert_eq!(epoch2, EpochId::new(2));

        // Current epoch should be the latest
        assert_eq!(allocator.current_epoch(), epoch2);
    }

    #[test]
    fn test_arena_allocator_allocation() {
        let allocator = ArenaAllocator::new();

        let ptr1 = allocator.alloc(100, 8);
        let ptr2 = allocator.alloc(100, 8);

        assert_ne!(ptr1.as_ptr(), ptr2.as_ptr());
    }

    #[test]
    fn test_arena_drop_epoch() {
        let allocator = ArenaAllocator::new();

        let initial_mem = allocator.total_allocated();

        let epoch1 = allocator.new_epoch();
        // Allocate some memory in the new epoch
        {
            let arena = allocator.arena(epoch1);
            arena.alloc(10000, 8);
        }

        let after_alloc = allocator.total_allocated();
        assert!(after_alloc > initial_mem);

        // Drop the epoch
        allocator.drop_epoch(epoch1);

        // Memory should decrease
        let after_drop = allocator.total_allocated();
        assert!(after_drop < after_alloc);
    }

    #[test]
    fn test_arena_stats() {
        let arena = Arena::with_chunk_size(EpochId::new(5), 4096);

        let stats = arena.stats();
        assert_eq!(stats.epoch, EpochId::new(5));
        assert_eq!(stats.chunk_count, 1);
        assert_eq!(stats.total_allocated, 4096);
        assert_eq!(stats.total_used, 0);

        arena.alloc(100, 8);
        let stats = arena.stats();
        assert!(stats.total_used >= 100);
    }
}