// grafeo_common/memory/arena.rs
1//! Epoch-based arena allocator for MVCC.
2//!
3//! This is how Grafeo manages memory for versioned data. Each epoch gets its
4//! own arena, and when all readers from an old epoch finish, we free the whole
5//! thing at once. Much faster than tracking individual allocations.
6//!
7//! Use [`ArenaAllocator`] to manage multiple epochs, or [`Arena`] directly
8//! if you're working with a single epoch.
9
10// Arena allocators require unsafe code for memory management
11#![allow(unsafe_code)]
12
13use std::alloc::{Layout, alloc, dealloc};
14use std::ptr::NonNull;
15use std::sync::atomic::{AtomicUsize, Ordering};
16
17use parking_lot::RwLock;
18
19use crate::types::EpochId;
20
21/// Default chunk size for arena allocations (1 MB).
22const DEFAULT_CHUNK_SIZE: usize = 1024 * 1024;
23
/// A memory chunk in the arena.
///
/// The buffer is heap-allocated with 16-byte alignment; `offset` is advanced
/// with atomic CAS so multiple threads can carve allocations out concurrently.
/// Individual allocations are never freed — the whole buffer is released when
/// the `Chunk` is dropped.
struct Chunk {
    /// Pointer to the start of the chunk's buffer.
    ptr: NonNull<u8>,
    /// Total capacity of the chunk in bytes.
    capacity: usize,
    /// Current allocation offset: bytes already handed out, including any
    /// alignment padding.
    offset: AtomicUsize,
}
33
34impl Chunk {
35    /// Creates a new chunk with the given capacity.
36    fn new(capacity: usize) -> Self {
37        let layout = Layout::from_size_align(capacity, 16).expect("Invalid layout");
38        // SAFETY: We're allocating a valid layout
39        let ptr = unsafe { alloc(layout) };
40        let ptr = NonNull::new(ptr).expect("Allocation failed");
41
42        Self {
43            ptr,
44            capacity,
45            offset: AtomicUsize::new(0),
46        }
47    }
48
49    /// Tries to allocate `size` bytes with the given alignment.
50    /// Returns None if there's not enough space.
51    fn try_alloc(&self, size: usize, align: usize) -> Option<NonNull<u8>> {
52        loop {
53            let current = self.offset.load(Ordering::Relaxed);
54
55            // Calculate aligned offset
56            let aligned = (current + align - 1) & !(align - 1);
57            let new_offset = aligned + size;
58
59            if new_offset > self.capacity {
60                return None;
61            }
62
63            // Try to reserve the space
64            match self.offset.compare_exchange_weak(
65                current,
66                new_offset,
67                Ordering::AcqRel,
68                Ordering::Relaxed,
69            ) {
70                Ok(_) => {
71                    // SAFETY: We've reserved this range exclusively
72                    let ptr = unsafe { self.ptr.as_ptr().add(aligned) };
73                    return NonNull::new(ptr);
74                }
75                Err(_) => continue, // Retry
76            }
77        }
78    }
79
80    /// Returns the amount of memory used in this chunk.
81    fn used(&self) -> usize {
82        self.offset.load(Ordering::Relaxed)
83    }
84
85    /// Returns the remaining capacity in this chunk.
86    #[allow(dead_code)]
87    fn remaining(&self) -> usize {
88        self.capacity - self.used()
89    }
90}
91
92impl Drop for Chunk {
93    fn drop(&mut self) {
94        let layout = Layout::from_size_align(self.capacity, 16).expect("Invalid layout");
95        // SAFETY: We allocated this memory with the same layout
96        unsafe { dealloc(self.ptr.as_ptr(), layout) };
97    }
98}
99
// SAFETY: The buffer is owned exclusively by this `Chunk`, and the bump
// offset is advanced only via atomic CAS in `try_alloc`, so moving the chunk
// to another thread (`Send`) or sharing `&Chunk` across threads (`Sync`)
// cannot produce data races on the allocation cursor.
unsafe impl Send for Chunk {}
unsafe impl Sync for Chunk {}
103
/// A single epoch's memory arena.
///
/// Allocates by bumping a pointer forward - extremely fast. You can't free
/// individual allocations; instead, drop the whole arena when the epoch
/// is no longer needed.
///
/// Thread-safe: multiple threads can allocate concurrently using atomics.
pub struct Arena {
    /// The epoch this arena belongs to.
    epoch: EpochId,
    /// Chunks in creation order; the newest chunk is last and is the one
    /// most likely to have free space.
    chunks: RwLock<Vec<Chunk>>,
    /// Size for each newly created chunk (oversized requests get a bigger,
    /// dedicated chunk).
    chunk_size: usize,
    /// Total bytes reserved across all chunks (capacity, not bytes in use).
    total_allocated: AtomicUsize,
}
121
122impl Arena {
123    /// Creates a new arena for the given epoch.
124    #[must_use]
125    pub fn new(epoch: EpochId) -> Self {
126        Self::with_chunk_size(epoch, DEFAULT_CHUNK_SIZE)
127    }
128
129    /// Creates a new arena with a custom chunk size.
130    #[must_use]
131    pub fn with_chunk_size(epoch: EpochId, chunk_size: usize) -> Self {
132        let initial_chunk = Chunk::new(chunk_size);
133        Self {
134            epoch,
135            chunks: RwLock::new(vec![initial_chunk]),
136            chunk_size,
137            total_allocated: AtomicUsize::new(chunk_size),
138        }
139    }
140
141    /// Returns the epoch this arena belongs to.
142    #[must_use]
143    pub fn epoch(&self) -> EpochId {
144        self.epoch
145    }
146
147    /// Allocates `size` bytes with the given alignment.
148    ///
149    /// # Panics
150    ///
151    /// Panics if allocation fails (out of memory).
152    pub fn alloc(&self, size: usize, align: usize) -> NonNull<u8> {
153        // First try to allocate from existing chunks
154        {
155            let chunks = self.chunks.read();
156            for chunk in chunks.iter().rev() {
157                if let Some(ptr) = chunk.try_alloc(size, align) {
158                    return ptr;
159                }
160            }
161        }
162
163        // Need a new chunk
164        self.alloc_new_chunk(size, align)
165    }
166
167    /// Allocates a value of type T.
168    pub fn alloc_value<T>(&self, value: T) -> &mut T {
169        let ptr = self.alloc(std::mem::size_of::<T>(), std::mem::align_of::<T>());
170        // SAFETY: We've allocated the correct size and alignment
171        unsafe {
172            let typed_ptr = ptr.as_ptr() as *mut T;
173            typed_ptr.write(value);
174            &mut *typed_ptr
175        }
176    }
177
178    /// Allocates a slice of values.
179    pub fn alloc_slice<T: Copy>(&self, values: &[T]) -> &mut [T] {
180        if values.is_empty() {
181            return &mut [];
182        }
183
184        let size = std::mem::size_of::<T>() * values.len();
185        let align = std::mem::align_of::<T>();
186        let ptr = self.alloc(size, align);
187
188        // SAFETY: We've allocated the correct size and alignment
189        unsafe {
190            let typed_ptr = ptr.as_ptr() as *mut T;
191            std::ptr::copy_nonoverlapping(values.as_ptr(), typed_ptr, values.len());
192            std::slice::from_raw_parts_mut(typed_ptr, values.len())
193        }
194    }
195
196    /// Allocates a new chunk and performs the allocation.
197    fn alloc_new_chunk(&self, size: usize, align: usize) -> NonNull<u8> {
198        let chunk_size = self.chunk_size.max(size + align);
199        let chunk = Chunk::new(chunk_size);
200
201        self.total_allocated
202            .fetch_add(chunk_size, Ordering::Relaxed);
203
204        let ptr = chunk
205            .try_alloc(size, align)
206            .expect("Fresh chunk should have space");
207
208        let mut chunks = self.chunks.write();
209        chunks.push(chunk);
210
211        ptr
212    }
213
214    /// Returns the total memory allocated by this arena.
215    #[must_use]
216    pub fn total_allocated(&self) -> usize {
217        self.total_allocated.load(Ordering::Relaxed)
218    }
219
220    /// Returns the total memory used (not just allocated capacity).
221    #[must_use]
222    pub fn total_used(&self) -> usize {
223        let chunks = self.chunks.read();
224        chunks.iter().map(Chunk::used).sum()
225    }
226
227    /// Returns statistics about this arena.
228    #[must_use]
229    pub fn stats(&self) -> ArenaStats {
230        let chunks = self.chunks.read();
231        ArenaStats {
232            epoch: self.epoch,
233            chunk_count: chunks.len(),
234            total_allocated: self.total_allocated.load(Ordering::Relaxed),
235            total_used: chunks.iter().map(Chunk::used).sum(),
236        }
237    }
238}
239
/// Point-in-time statistics about an [`Arena`].
#[derive(Debug, Clone)]
pub struct ArenaStats {
    /// The epoch the arena belongs to.
    pub epoch: EpochId,
    /// Number of chunks the arena has created.
    pub chunk_count: usize,
    /// Total bytes reserved (chunk capacities summed).
    pub total_allocated: usize,
    /// Total bytes actually handed out, including alignment padding.
    pub total_used: usize,
}
252
/// Manages arenas across multiple epochs.
///
/// Use this to create new epochs, allocate in the current epoch, and
/// clean up old epochs when they're no longer needed.
pub struct ArenaAllocator {
    /// Map of epochs to their arenas.
    arenas: RwLock<hashbrown::HashMap<EpochId, Arena>>,
    /// Current epoch, stored as a raw counter.
    /// NOTE(review): `usize` is 32 bits on 32-bit targets while `EpochId`
    /// is built from a `u64` — confirm 64-bit-only targets are assumed.
    current_epoch: AtomicUsize,
    /// Default chunk size handed to each newly created arena.
    chunk_size: usize,
}
265
266impl ArenaAllocator {
267    /// Creates a new arena allocator.
268    #[must_use]
269    pub fn new() -> Self {
270        Self::with_chunk_size(DEFAULT_CHUNK_SIZE)
271    }
272
273    /// Creates a new arena allocator with a custom chunk size.
274    #[must_use]
275    pub fn with_chunk_size(chunk_size: usize) -> Self {
276        let allocator = Self {
277            arenas: RwLock::new(hashbrown::HashMap::new()),
278            current_epoch: AtomicUsize::new(0),
279            chunk_size,
280        };
281
282        // Create the initial epoch
283        let epoch = EpochId::INITIAL;
284        allocator
285            .arenas
286            .write()
287            .insert(epoch, Arena::with_chunk_size(epoch, chunk_size));
288
289        allocator
290    }
291
292    /// Returns the current epoch.
293    #[must_use]
294    pub fn current_epoch(&self) -> EpochId {
295        EpochId::new(self.current_epoch.load(Ordering::Acquire) as u64)
296    }
297
298    /// Creates a new epoch and returns its ID.
299    pub fn new_epoch(&self) -> EpochId {
300        let new_id = self.current_epoch.fetch_add(1, Ordering::AcqRel) as u64 + 1;
301        let epoch = EpochId::new(new_id);
302
303        let arena = Arena::with_chunk_size(epoch, self.chunk_size);
304        self.arenas.write().insert(epoch, arena);
305
306        epoch
307    }
308
309    /// Gets the arena for a specific epoch.
310    ///
311    /// # Panics
312    ///
313    /// Panics if the epoch doesn't exist.
314    pub fn arena(&self, epoch: EpochId) -> impl std::ops::Deref<Target = Arena> + '_ {
315        parking_lot::RwLockReadGuard::map(self.arenas.read(), |arenas| {
316            arenas.get(&epoch).expect("Epoch should exist")
317        })
318    }
319
320    /// Allocates in the current epoch.
321    pub fn alloc(&self, size: usize, align: usize) -> NonNull<u8> {
322        let epoch = self.current_epoch();
323        let arenas = self.arenas.read();
324        arenas
325            .get(&epoch)
326            .expect("Current epoch exists")
327            .alloc(size, align)
328    }
329
330    /// Drops an epoch, freeing all its memory.
331    ///
332    /// This should only be called when no readers are using this epoch.
333    pub fn drop_epoch(&self, epoch: EpochId) {
334        self.arenas.write().remove(&epoch);
335    }
336
337    /// Returns total memory allocated across all epochs.
338    #[must_use]
339    pub fn total_allocated(&self) -> usize {
340        self.arenas
341            .read()
342            .values()
343            .map(Arena::total_allocated)
344            .sum()
345    }
346}
347
348impl Default for ArenaAllocator {
349    fn default() -> Self {
350        Self::new()
351    }
352}
353
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_arena_basic_allocation() {
        let arena = Arena::new(EpochId::INITIAL);

        let first = arena.alloc(100, 8);
        let second = arena.alloc(200, 8);

        // Two bump allocations must never alias.
        assert_ne!(first.as_ptr(), second.as_ptr());
    }

    #[test]
    fn test_arena_value_allocation() {
        let arena = Arena::new(EpochId::INITIAL);

        let slot = arena.alloc_value(42u64);
        assert_eq!(*slot, 42);

        // The returned reference is writable.
        *slot = 100;
        assert_eq!(*slot, 100);
    }

    #[test]
    fn test_arena_slice_allocation() {
        let arena = Arena::new(EpochId::INITIAL);

        let copied = arena.alloc_slice(&[1u32, 2, 3, 4, 5]);
        assert_eq!(copied, &[1, 2, 3, 4, 5]);

        // The arena copy is independently mutable.
        copied[0] = 10;
        assert_eq!(copied[0], 10);
    }

    #[test]
    fn test_arena_large_allocation() {
        let arena = Arena::with_chunk_size(EpochId::INITIAL, 1024);

        // A request bigger than the chunk size must force a second chunk.
        let _ptr = arena.alloc(2048, 8);
        assert!(arena.stats().chunk_count >= 2);
    }

    #[test]
    fn test_arena_allocator_epochs() {
        let allocator = ArenaAllocator::new();

        assert_eq!(allocator.current_epoch(), EpochId::INITIAL);
        assert_eq!(allocator.new_epoch(), EpochId::new(1));

        let latest = allocator.new_epoch();
        assert_eq!(latest, EpochId::new(2));

        // The current epoch tracks the most recently created one.
        assert_eq!(allocator.current_epoch(), latest);
    }

    #[test]
    fn test_arena_allocator_allocation() {
        let allocator = ArenaAllocator::new();

        let a = allocator.alloc(100, 8);
        let b = allocator.alloc(100, 8);

        assert_ne!(a.as_ptr(), b.as_ptr());
    }

    #[test]
    fn test_arena_drop_epoch() {
        let allocator = ArenaAllocator::new();
        let baseline = allocator.total_allocated();

        let epoch = allocator.new_epoch();
        {
            // Allocate enough in the new epoch to grow total_allocated.
            let arena = allocator.arena(epoch);
            arena.alloc(10000, 8);
        }

        let grown = allocator.total_allocated();
        assert!(grown > baseline);

        // Dropping the epoch must release its arena's memory accounting.
        allocator.drop_epoch(epoch);
        assert!(allocator.total_allocated() < grown);
    }

    #[test]
    fn test_arena_stats() {
        let arena = Arena::with_chunk_size(EpochId::new(5), 4096);

        let before = arena.stats();
        assert_eq!(before.epoch, EpochId::new(5));
        assert_eq!(before.chunk_count, 1);
        assert_eq!(before.total_allocated, 4096);
        assert_eq!(before.total_used, 0);

        arena.alloc(100, 8);
        assert!(arena.stats().total_used >= 100);
    }
}