dynamo-llm 1.0.2

Dynamo LLM Library
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

//! # Arena Allocator
//!
//! This module provides an arena allocator for generally heap-like allocations.
//! An [`ArenaAllocator`] can be created by taking ownership of a [`Storage`] instance.
//!
//! The [`ArenaAllocator`] allocates contiguous memory regions using the [`offset_allocator`] crate,
//! which builds on [Sebastian Aaltonen's OffsetAllocator](https://github.com/sebbbi/OffsetAllocator).
//!
//! ## Usage
//!
//! TODO: provide rust example

use super::{Storage, StorageError};
use offset_allocator::{Allocation, Allocator};
use std::sync::{Arc, Mutex};

/// Errors produced while constructing an [`ArenaAllocator`] or allocating from it.
#[derive(Debug, thiserror::Error)]
pub enum ArenaError {
    /// The page size passed to [`ArenaAllocator::new`] was not a power of two.
    #[error("Page size must be a power of 2")]
    PageSizeNotAligned,

    /// The arena could not satisfy the request (out of space, or the
    /// requested page count did not fit the underlying allocator's index type).
    #[error("Allocation failed")]
    AllocationFailed,

    /// The storage's page count exceeds what the underlying offset allocator
    /// can index (u32, per the error message).
    #[error("Failed to convert pages to u32")]
    PagesNotConvertible,

    /// A NIXL descriptor was requested but the backing storage reports no
    /// NIXL agent name.
    #[error("Storage not registered with NIXL")]
    NotRegisteredWithNixl,

    /// Error propagated from the underlying [`Storage`] implementation.
    #[error("Storage error: {0}")]
    StorageError(#[from] StorageError),
}

/// Arena allocator backed by an instance of a [`Storage`] object.
///
/// This struct wraps an [`Allocator`] from the [`offset_allocator`] crate,
/// and provides methods for allocating memory from the storage.
///
/// The allocator is thread-safe, and the storage is shared between the allocator and the buffers.
#[derive(Clone)]
pub struct ArenaAllocator<S: Storage> {
    // Backing storage; shared with every ArenaBuffer handed out.
    storage: Arc<S>,
    // Page-granular offset allocator; the mutex makes allocation thread-safe.
    allocator: Arc<Mutex<Allocator>>,
    // Allocation granularity in bytes; validated to be a power of two in `new`.
    page_size: u64,
}

impl<S: Storage> std::fmt::Debug for ArenaAllocator<S> {
    /// Formats as `ArenaAllocator { storage: .., page_size: .. }`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The debug_struct builder renders the same text as the previous
        // manual `write!`, field by field.
        f.debug_struct("ArenaAllocator")
            .field("storage", &self.storage)
            .field("page_size", &self.page_size)
            .finish()
    }
}

/// A buffer allocated from an [`ArenaAllocator`].
///
/// This struct wraps an [`Allocation`] from the [`offset_allocator`] crate,
/// and provides methods for interacting with the allocated memory.
///
/// The buffer is backed by a [`Storage`] object, and the allocation is freed when the buffer is dropped.
pub struct ArenaBuffer<S: Storage> {
    // Byte offset of this buffer from the start of the backing storage.
    offset: u64,
    // Absolute starting address: storage.addr() + offset.
    address: u64,
    // Size the caller asked for; the page-rounded reservation may be larger.
    requested_size: usize,
    // Shared handle to the backing storage.
    storage: Arc<S>,
    // Page-level allocation handle; returned to the allocator on drop.
    allocation: Allocation,
    // Owning allocator; used in Drop to free the allocation.
    allocator: Arc<Mutex<Allocator>>,
}

impl<S: Storage> ArenaAllocator<S> {
    /// Create a new [`ArenaAllocator`] over `storage` with the given page size.
    ///
    /// `page_size` must be a power of two. The storage is divided into whole
    /// pages (rounding down; any trailing partial page is left unused), and
    /// each allocation is a run of contiguous pages whose aggregate size is
    /// greater than or equal to the requested size.
    ///
    /// The allocator is thread-safe, and the storage is shared between the
    /// allocator and the buffers it hands out.
    ///
    /// # Errors
    ///
    /// * [`ArenaError::PageSizeNotAligned`] if `page_size` is not a power of two.
    /// * [`ArenaError::PagesNotConvertible`] if the page count does not fit
    ///   the underlying allocator's index type.
    pub fn new(storage: S, page_size: usize) -> Result<Self, ArenaError> {
        if !page_size.is_power_of_two() {
            return Err(ArenaError::PageSizeNotAligned);
        }

        let storage = Arc::new(storage);

        // Whole pages only: round down and discard any trailing bytes.
        let page_count = storage.size() / page_size;

        let allocator = Allocator::new(
            page_count
                .try_into()
                .map_err(|_| ArenaError::PagesNotConvertible)?,
        );

        Ok(Self {
            storage,
            allocator: Arc::new(Mutex::new(allocator)),
            page_size: page_size as u64,
        })
    }

    /// Allocate a new [`ArenaBuffer`] of at least `size` bytes.
    ///
    /// The request is rounded up to a whole number of pages; the returned
    /// buffer still reports the originally requested size.
    ///
    /// # Errors
    ///
    /// [`ArenaError::AllocationFailed`] if the arena cannot satisfy the request.
    pub fn allocate(&self, size: usize) -> Result<ArenaBuffer<S>, ArenaError> {
        let requested = size as u64;
        let page_count = requested.div_ceil(self.page_size);

        let allocation = {
            // Hold the lock only for the allocation call itself.
            let mut guard = self.allocator.lock().unwrap();
            guard
                .allocate(
                    page_count
                        .try_into()
                        .map_err(|_| ArenaError::AllocationFailed)?,
                )
                .ok_or(ArenaError::AllocationFailed)?
        };

        let offset = allocation.offset as u64 * self.page_size;
        let address = self.storage.addr() + offset;

        // The allocator only hands out in-range pages; verify in debug builds.
        debug_assert!(address + requested <= self.storage.addr() + self.storage.size() as u64);

        Ok(ArenaBuffer {
            offset,
            address,
            requested_size: size,
            allocation,
            storage: Arc::clone(&self.storage),
            allocator: Arc::clone(&self.allocator),
        })
    }
}

impl<S: Storage> std::fmt::Debug for ArenaBuffer<S> {
    /// Debug formatting showing the buffer's address, requested size, the
    /// backing storage type, and the identity of the owning allocator.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "ArenaBuffer {{ addr {}, size: {}, kind: {:?}, allocator: {:p}}}",
            self.address,
            self.requested_size,
            self.storage.storage_type(),
            // Print the allocator's pointer so the value matches the
            // `allocator:` label; previously this printed the storage Arc
            // pointer, which mislabeled the output.
            Arc::as_ptr(&self.allocator)
        )
    }
}

impl<S: Storage> ArenaBuffer<S> {
    /// Size in bytes that was requested at allocation time.
    ///
    /// NOTE(review): the underlying page-rounded reservation may be larger.
    pub fn size(&self) -> usize {
        self.requested_size
    }

    /// Absolute starting address of the buffer within the backing storage.
    pub fn address(&self) -> u64 {
        self.address
    }
}

mod nixl {
    use super::super::nixl::*;
    use super::super::*;
    use super::*;

    impl<S: Storage> ArenaBuffer<S>
    where
        S: NixlRegisterableStorage,
    {
        /// Create a [`NixlRemoteDescriptor`] describing this buffer's slice
        /// of the backing storage.
        ///
        /// # Errors
        ///
        /// Returns [`ArenaError::NotRegisteredWithNixl`] if the storage has
        /// no NIXL agent name, or a [`StorageError`] if the descriptor
        /// cannot be constructed.
        pub fn nixl_remote_descriptor(&self) -> Result<NixlRemoteDescriptor, ArenaError> {
            match self.storage.nixl_agent_name() {
                Some(agent) => {
                    // Describe only this buffer's region: base storage
                    // shifted by the buffer offset, sized at the request.
                    let storage = NixlStorage::from_storage_with_offset(
                        self.storage.as_ref(),
                        self.offset as usize,
                        self.requested_size,
                    )?;

                    Ok(NixlRemoteDescriptor::new(storage, agent))
                }
                None => Err(ArenaError::NotRegisteredWithNixl),
            }
        }
    }

    impl<S: Storage> MemoryRegion for ArenaBuffer<S>
    where
        S: MemoryRegion,
    {
        unsafe fn as_ptr(&self) -> *const u8 {
            // Offset into the backing storage so the region describes this
            // buffer rather than the entire arena. Previously this returned
            // the storage base pointer, which disagreed with
            // `ArenaBuffer::size` and with `nixl_remote_descriptor` (both
            // apply `offset` + `requested_size`).
            // SAFETY: the allocator guarantees
            // `offset + requested_size <= storage.size()` (see the
            // debug_assert in `ArenaAllocator::allocate`), so the offset
            // pointer stays within the storage region.
            unsafe { Storage::as_ptr(self.storage.as_ref()).add(self.offset as usize) }
        }

        fn size(&self) -> usize {
            // Match the inherent `ArenaBuffer::size`: report this buffer's
            // extent, not the whole backing storage's size.
            self.requested_size
        }
    }

    impl<S: Storage> NixlDescriptor for ArenaBuffer<S>
    where
        S: NixlDescriptor,
    {
        /// Memory type is inherited from the backing storage.
        fn mem_type(&self) -> MemType {
            NixlDescriptor::mem_type(self.storage.as_ref())
        }

        /// Device id is inherited from the backing storage.
        fn device_id(&self) -> u64 {
            NixlDescriptor::device_id(self.storage.as_ref())
        }
    }
}

impl<S: Storage> Drop for ArenaBuffer<S> {
    /// Returns the buffer's pages to the owning allocator.
    fn drop(&mut self) {
        // Recover from mutex poisoning instead of unwrapping: panicking in
        // `drop` while another panic is already unwinding aborts the whole
        // process. Freeing the pages is still the right action on a poisoned
        // lock, so take the inner guard and proceed.
        let mut allocator = self
            .allocator
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        allocator.free(self.allocation);
    }
}

#[cfg(test)]
mod tests {
    use assert_matches::assert_matches;

    use super::*;
    use crate::block_manager::storage::SystemStorage;

    const PAGE_SIZE: usize = 4096;
    const PAGE_COUNT: usize = 10;
    const TOTAL_STORAGE_SIZE: usize = PAGE_SIZE * PAGE_COUNT;

    /// Builds a `PAGE_COUNT`-page allocator over fresh system storage.
    fn create_allocator() -> ArenaAllocator<SystemStorage> {
        let storage = SystemStorage::new(TOTAL_STORAGE_SIZE).unwrap();
        ArenaAllocator::new(storage, PAGE_SIZE).unwrap()
    }

    /// Creating an allocator with a power-of-two page size succeeds.
    #[test]
    fn test_arena_allocator_new_success() {
        let storage = SystemStorage::new(TOTAL_STORAGE_SIZE).unwrap();
        assert!(ArenaAllocator::new(storage, PAGE_SIZE).is_ok());
    }

    /// A non-power-of-two page size is rejected with `PageSizeNotAligned`.
    #[test]
    fn test_arena_allocator_new_invalid_page_size() {
        let storage = SystemStorage::new(TOTAL_STORAGE_SIZE).unwrap();
        assert_matches!(
            ArenaAllocator::new(storage, PAGE_SIZE + 1),
            Err(ArenaError::PageSizeNotAligned)
        );
    }

    /// A single page-multiple allocation succeeds, reports the requested
    /// size, and starts at the beginning of the storage region.
    #[test]
    fn test_allocate_single_buffer() {
        let allocator = create_allocator();
        let wanted = PAGE_SIZE * 2;
        let buffer = allocator.allocate(wanted).unwrap();
        assert_eq!(buffer.size(), wanted);
        // First allocation begins at the storage base address.
        assert_eq!(buffer.address(), allocator.storage.addr());
    }

    /// Two page-multiple allocations succeed and are laid out back to back.
    #[test]
    fn test_allocate_multiple_buffers() {
        let allocator = create_allocator();
        let base = allocator.storage.addr();

        let first_size = PAGE_SIZE * 2;
        let first = allocator.allocate(first_size).unwrap();
        assert_eq!(first.size(), first_size);
        assert_eq!(first.address(), base);

        let second_size = PAGE_SIZE * 3;
        let second = allocator.allocate(second_size).unwrap();
        assert_eq!(second.size(), second_size);
        // Second buffer is offset by exactly the first buffer's pages.
        assert_eq!(second.address(), base + first_size as u64);
    }

    /// Allocating exactly the full storage size succeeds.
    #[test]
    fn test_allocate_exact_size() {
        let allocator = create_allocator();
        let buffer = allocator.allocate(TOTAL_STORAGE_SIZE).unwrap();
        assert_eq!(buffer.size(), TOTAL_STORAGE_SIZE);
    }

    /// Requesting more than the total storage fails with `AllocationFailed`.
    #[test]
    fn test_allocate_too_large() {
        let allocator = create_allocator();
        assert_matches!(
            allocator.allocate(TOTAL_STORAGE_SIZE + PAGE_SIZE),
            Err(ArenaError::AllocationFailed)
        );
    }

    /// Dropping a buffer returns its pages; the same region can then be
    /// allocated again, starting back at the storage base address.
    #[test]
    fn test_buffer_drop_and_reallocate() {
        let allocator = create_allocator();
        // Two buffers of this size cannot coexist (12 pages > 10), so the
        // second allocation only succeeds if drop really frees the first.
        let buffer_size = PAGE_SIZE * 6;

        {
            let first = allocator.allocate(buffer_size).unwrap();
            assert_eq!(first.size(), buffer_size);
            assert_eq!(first.address(), allocator.storage.addr());
        } // `first` dropped here; its pages return to the allocator

        let second = allocator.allocate(buffer_size).unwrap();
        assert_eq!(second.size(), buffer_size);
        // Reuses the freed region at the start of storage.
        assert_eq!(second.address(), allocator.storage.addr());
    }

    /// Two half-size buffers fill the arena exactly; a further one-page
    /// request then fails with `AllocationFailed`.
    #[test]
    fn test_allocate_fill_and_fail() {
        let allocator = create_allocator();
        let half = TOTAL_STORAGE_SIZE / 2; // 5 pages each

        let first = allocator.allocate(half).unwrap();
        assert_eq!(first.size(), half);

        let second = allocator.allocate(half).unwrap();
        assert_eq!(second.size(), half);
        assert_eq!(second.address(), allocator.storage.addr() + half as u64);

        // The arena is now full; even one more page must fail.
        assert_matches!(
            allocator.allocate(PAGE_SIZE),
            Err(ArenaError::AllocationFailed)
        );
    }

    /// A one-byte allocation succeeds and reports a size of 1.
    /// Page consumption is covered behaviorally by the exhaustion tests.
    #[test]
    fn test_allocate_non_page_aligned_single_byte() {
        let allocator = create_allocator();
        let buffer = allocator.allocate(1).unwrap();
        assert_eq!(buffer.size(), 1);
    }

    /// Allocating one byte less than a full page succeeds with the exact size.
    #[test]
    fn test_allocate_non_page_aligned_almost_full_page() {
        let allocator = create_allocator();
        let buffer = allocator.allocate(PAGE_SIZE - 1).unwrap();
        assert_eq!(buffer.size(), PAGE_SIZE - 1);
    }

    /// Allocating one byte more than a full page succeeds with the exact
    /// size (internally consuming two pages, covered by exhaustion tests).
    #[test]
    fn test_allocate_non_page_aligned_just_over_one_page() {
        let allocator = create_allocator();
        let buffer = allocator.allocate(PAGE_SIZE + 1).unwrap();
        assert_eq!(buffer.size(), PAGE_SIZE + 1);
    }

    /// An allocation of half the arena plus one byte needs
    /// `PAGE_COUNT / 2 + 1` pages, so a second identical request must fail.
    #[test]
    fn test_allocate_half_plus_one_byte_twice_exhausts_arena() {
        let allocator = create_allocator();
        // For PAGE_COUNT = 10 this is 5 * PAGE_SIZE + 1 bytes, i.e. 6 pages.
        let allocation_size = (PAGE_COUNT / 2 * PAGE_SIZE) + 1;

        let buffer1_result = allocator.allocate(allocation_size);
        assert!(buffer1_result.is_ok(), "First allocation should succeed");
        let first = buffer1_result.unwrap();
        assert_eq!(first.size(), allocation_size);
        let pages_used = (allocation_size as u64).div_ceil(allocator.page_size);
        assert_eq!(pages_used, (PAGE_COUNT / 2 + 1) as u64);

        // Only PAGE_COUNT - 6 = 4 pages remain, but 6 are needed again.
        let buffer2_result = allocator.allocate(allocation_size);
        assert!(
            buffer2_result.is_err(),
            "Second allocation should fail due to insufficient pages"
        );
        assert_matches!(buffer2_result, Err(ArenaError::AllocationFailed));
    }

    /// Repeated `PAGE_SIZE + 1` allocations (two pages each after rounding)
    /// fill the arena; once full, even a one-byte request fails.
    #[test]
    fn test_fill_with_non_aligned_and_fail() {
        let allocator = create_allocator();
        let single_alloc_size = PAGE_SIZE + 1; // rounds up to 2 pages
        let rounds = PAGE_COUNT / 2; // e.g. 10 / 2 = 5 allocations

        let mut held = Vec::with_capacity(rounds);
        for i in 0..rounds {
            let buffer_result = allocator.allocate(single_alloc_size);
            assert!(buffer_result.is_ok(), "Allocation {} should succeed", i + 1);
            let buffer = buffer_result.unwrap();
            assert_eq!(buffer.size(), single_alloc_size);
            held.push(buffer);
        }

        // All rounds * 2 pages are now consumed; nothing is left.
        let final_alloc_result = allocator.allocate(1);
        assert!(
            final_alloc_result.is_err(),
            "Final allocation of 1 byte should fail as arena is full"
        );
        assert_matches!(final_alloc_result, Err(ArenaError::AllocationFailed));
    }
}