Skip to main content

graphos_common/memory/
arena.rs

1//! Epoch-based arena allocator for structural sharing.
2//!
3//! The arena allocator is the core memory management strategy for Graphos.
4//! It provides:
5//! - Fast bump-pointer allocation
6//! - Epoch-based versioning for MVCC
7//! - Bulk deallocation when epochs become unreachable
8
9use crate::types::EpochId;
10use parking_lot::RwLock;
11use std::alloc::{alloc, dealloc, Layout};
12use std::ptr::NonNull;
13use std::sync::atomic::{AtomicUsize, Ordering};
14
15/// Default chunk size for arena allocations (1 MB).
16const DEFAULT_CHUNK_SIZE: usize = 1024 * 1024;
17
18/// A memory chunk in the arena.
19struct Chunk {
20    /// Pointer to the start of the chunk.
21    ptr: NonNull<u8>,
22    /// Total capacity of the chunk.
23    capacity: usize,
24    /// Current allocation offset.
25    offset: AtomicUsize,
26}
27
28impl Chunk {
29    /// Creates a new chunk with the given capacity.
30    fn new(capacity: usize) -> Self {
31        let layout = Layout::from_size_align(capacity, 16).expect("Invalid layout");
32        // SAFETY: We're allocating a valid layout
33        let ptr = unsafe { alloc(layout) };
34        let ptr = NonNull::new(ptr).expect("Allocation failed");
35
36        Self {
37            ptr,
38            capacity,
39            offset: AtomicUsize::new(0),
40        }
41    }
42
43    /// Tries to allocate `size` bytes with the given alignment.
44    /// Returns None if there's not enough space.
45    fn try_alloc(&self, size: usize, align: usize) -> Option<NonNull<u8>> {
46        loop {
47            let current = self.offset.load(Ordering::Relaxed);
48
49            // Calculate aligned offset
50            let aligned = (current + align - 1) & !(align - 1);
51            let new_offset = aligned + size;
52
53            if new_offset > self.capacity {
54                return None;
55            }
56
57            // Try to reserve the space
58            match self.offset.compare_exchange_weak(
59                current,
60                new_offset,
61                Ordering::AcqRel,
62                Ordering::Relaxed,
63            ) {
64                Ok(_) => {
65                    // SAFETY: We've reserved this range exclusively
66                    let ptr = unsafe { self.ptr.as_ptr().add(aligned) };
67                    return NonNull::new(ptr);
68                }
69                Err(_) => continue, // Retry
70            }
71        }
72    }
73
74    /// Returns the amount of memory used in this chunk.
75    fn used(&self) -> usize {
76        self.offset.load(Ordering::Relaxed)
77    }
78
79    /// Returns the remaining capacity in this chunk.
80    fn remaining(&self) -> usize {
81        self.capacity - self.used()
82    }
83}
84
85impl Drop for Chunk {
86    fn drop(&mut self) {
87        let layout = Layout::from_size_align(self.capacity, 16).expect("Invalid layout");
88        // SAFETY: We allocated this memory with the same layout
89        unsafe { dealloc(self.ptr.as_ptr(), layout) };
90    }
91}
92
93// SAFETY: Chunk uses atomic operations for thread-safe allocation
94unsafe impl Send for Chunk {}
95unsafe impl Sync for Chunk {}
96
97/// An epoch-aware arena allocator.
98///
99/// The arena allocates memory in large chunks and uses bump-pointer allocation
100/// for fast allocation. Each arena is associated with an epoch, enabling
101/// structural sharing for MVCC.
102pub struct Arena {
103    /// The epoch this arena belongs to.
104    epoch: EpochId,
105    /// List of memory chunks.
106    chunks: RwLock<Vec<Chunk>>,
107    /// Default chunk size for new allocations.
108    chunk_size: usize,
109    /// Total bytes allocated.
110    total_allocated: AtomicUsize,
111}
112
113impl Arena {
114    /// Creates a new arena for the given epoch.
115    #[must_use]
116    pub fn new(epoch: EpochId) -> Self {
117        Self::with_chunk_size(epoch, DEFAULT_CHUNK_SIZE)
118    }
119
120    /// Creates a new arena with a custom chunk size.
121    #[must_use]
122    pub fn with_chunk_size(epoch: EpochId, chunk_size: usize) -> Self {
123        let initial_chunk = Chunk::new(chunk_size);
124        Self {
125            epoch,
126            chunks: RwLock::new(vec![initial_chunk]),
127            chunk_size,
128            total_allocated: AtomicUsize::new(chunk_size),
129        }
130    }
131
132    /// Returns the epoch this arena belongs to.
133    #[must_use]
134    pub fn epoch(&self) -> EpochId {
135        self.epoch
136    }
137
138    /// Allocates `size` bytes with the given alignment.
139    ///
140    /// # Panics
141    ///
142    /// Panics if allocation fails (out of memory).
143    pub fn alloc(&self, size: usize, align: usize) -> NonNull<u8> {
144        // First try to allocate from existing chunks
145        {
146            let chunks = self.chunks.read();
147            for chunk in chunks.iter().rev() {
148                if let Some(ptr) = chunk.try_alloc(size, align) {
149                    return ptr;
150                }
151            }
152        }
153
154        // Need a new chunk
155        self.alloc_new_chunk(size, align)
156    }
157
158    /// Allocates a value of type T.
159    pub fn alloc_value<T>(&self, value: T) -> &mut T {
160        let ptr = self.alloc(std::mem::size_of::<T>(), std::mem::align_of::<T>());
161        // SAFETY: We've allocated the correct size and alignment
162        unsafe {
163            let typed_ptr = ptr.as_ptr() as *mut T;
164            typed_ptr.write(value);
165            &mut *typed_ptr
166        }
167    }
168
169    /// Allocates a slice of values.
170    pub fn alloc_slice<T: Copy>(&self, values: &[T]) -> &mut [T] {
171        if values.is_empty() {
172            return &mut [];
173        }
174
175        let size = std::mem::size_of::<T>() * values.len();
176        let align = std::mem::align_of::<T>();
177        let ptr = self.alloc(size, align);
178
179        // SAFETY: We've allocated the correct size and alignment
180        unsafe {
181            let typed_ptr = ptr.as_ptr() as *mut T;
182            std::ptr::copy_nonoverlapping(values.as_ptr(), typed_ptr, values.len());
183            std::slice::from_raw_parts_mut(typed_ptr, values.len())
184        }
185    }
186
187    /// Allocates a new chunk and performs the allocation.
188    fn alloc_new_chunk(&self, size: usize, align: usize) -> NonNull<u8> {
189        let chunk_size = self.chunk_size.max(size + align);
190        let chunk = Chunk::new(chunk_size);
191
192        self.total_allocated
193            .fetch_add(chunk_size, Ordering::Relaxed);
194
195        let ptr = chunk
196            .try_alloc(size, align)
197            .expect("Fresh chunk should have space");
198
199        let mut chunks = self.chunks.write();
200        chunks.push(chunk);
201
202        ptr
203    }
204
205    /// Returns the total memory allocated by this arena.
206    #[must_use]
207    pub fn total_allocated(&self) -> usize {
208        self.total_allocated.load(Ordering::Relaxed)
209    }
210
211    /// Returns the total memory used (not just allocated capacity).
212    #[must_use]
213    pub fn total_used(&self) -> usize {
214        let chunks = self.chunks.read();
215        chunks.iter().map(Chunk::used).sum()
216    }
217
218    /// Returns statistics about this arena.
219    #[must_use]
220    pub fn stats(&self) -> ArenaStats {
221        let chunks = self.chunks.read();
222        ArenaStats {
223            epoch: self.epoch,
224            chunk_count: chunks.len(),
225            total_allocated: self.total_allocated.load(Ordering::Relaxed),
226            total_used: chunks.iter().map(Chunk::used).sum(),
227        }
228    }
229}
230
231/// Statistics about an arena.
232#[derive(Debug, Clone)]
233pub struct ArenaStats {
234    /// The epoch this arena belongs to.
235    pub epoch: EpochId,
236    /// Number of chunks allocated.
237    pub chunk_count: usize,
238    /// Total bytes allocated.
239    pub total_allocated: usize,
240    /// Total bytes used.
241    pub total_used: usize,
242}
243
244/// The global arena allocator managing multiple epochs.
245pub struct ArenaAllocator {
246    /// Map of epochs to arenas.
247    arenas: RwLock<hashbrown::HashMap<EpochId, Arena>>,
248    /// Current epoch.
249    current_epoch: AtomicUsize,
250    /// Default chunk size.
251    chunk_size: usize,
252}
253
254impl ArenaAllocator {
255    /// Creates a new arena allocator.
256    #[must_use]
257    pub fn new() -> Self {
258        Self::with_chunk_size(DEFAULT_CHUNK_SIZE)
259    }
260
261    /// Creates a new arena allocator with a custom chunk size.
262    #[must_use]
263    pub fn with_chunk_size(chunk_size: usize) -> Self {
264        let allocator = Self {
265            arenas: RwLock::new(hashbrown::HashMap::new()),
266            current_epoch: AtomicUsize::new(0),
267            chunk_size,
268        };
269
270        // Create the initial epoch
271        let epoch = EpochId::INITIAL;
272        allocator
273            .arenas
274            .write()
275            .insert(epoch, Arena::with_chunk_size(epoch, chunk_size));
276
277        allocator
278    }
279
280    /// Returns the current epoch.
281    #[must_use]
282    pub fn current_epoch(&self) -> EpochId {
283        EpochId::new(self.current_epoch.load(Ordering::Acquire) as u64)
284    }
285
286    /// Creates a new epoch and returns its ID.
287    pub fn new_epoch(&self) -> EpochId {
288        let new_id = self.current_epoch.fetch_add(1, Ordering::AcqRel) as u64 + 1;
289        let epoch = EpochId::new(new_id);
290
291        let arena = Arena::with_chunk_size(epoch, self.chunk_size);
292        self.arenas.write().insert(epoch, arena);
293
294        epoch
295    }
296
297    /// Gets the arena for a specific epoch.
298    ///
299    /// # Panics
300    ///
301    /// Panics if the epoch doesn't exist.
302    pub fn arena(&self, epoch: EpochId) -> impl std::ops::Deref<Target = Arena> + '_ {
303        parking_lot::RwLockReadGuard::map(self.arenas.read(), |arenas| {
304            arenas.get(&epoch).expect("Epoch should exist")
305        })
306    }
307
308    /// Allocates in the current epoch.
309    pub fn alloc(&self, size: usize, align: usize) -> NonNull<u8> {
310        let epoch = self.current_epoch();
311        let arenas = self.arenas.read();
312        arenas.get(&epoch).expect("Current epoch exists").alloc(size, align)
313    }
314
315    /// Drops an epoch, freeing all its memory.
316    ///
317    /// This should only be called when no readers are using this epoch.
318    pub fn drop_epoch(&self, epoch: EpochId) {
319        self.arenas.write().remove(&epoch);
320    }
321
322    /// Returns total memory allocated across all epochs.
323    #[must_use]
324    pub fn total_allocated(&self) -> usize {
325        self.arenas
326            .read()
327            .values()
328            .map(Arena::total_allocated)
329            .sum()
330    }
331}
332
333impl Default for ArenaAllocator {
334    fn default() -> Self {
335        Self::new()
336    }
337}
338
339#[cfg(test)]
340mod tests {
341    use super::*;
342
343    #[test]
344    fn test_arena_basic_allocation() {
345        let arena = Arena::new(EpochId::INITIAL);
346
347        // Allocate some bytes
348        let ptr1 = arena.alloc(100, 8);
349        let ptr2 = arena.alloc(200, 8);
350
351        // Pointers should be different
352        assert_ne!(ptr1.as_ptr(), ptr2.as_ptr());
353    }
354
355    #[test]
356    fn test_arena_value_allocation() {
357        let arena = Arena::new(EpochId::INITIAL);
358
359        let value = arena.alloc_value(42u64);
360        assert_eq!(*value, 42);
361
362        *value = 100;
363        assert_eq!(*value, 100);
364    }
365
366    #[test]
367    fn test_arena_slice_allocation() {
368        let arena = Arena::new(EpochId::INITIAL);
369
370        let slice = arena.alloc_slice(&[1u32, 2, 3, 4, 5]);
371        assert_eq!(slice, &[1, 2, 3, 4, 5]);
372
373        slice[0] = 10;
374        assert_eq!(slice[0], 10);
375    }
376
377    #[test]
378    fn test_arena_large_allocation() {
379        let arena = Arena::with_chunk_size(EpochId::INITIAL, 1024);
380
381        // Allocate something larger than the chunk size
382        let ptr = arena.alloc(2048, 8);
383        assert!(!ptr.as_ptr().is_null());
384
385        // Should have created a new chunk
386        assert!(arena.stats().chunk_count >= 2);
387    }
388
389    #[test]
390    fn test_arena_allocator_epochs() {
391        let allocator = ArenaAllocator::new();
392
393        let epoch0 = allocator.current_epoch();
394        assert_eq!(epoch0, EpochId::INITIAL);
395
396        let epoch1 = allocator.new_epoch();
397        assert_eq!(epoch1, EpochId::new(1));
398
399        let epoch2 = allocator.new_epoch();
400        assert_eq!(epoch2, EpochId::new(2));
401
402        // Current epoch should be the latest
403        assert_eq!(allocator.current_epoch(), epoch2);
404    }
405
406    #[test]
407    fn test_arena_allocator_allocation() {
408        let allocator = ArenaAllocator::new();
409
410        let ptr1 = allocator.alloc(100, 8);
411        let ptr2 = allocator.alloc(100, 8);
412
413        assert_ne!(ptr1.as_ptr(), ptr2.as_ptr());
414    }
415
416    #[test]
417    fn test_arena_drop_epoch() {
418        let allocator = ArenaAllocator::new();
419
420        let initial_mem = allocator.total_allocated();
421
422        let epoch1 = allocator.new_epoch();
423        // Allocate some memory in the new epoch
424        {
425            let arena = allocator.arena(epoch1);
426            arena.alloc(10000, 8);
427        }
428
429        let after_alloc = allocator.total_allocated();
430        assert!(after_alloc > initial_mem);
431
432        // Drop the epoch
433        allocator.drop_epoch(epoch1);
434
435        // Memory should decrease
436        let after_drop = allocator.total_allocated();
437        assert!(after_drop < after_alloc);
438    }
439
440    #[test]
441    fn test_arena_stats() {
442        let arena = Arena::with_chunk_size(EpochId::new(5), 4096);
443
444        let stats = arena.stats();
445        assert_eq!(stats.epoch, EpochId::new(5));
446        assert_eq!(stats.chunk_count, 1);
447        assert_eq!(stats.total_allocated, 4096);
448        assert_eq!(stats.total_used, 0);
449
450        arena.alloc(100, 8);
451        let stats = arena.stats();
452        assert!(stats.total_used >= 100);
453    }
454}