// dynamo_memory/arena.rs
1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! # Arena Allocator
5//!
6//! This module provides an arena allocator for generally heap-like allocations.
7//! An [`ArenaAllocator`] can be created by taking ownership of a [`MemoryDescriptor`] instance.
8//!
//! The [`ArenaAllocator`] allocates contiguous memory regions using the [`offset_allocator`] crate,
10//! which builds on [Sebastian Aaltonen's ArenaAllocator](https://github.com/sebbbi/ArenaAllocator)
11
12use crate::StorageKind;
13
14use super::{MemoryDescriptor, StorageError};
15use offset_allocator::{Allocation, Allocator};
16use std::{
17    any::Any,
18    sync::{Arc, Mutex},
19};
20
/// Errors specific to arena allocation.
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum ArenaError {
    /// Returned by [`ArenaAllocator::new`] when the requested page size is not a power of two.
    #[error("Page size must be a power of 2")]
    PageSizeNotAligned,

    /// Returned by [`ArenaAllocator::allocate`] when there are not enough contiguous free pages.
    #[error("Allocation failed")]
    AllocationFailed,

    /// Returned when the page count does not fit the `u32` range used by the underlying allocator.
    #[error("Failed to convert pages to u32")]
    PagesNotConvertible,

    /// Propagated from the underlying storage layer.
    #[error("Storage error: {0}")]
    StorageError(#[from] StorageError),
}
37
/// Arena allocator backed by an instance of a [`MemoryDescriptor`] object.
///
/// This struct wraps an [`Allocator`] from the [`offset_allocator`] crate,
/// and provides methods for allocating memory from the storage.
///
/// The allocator is thread-safe, and the storage is shared between the allocator and the buffers.
#[derive(Clone)]
pub struct ArenaAllocator<S: MemoryDescriptor> {
    /// Backing storage; shared with every [`ArenaBuffer`] handed out.
    storage: Arc<S>,
    /// Page-granular offset allocator, guarded by a mutex for thread safety.
    allocator: Arc<Mutex<Allocator>>,
    /// Allocation granularity in bytes; always a power of two (checked in `new`).
    page_size: u64,
}
50
51impl<S: MemoryDescriptor> std::fmt::Debug for ArenaAllocator<S> {
52    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53        write!(
54            f,
55            "ArenaAllocator {{ storage: {:?}, page_size: {} }}",
56            self.storage, self.page_size
57        )
58    }
59}
60
/// A buffer allocated from an [`ArenaAllocator`].
///
/// This struct wraps an [`Allocation`] from the [`offset_allocator`] crate,
/// and provides methods for interacting with the allocated memory.
///
/// The buffer is backed by a [`MemoryDescriptor`] object, and the allocation is freed when the buffer is dropped.
pub struct ArenaBuffer<S: MemoryDescriptor> {
    /// Byte offset from the start of the backing storage (always a multiple of the page size).
    offset: usize,
    /// Absolute memory address of this buffer (`storage.addr() + offset`).
    address: usize,
    /// User-requested allocation size in bytes; may be smaller than the
    /// page-rounded region actually reserved in the arena.
    requested_size: usize,
    /// Shared reference to the backing storage, keeping it alive for the buffer's lifetime.
    storage: Arc<S>,
    /// Internal allocation handle from the offset allocator; required to free on drop.
    allocation: Allocation,
    /// Shared reference to the allocator so the pages can be returned on drop.
    allocator: Arc<Mutex<Allocator>>,
}
81
82impl<S: MemoryDescriptor> ArenaAllocator<S> {
83    /// Create a new [`ArenaAllocator`] from a [`MemoryDescriptor`] object and a page size.
84    ///
85    /// The page size must be a power of two.
86    ///
87    /// The allocator will divide the storage into pages and allocations will consist of a set of contiguous
88    /// pages whose aggregate size is greater than or equal to the requested size.
89    ///
90    /// The allocator is thread-safe, and the storage is shared between the allocator and the buffers.
91    pub fn new(storage: S, page_size: usize) -> std::result::Result<Self, ArenaError> {
92        let storage = Arc::new(storage);
93
94        if !page_size.is_power_of_two() {
95            return Err(ArenaError::PageSizeNotAligned);
96        }
97
98        // divide storage into pages,
99        // round down such that all pages are fully and any remaining bytes are discarded
100        let pages = storage.size() / page_size;
101
102        let allocator = Allocator::new(
103            pages
104                .try_into()
105                .map_err(|_| ArenaError::PagesNotConvertible)?,
106        );
107
108        let allocator = Arc::new(Mutex::new(allocator));
109
110        Ok(Self {
111            storage,
112            allocator,
113            page_size: page_size as u64,
114        })
115    }
116
117    /// Allocates a new [`ArenaBuffer`] of the given size from this allocator.
118    ///
119    /// The actual allocation may consume more pages than strictly needed due to
120    /// page-size rounding. Returns [`ArenaError::AllocationFailed`] if there are
121    /// not enough contiguous pages available.
122    pub fn allocate(&self, size: usize) -> std::result::Result<ArenaBuffer<S>, ArenaError> {
123        let size = size as u64;
124        let pages = size.div_ceil(self.page_size);
125
126        let allocation = self
127            .allocator
128            .lock()
129            .unwrap()
130            .allocate(pages.try_into().map_err(|_| ArenaError::AllocationFailed)?)
131            .ok_or(ArenaError::AllocationFailed)?;
132
133        let offset = allocation.offset as u64 * self.page_size;
134        let address = self.storage.addr() + offset as usize;
135
136        debug_assert!(address + size as usize <= self.storage.addr() + self.storage.size());
137
138        Ok(ArenaBuffer {
139            offset: offset as usize,
140            address,
141            requested_size: size as usize,
142            allocation,
143            storage: self.storage.clone(),
144            allocator: self.allocator.clone(),
145        })
146    }
147}
148
149impl<S: MemoryDescriptor> std::fmt::Debug for ArenaBuffer<S> {
150    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
151        write!(
152            f,
153            "ArenaBuffer {{ addr: {}, size: {}, kind: {:?}, allocator: {:p} }}",
154            self.address,
155            self.requested_size,
156            self.storage.storage_kind(),
157            Arc::as_ptr(&self.storage)
158        )
159    }
160}
161
162impl<S: MemoryDescriptor + 'static> MemoryDescriptor for ArenaBuffer<S> {
163    fn addr(&self) -> usize {
164        self.address
165    }
166    fn size(&self) -> usize {
167        self.requested_size
168    }
169    fn storage_kind(&self) -> StorageKind {
170        self.storage.storage_kind()
171    }
172    fn as_any(&self) -> &dyn Any {
173        self
174    }
175    fn nixl_descriptor(&self) -> Option<NixlDescriptor> {
176        if let Some(mut descriptor) = self.storage.nixl_descriptor() {
177            descriptor.addr = self.addr() as u64;
178            descriptor.size = self.size();
179            Some(descriptor)
180        } else {
181            None
182        }
183    }
184}
185
186// NIXL integration helpers
187use super::nixl::{NixlCompatible, NixlDescriptor, RegisteredView};
188
impl<S> ArenaBuffer<S>
where
    S: MemoryDescriptor + NixlCompatible,
{
    /// Create a NIXL descriptor for this buffer with the correct offset and size.
    ///
    /// This can be used when the base storage implements NixlCompatible to create
    /// a descriptor that points to just this buffer's region.
    ///
    /// NOTE(review): this inherent method shadows the
    /// [`MemoryDescriptor::nixl_descriptor`] trait method for direct calls on
    /// `ArenaBuffer`; both are presumably intended to describe the same
    /// region — confirm. Also note it always returns `Some`; the `Option`
    /// appears kept for signature parity with the trait method.
    pub fn nixl_descriptor(&self) -> Option<NixlDescriptor> {
        let (base_ptr, _base_size, mem_type, device_id) = self.storage.nixl_params();

        // Calculate the offset pointer.
        // SAFETY: `self.offset` was produced by the arena from this same
        // storage, so `base_ptr + offset` stays within the storage region —
        // assumes `nixl_params` returns the storage's base pointer; TODO confirm.
        let buffer_ptr = unsafe { base_ptr.add(self.offset) };

        Some(NixlDescriptor {
            addr: buffer_ptr as u64,
            size: self.requested_size,
            mem_type,
            device_id,
        })
    }
}
211
212impl<S> ArenaBuffer<S>
213where
214    S: MemoryDescriptor + RegisteredView,
215{
216    /// Get the agent name from registered storage.
217    ///
218    /// This is a convenience method when using ArenaAllocator with NixlRegistered<T> storage.
219    pub fn agent_name(&self) -> &str {
220        self.storage.agent_name()
221    }
222
223    /// Get a NIXL descriptor that includes registration information.
224    pub fn registered_descriptor(&self) -> NixlDescriptor {
225        let base_descriptor = self.storage.descriptor();
226
227        // Create a new descriptor with adjusted address and size for this buffer
228        NixlDescriptor {
229            addr: base_descriptor.addr + self.offset as u64,
230            size: self.requested_size,
231            mem_type: base_descriptor.mem_type,
232            device_id: base_descriptor.device_id,
233        }
234    }
235}
236
237impl<S: MemoryDescriptor> Drop for ArenaBuffer<S> {
238    fn drop(&mut self) {
239        self.allocator.lock().unwrap().free(self.allocation);
240    }
241}
242
#[cfg(test)]
mod tests {
    use super::*;
    use crate::SystemStorage;

    // A 10-page arena of 4 KiB pages is small enough to exhaust quickly in tests.
    const PAGE_SIZE: usize = 4096;
    const PAGE_COUNT: usize = 10;
    const TOTAL_STORAGE_SIZE: usize = PAGE_SIZE * PAGE_COUNT;

    /// Builds a fresh allocator over `TOTAL_STORAGE_SIZE` bytes of system memory.
    fn create_allocator() -> ArenaAllocator<SystemStorage> {
        let storage = SystemStorage::new(TOTAL_STORAGE_SIZE).unwrap();
        ArenaAllocator::new(storage, PAGE_SIZE).unwrap()
    }

    #[test]
    /// Tests successful creation of an `ArenaAllocator` with valid page size.
    /// Verifies that `ArenaAllocator::new` returns `Ok`.
    fn test_arena_allocator_new_success() {
        let storage = SystemStorage::new(TOTAL_STORAGE_SIZE).unwrap();
        let allocator_result = ArenaAllocator::new(storage, PAGE_SIZE);
        assert!(allocator_result.is_ok());
    }

    #[test]
    /// Tests `ArenaAllocator` creation with an invalid page size (not a power of 2).
    /// Verifies that `ArenaAllocator::new` returns an `ArenaError::PageSizeNotAligned` error.
    fn test_arena_allocator_new_invalid_page_size() {
        let storage = SystemStorage::new(TOTAL_STORAGE_SIZE).unwrap();
        // PAGE_SIZE + 1 is odd, hence never a power of two.
        let allocator_result = ArenaAllocator::new(storage, PAGE_SIZE + 1);
        assert!(allocator_result.is_err());
        assert!(matches!(
            allocator_result,
            Err(ArenaError::PageSizeNotAligned)
        ));
    }

    #[test]
    /// Tests allocation of a single buffer that is a multiple of the page size.
    /// Verifies that the allocation is successful, the buffer has the correct size,
    /// and its address is the start of the storage area (as it's the first allocation).
    fn test_allocate_single_buffer() {
        let allocator = create_allocator();
        let buffer_size = PAGE_SIZE * 2;
        let buffer_result = allocator.allocate(buffer_size);
        assert!(buffer_result.is_ok());
        let buffer = buffer_result.unwrap();
        assert_eq!(buffer.size(), buffer_size);
        assert_eq!(buffer.addr(), allocator.storage.addr()); // First allocation starts at addr
    }

    #[test]
    /// Tests allocation of multiple buffers of varying sizes (multiples of page size).
    /// Verifies that allocations are successful, buffers have correct sizes, and their
    /// addresses are correctly offset from each other based on previous allocations.
    fn test_allocate_multiple_buffers() {
        let allocator = create_allocator();
        let buffer_size1 = PAGE_SIZE * 2;
        let buffer1_result = allocator.allocate(buffer_size1);
        assert!(buffer1_result.is_ok());
        let buffer1 = buffer1_result.unwrap();
        assert_eq!(buffer1.size(), buffer_size1);
        assert_eq!(buffer1.addr(), allocator.storage.addr());

        // NOTE: assumes the offset allocator places the second allocation
        // immediately after the first (observed behavior on an empty arena).
        let buffer_size2 = PAGE_SIZE * 3;
        let buffer2_result = allocator.allocate(buffer_size2);
        assert!(buffer2_result.is_ok());
        let buffer2 = buffer2_result.unwrap();
        assert_eq!(buffer2.size(), buffer_size2);
        assert_eq!(buffer2.addr(), allocator.storage.addr() + buffer_size1);
    }

    #[test]
    /// Tests allocation of a single buffer that consumes the entire storage space.
    /// Verifies that the allocation is successful and the buffer has the correct size.
    fn test_allocate_exact_size() {
        let allocator = create_allocator();
        let buffer_size = TOTAL_STORAGE_SIZE;
        let buffer_result = allocator.allocate(buffer_size);
        assert!(buffer_result.is_ok());
        let buffer = buffer_result.unwrap();
        assert_eq!(buffer.size(), buffer_size);
    }

    #[test]
    /// Tests an attempt to allocate a buffer larger than the total available storage.
    /// Verifies that the allocation fails with `ArenaError::AllocationFailed`.
    fn test_allocate_too_large() {
        let allocator = create_allocator();
        let buffer_size = TOTAL_STORAGE_SIZE + PAGE_SIZE;
        let buffer_result = allocator.allocate(buffer_size);
        assert!(buffer_result.is_err());
        assert!(matches!(buffer_result, Err(ArenaError::AllocationFailed)));
    }

    #[test]
    /// Tests the `Drop` implementation of `ArenaBuffer` for freeing allocated pages.
    /// It allocates a buffer, lets it go out of scope (triggering `drop`), and then
    /// attempts to reallocate a buffer of the same size. This second allocation should
    /// succeed and reuse the initially allocated space, starting at the storage address.
    fn test_buffer_drop_and_reallocate() {
        let allocator = create_allocator();
        // we can not allocate two buffers of `buffer_size` as it will exceed the total storage size
        // if the memory is properly returned, then we should be able to reallocate the same size buffer
        let buffer_size = PAGE_SIZE * 6;

        {
            let buffer1 = allocator.allocate(buffer_size).unwrap();
            assert_eq!(buffer1.size(), buffer_size);
            assert_eq!(buffer1.addr(), allocator.storage.addr());
        } // buffer1 is dropped here, freeing its pages

        // Try to allocate a new buffer of the same size, it should succeed and reuse the space
        let buffer2_result = allocator.allocate(buffer_size);
        assert!(buffer2_result.is_ok());
        let buffer2 = buffer2_result.unwrap();
        assert_eq!(buffer2.size(), buffer_size);
        assert_eq!(buffer2.addr(), allocator.storage.addr()); // Should be at the start again
    }

    #[test]
    /// Tests filling the arena with two buffers that together consume all available pages
    /// and then attempting one more small allocation, which should fail.
    /// Verifies that after the allocator is full, `ArenaError::AllocationFailed` is returned.
    fn test_allocate_fill_and_fail() {
        let allocator = create_allocator();
        let buffer_size_half = TOTAL_STORAGE_SIZE / 2; // Each takes 5 pages

        let buffer1 = allocator.allocate(buffer_size_half).unwrap();
        assert_eq!(buffer1.size(), buffer_size_half);

        let buffer2 = allocator.allocate(buffer_size_half).unwrap();
        assert_eq!(buffer2.size(), buffer_size_half);
        assert_eq!(buffer2.addr(), allocator.storage.addr() + buffer_size_half);

        // Now try to allocate one more page, should fail
        let buffer3_result = allocator.allocate(PAGE_SIZE);
        assert!(buffer3_result.is_err());
        assert!(matches!(buffer3_result, Err(ArenaError::AllocationFailed)));
    }

    #[test]
    /// Tests allocation of a single byte.
    /// Verifies that the allocation is successful and the buffer reports its size as 1.
    /// The actual page consumption is tested behaviorally in exhaustion tests.
    fn test_allocate_non_page_aligned_single_byte() {
        let allocator = create_allocator();
        let buffer = allocator.allocate(1).unwrap();
        assert_eq!(buffer.size(), 1);
        // Internal page allocation is behaviorally tested by exhaustion tests
    }

    #[test]
    /// Tests allocation of a size that is one byte less than a full page.
    /// Verifies that the allocation is successful and the buffer reports the correct size.
    /// The actual page consumption is tested behaviorally in exhaustion tests.
    fn test_allocate_non_page_aligned_almost_full_page() {
        let allocator = create_allocator();
        let buffer = allocator.allocate(PAGE_SIZE - 1).unwrap();
        assert_eq!(buffer.size(), PAGE_SIZE - 1);
    }

    #[test]
    /// Tests allocation of a size that is one byte more than a full page.
    /// Verifies that the allocation is successful and the buffer reports the correct size.
    /// This will consume two pages, which is tested behaviorally in exhaustion tests.
    fn test_allocate_non_page_aligned_just_over_one_page() {
        let allocator = create_allocator();
        let buffer = allocator.allocate(PAGE_SIZE + 1).unwrap();
        assert_eq!(buffer.size(), PAGE_SIZE + 1);
    }

    #[test]
    /// Tests a specific scenario of non-page-aligned allocations leading to arena exhaustion.
    /// Allocates `(PAGE_COUNT / 2 * PAGE_SIZE) + 1` bytes. This requires `(PAGE_COUNT / 2) + 1` pages.
    /// The first allocation should succeed. The second allocation of the same size should fail
    /// because not enough pages remain, verifying the page rounding and consumption logic.
    fn test_allocate_half_plus_one_byte_twice_exhausts_arena() {
        let allocator = create_allocator();
        let allocation_size = (PAGE_COUNT / 2 * PAGE_SIZE) + 1;
        // This allocation will require (PAGE_COUNT / 2) + 1 pages.
        // For PAGE_COUNT = 10, this is 5 * PAGE_SIZE + 1 bytes, requiring 6 pages.

        let buffer1_result = allocator.allocate(allocation_size);
        assert!(buffer1_result.is_ok(), "First allocation should succeed");
        let buffer1 = buffer1_result.unwrap();
        assert_eq!(buffer1.size(), allocation_size);
        // Cross-check the page math used by `allocate` against the expected count.
        let pages_for_first_alloc = (allocation_size as u64).div_ceil(allocator.page_size);
        assert_eq!(pages_for_first_alloc, (PAGE_COUNT / 2 + 1) as u64);

        // Second allocation of the same size should fail because we don't have enough pages left.
        // Remaining pages = PAGE_COUNT - pages_for_first_alloc
        // For PAGE_COUNT = 10, remaining = 10 - 6 = 4 pages.
        // We need (PAGE_COUNT / 2 + 1) = 6 pages.
        let buffer2_result = allocator.allocate(allocation_size);
        assert!(
            buffer2_result.is_err(),
            "Second allocation should fail due to insufficient pages"
        );
        assert!(matches!(buffer2_result, Err(ArenaError::AllocationFailed)));
    }

    #[test]
    /// Tests filling the arena with multiple non-page-aligned allocations that each consume more
    /// than one page due to rounding (specifically, `PAGE_SIZE + 1` bytes, consuming 2 pages each).
    /// After filling the arena based on this consumption, it verifies that a subsequent small
    /// allocation fails with `ArenaError::AllocationFailed`.
    fn test_fill_with_non_aligned_and_fail() {
        let allocator = create_allocator();
        // This test verifies that multiple small allocations, each consuming slightly more than one page
        // (thus taking two pages from the underlying offset_allocator), correctly fill the arena.
        // Let's allocate (PAGE_SIZE + 1) multiple times. Each will take 2 pages.

        let single_alloc_size = PAGE_SIZE + 1; // Will take 2 pages
        let num_possible_allocs = PAGE_COUNT / 2; // e.g., 10 / 2 = 5 such allocations

        // Keep the buffers alive so their pages are not returned mid-test.
        let mut allocated_buffers = Vec::with_capacity(num_possible_allocs);

        for i in 0..num_possible_allocs {
            let buffer_result = allocator.allocate(single_alloc_size);
            assert!(buffer_result.is_ok(), "Allocation {} should succeed", i + 1);
            let buffer = buffer_result.unwrap();
            assert_eq!(buffer.size(), single_alloc_size);
            allocated_buffers.push(buffer);
        }

        // At this point, all pages should be consumed (num_possible_allocs * 2 pages)
        // So, allocating even 1 byte should fail.
        let final_alloc_result = allocator.allocate(1);
        assert!(
            final_alloc_result.is_err(),
            "Final allocation of 1 byte should fail as arena is full"
        );
        assert!(matches!(
            final_alloc_result,
            Err(ArenaError::AllocationFailed)
        ));
    }
}