//! `rts_alloc/allocator.rs` — shared-memory slab allocator handles.
1use crate::free_stack::FreeStack;
2use crate::global_free_list::GlobalFreeList;
3use crate::header::{self, WorkerLocalListHeads, WorkerLocalListPartialFullHeads};
4use crate::linked_list_node::LinkedListNode;
5use crate::remote_free_list::RemoteFreeList;
6use crate::size_classes::{size_class, NUM_SIZE_CLASSES};
7use crate::slab_meta::SlabMeta;
8use crate::sync::Ordering;
9use crate::worker_local_list::WorkerLocalList;
10use crate::{error::Error, header::Header, size_classes::size_class_index};
11use core::mem::offset_of;
12use core::ptr::NonNull;
13use std::fs::File;
14use std::sync::Arc;
15
/// A full allocator handle bound to a claimed worker slot.
///
/// Can allocate and free; the claimed worker slot is released on drop.
pub struct Allocator {
    base: AllocatorBase,
    // Index of the worker slot this handle claimed; selects the
    // worker-local lists and per-worker metadata.
    worker_index: u32,
}
20
/// An allocator handle that can only free, never allocate.
///
/// Does not claim a worker slot; frees are pushed onto the owning slab's
/// remote free list instead of worker-local lists.
pub struct FreeOnlyAllocator {
    base: AllocatorBase,
}
24
/// Owner of the file-backed memory mapping; unmaps it on drop.
struct MappedRegion {
    // Pointer to the start of the mapping, which begins with the `Header`.
    header: NonNull<Header>,
    // Total size of the mapping in bytes (the size passed to `map_file`).
    file_size: usize,
}
29
impl Drop for MappedRegion {
    fn drop(&mut self) {
        // SAFETY: The mapped region was created by `map_file` and is valid until drop.
        // The unmap result is deliberately ignored: there is no meaningful
        // way to recover from a failed unmap inside a destructor.
        let _ = crate::memory_map::unmap_file(self.header.as_ptr().cast(), self.file_size);
    }
}
36
// SAFETY: `MappedRegion` holds an immutable pointer and size for a shared
// mapping. The backing memory is process-shared and thread-safe access is
// enforced by allocator logic and atomics in shared metadata; transferring or
// sharing this handle across threads does not violate aliasing or
// thread-safety guarantees.
unsafe impl Send for MappedRegion {}
// SAFETY: See rationale above for `Send`.
unsafe impl Sync for MappedRegion {}
45
/// Cheaply clonable handle to the shared mapping.
///
/// Cloning bumps the `Arc` refcount, so all handles created from the same
/// mapping re-use one `mmap` within the process.
#[derive(Clone)]
struct AllocatorBase {
    // The Arc keeps the mapping alive until the last handle is dropped.
    region: Arc<MappedRegion>,
}
50
51impl Allocator {
52    /// Create a new `Allocator` in the provided file with the given parameters.
53    /// `min_workers` is the minimum number of workers to support.
54    ///
55    /// # Safety
56    /// - `create` must only be called once for a given file. Subsequent calls
57    ///   with the same file must use `join`.
58    pub unsafe fn create(
59        file: &File,
60        file_size: usize,
61        min_workers: u32,
62        slab_size: u32,
63    ) -> Result<Self, Error> {
64        let header = crate::init::create(file, file_size, min_workers, slab_size)?;
65        // SAFETY:
66        // - `header` and `file_size` are trusted arguments from the above create call.
67        let base = unsafe { AllocatorBase::from_mapping(header, file_size) };
68        // SAFETY: `base.header()` points to a valid, initialized header.
69        let worker_index = match unsafe { claim_any_worker_index(base.header()) } {
70            Some(worker_index) => worker_index,
71            None => return Err(Error::NoAvailableWorkers),
72        };
73
74        Allocator::new(base, worker_index)
75    }
76
77    /// Join an existing allocator in the provided file.
78    /// Picks the first available worker slot.
79    ///
80    /// # Note
81    ///
82    /// Prefer [`Self::join_from_existing`] to re-use `mmap`s within the same
83    /// process.
84    pub fn join(file: &File) -> Result<Self, Error> {
85        let (header, file_size) = crate::init::join(file)?;
86        // SAFETY:
87        // - `header` and `file_size` are trusted arguments from the above join call.
88        let base = unsafe { AllocatorBase::from_mapping(header, file_size) };
89        // SAFETY: `base.header()` points to a valid, initialized header.
90        let worker_index = match unsafe { claim_any_worker_index(base.header()) } {
91            Some(worker_index) => worker_index,
92            None => return Err(Error::NoAvailableWorkers),
93        };
94
95        Allocator::new(base, worker_index)
96    }
97
98    /// Join an existing allocator using the same in-process mapping.
99    /// Picks the first available worker slot.
100    pub fn join_from_existing(existing: &Allocator) -> Result<Self, Error> {
101        Self::join_from_base(&existing.base)
102    }
103
104    /// Join an existing free-only allocator using the same in-process mapping.
105    /// Picks the first available worker slot.
106    pub fn join_from_existing_free_only(existing: &FreeOnlyAllocator) -> Result<Self, Error> {
107        Self::join_from_base(&existing.base)
108    }
109
110    /// Join using a shared [`AllocatorBase`].
111    /// Picks the first available worker slot.
112    fn join_from_base(base: &AllocatorBase) -> Result<Self, Error> {
113        // SAFETY: `base.header()` points to a valid, initialized header.
114        let worker_index = match unsafe { claim_any_worker_index(base.header()) } {
115            Some(worker_index) => worker_index,
116            None => return Err(Error::NoAvailableWorkers),
117        };
118        Allocator::new(base.clone(), worker_index)
119    }
120
121    /// Creates a new `Allocator` for the given worker index.
122    fn new(base: AllocatorBase, worker_index: u32) -> Result<Self, Error> {
123        // SAFETY: The header is assumed to be valid and initialized.
124        if worker_index >= unsafe { base.header().as_ref() }.num_workers {
125            return Err(Error::InvalidWorkerIndex);
126        }
127        Ok(Allocator { base, worker_index })
128    }
129}
130
// SAFETY: `Allocator` exclusively owns the worker slot it claimed (the slot
// is CAS-claimed and released on drop), and the mapping it references is
// `Send + Sync` via `MappedRegion`; moving the handle to another thread does
// not introduce shared mutable access.
unsafe impl Send for Allocator {}
// SAFETY: `FreeOnlyAllocator` holds no worker slot and only interacts with
// shared metadata through atomics; see rationale on `MappedRegion`.
unsafe impl Send for FreeOnlyAllocator {}
133
impl Drop for Allocator {
    fn drop(&mut self) {
        // Release the claimed worker slot so a future `join` can claim it.
        self.release_worker();
    }
}
139
impl FreeOnlyAllocator {
    /// Join an existing allocator in the provided file.
    ///
    /// # Note
    ///
    /// Prefer [`Self::join_from_existing`] to re-use `mmap`s within the same
    /// process.
    pub fn join(file: &File) -> Result<Self, Error> {
        let (header, file_size) = crate::init::join(file)?;
        // SAFETY:
        // - `header` and `file_size` are trusted arguments from the above join call.
        Ok(FreeOnlyAllocator {
            base: unsafe { AllocatorBase::from_mapping(header, file_size) },
        })
    }

    /// Join an existing allocator using the same in-process mapping.
    pub fn join_from_existing(existing: &Allocator) -> Self {
        Self::from_base(&existing.base)
    }

    /// Join an existing free-only allocator using the same in-process mapping.
    pub fn join_from_existing_free_only(existing: &FreeOnlyAllocator) -> Self {
        Self::from_base(&existing.base)
    }

    /// Share the existing mapping; no worker slot is claimed, so this is
    /// infallible (unlike `Allocator::join_from_base`).
    fn from_base(base: &AllocatorBase) -> Self {
        Self { base: base.clone() }
    }
}
170
impl Allocator {
    /// Release this handle's worker slot so another handle can claim it.
    fn release_worker(&self) {
        self.worker_meta().claimed.store(0, Ordering::Release);
    }

    /// Allocates a block of memory of the given size.
    /// If the size is larger than the maximum size class, returns `None`.
    /// If the allocation fails, returns `None`.
    pub fn allocate(&self, size: u32) -> Option<NonNull<u8>> {
        let size_index = size_class_index(size)?;

        // SAFETY: `size_index` is guaranteed to be valid by `size_class_index`.
        let slab_index = unsafe { self.find_allocatable_slab_index(size_index) }?;
        // SAFETY:
        // - `slab_index` is guaranteed to be valid by `find_allocatable_slab_index`.
        // - `size_index` is guaranteed to be valid by `size_class_index`.
        unsafe { self.allocate_within_slab(slab_index, size_index) }
    }

    /// Try to find a suitable slab for allocation.
    /// If a partial slab assigned to the worker is not found, then try to find
    /// a slab from the global free list.
    ///
    /// # Safety
    /// - The `size_index` must be a valid index for the size classes.
    unsafe fn find_allocatable_slab_index(&self, size_index: usize) -> Option<u32> {
        // SAFETY: `size_index` is guaranteed to be valid by the caller.
        unsafe { self.worker_local_list_partial(size_index) }
            .head()
            .or_else(|| self.take_slab(size_index))
    }

    /// Attempt to allocate memory within a slab.
    /// If the slab is full or the allocation otherwise fails, returns `None`.
    ///
    /// # Safety
    /// - The `slab_index` must be a valid index for the slabs
    /// - The `size_index` must be a valid index for the size classes.
    unsafe fn allocate_within_slab(
        &self,
        slab_index: u32,
        size_index: usize,
    ) -> Option<NonNull<u8>> {
        // SAFETY: The slab index is guaranteed to be valid by the caller.
        let mut free_stack = unsafe { self.slab_free_stack(slab_index) };
        let maybe_index_within_slab = free_stack.pop();

        // If the slab is empty - remove it from the worker's partial list,
        // and move it to the worker's full list.
        // (An empty *free stack* means the slab has no free blocks left.)
        if free_stack.is_empty() {
            // SAFETY:
            // - The `slab_index` is guaranteed to be valid by the caller.
            // - The `size_index` is guaranteed to be valid by the caller.
            unsafe {
                self.worker_local_list_partial(size_index)
                    .remove(slab_index);
            }
            // SAFETY:
            // - The `slab_index` is guaranteed to be valid by the caller.
            // - The `size_index` is guaranteed to be valid by the caller.
            unsafe {
                self.worker_local_list_full(size_index).push(slab_index);
            }
        }

        maybe_index_within_slab.map(|index_within_slab| {
            // SAFETY: The `slab_index` is guaranteed to be valid by the caller.
            let slab = unsafe { self.slab(slab_index) };
            // SAFETY: The `size_index` is guaranteed to be valid by the caller.
            let size = unsafe { size_class(size_index) };
            // Account for the new allocation before handing out the pointer.
            self.worker_meta()
                .outstanding_allocation_bytes
                .fetch_add(size as u64, Ordering::Relaxed);
            slab.byte_add(index_within_slab as usize * size as usize)
        })
    }

    /// Attempt to take a slab from the global free list.
    /// If the global free list is empty, returns `None`.
    /// If the slab is successfully taken, it will be marked as assigned to the worker.
    ///
    /// # Safety
    /// - The `size_index` must be a valid index for the size classes.
    unsafe fn take_slab(&self, size_index: usize) -> Option<u32> {
        let slab_index = self.global_free_list().pop()?;

        // SAFETY: The slab index is guaranteed to be valid by `pop`.
        unsafe { self.slab_meta(slab_index).as_ref() }.assign(self.worker_index, size_index);
        // SAFETY:
        // - The slab index is guaranteed to be valid by `pop`.
        // - The size index is guaranteed to be valid by the caller.
        unsafe {
            let slab_capacity = self.base.header().as_ref().slab_size / size_class(size_index);
            self.slab_free_stack(slab_index).reset(slab_capacity as u16);
        };
        // SAFETY: The size index is guaranteed to be valid by caller.
        let mut worker_local_list = unsafe { self.worker_local_list_partial(size_index) };
        // SAFETY: The slab index is guaranteed to be valid by `pop`.
        unsafe { worker_local_list.push(slab_index) };
        Some(slab_index)
    }
}
273
impl Allocator {
    /// Free a block of memory previously allocated by this allocator.
    ///
    /// # Safety
    /// - The `ptr` must be a valid pointer to a block of memory allocated by this allocator.
    /// - The `ptr` must not have been freed before.
    pub unsafe fn free(&self, ptr: NonNull<u8>) {
        // SAFETY: The pointer is assumed to be valid and allocated by this allocator.
        let offset = unsafe { self.offset(ptr) };
        self.free_offset(offset);
    }

    /// Free a block of memory previously allocated by this allocator.
    ///
    /// # Safety
    /// - The `offset` must be a valid offset within the memory allocated by this allocator.
    /// - The `offset` must not have been freed before.
    pub unsafe fn free_offset(&self, offset: usize) {
        let allocation_indexes = self.find_allocation_indexes(offset);

        // Check if the slab is assigned to this worker.
        // SAFETY: The slab index was derived from a valid offset per the
        // caller's contract.
        if self.worker_index
            == unsafe { self.slab_meta(allocation_indexes.slab_index).as_ref() }
                .assigned_worker
                .load(Ordering::Acquire)
        {
            // SAFETY: The allocation indexes are valid and come from allocator-owned memory.
            let (size_index, size) = unsafe { self.slab_size_class(allocation_indexes.slab_index) };
            self.worker_meta()
                .outstanding_allocation_bytes
                .fetch_sub(size as u64, Ordering::Relaxed);
            self.local_free_with_size_index(allocation_indexes, size_index);
        } else {
            // Slab owned by another worker: defer to its remote free list.
            self.remote_free(allocation_indexes);
        }
    }

    /// Return a block to a slab owned by this worker, moving the slab between
    /// the full/partial worker lists and the global free list as needed.
    fn local_free_with_size_index(&self, allocation_indexes: AllocationIndexes, size_index: usize) {
        // SAFETY: The allocation indexes are guaranteed to be valid by the caller.
        let (was_full, is_empty) = unsafe {
            let mut free_stack = self.slab_free_stack(allocation_indexes.slab_index);
            let was_full = free_stack.is_empty();
            free_stack.push(allocation_indexes.index_within_slab);
            // Names confusing:
            // - When the **free** stack is empty, the slab is full of allocations.
            // - When the **free** stack is full, the slab has no allocations available.
            (was_full, free_stack.is_full())
        };

        match (was_full, is_empty) {
            (true, true) => {
                // The slab was full and is now empty - this cannot happen unless the slab
                // size is equal to the size class.
                unreachable!("slab can only contain one allocation - this is not allowed");
            }
            (true, false) => {
                // The slab was full and is now partially full. It must be moved
                // from the worker's full list to the worker's partial list.
                // SAFETY: The allocation indexes are guaranteed to be valid by the caller.
                unsafe {
                    self.worker_local_list_full(size_index)
                        .remove(allocation_indexes.slab_index);
                }
                // SAFETY: The allocation indexes are guaranteed to be valid by the caller.
                unsafe {
                    self.worker_local_list_partial(size_index)
                        .push(allocation_indexes.slab_index);
                }
            }
            (false, true) => {
                // The slab was partially full and is now empty.
                // It must be moved from the worker's partial list to the global free list.
                // SAFETY: The allocation indexes are guaranteed to be valid by the caller.
                unsafe {
                    self.worker_local_list_partial(size_index)
                        .remove(allocation_indexes.slab_index);
                }
                // SAFETY: The allocation indexes are guaranteed to be valid by the caller.
                unsafe {
                    self.global_free_list().push(allocation_indexes.slab_index);
                }
            }
            (false, false) => {
                // The slab was partially full and is still partially full.
                // No action is needed, just return.
            }
        }
    }

    /// Push the freed block onto the owning slab's remote free list; the
    /// owning worker reclaims it later via `clean_remote_free_lists`.
    fn remote_free(&self, allocation_indexes: AllocationIndexes) {
        // SAFETY: The allocation indexes are guaranteed to be valid by the caller.
        unsafe {
            self.base
                .remote_free_list(allocation_indexes.slab_index)
                .push(allocation_indexes.index_within_slab);
        }
    }

    /// Find the offset given a pointer.
    ///
    /// # Safety
    /// - The `ptr` must be a valid pointer in the allocator's address space.
    pub unsafe fn offset(&self, ptr: NonNull<u8>) -> usize {
        self.base.offset(ptr)
    }

    /// Return a ptr given a shareable offset - calculated by `offset`.
    ///
    /// # Safety
    ///
    /// - Caller must ensure the offset is valid for this allocator.
    pub unsafe fn ptr_from_offset(&self, offset: usize) -> NonNull<u8> {
        self.base.ptr_from_offset(offset)
    }

    /// Find the slab index and index within the slab for a given offset.
    fn find_allocation_indexes(&self, offset: usize) -> AllocationIndexes {
        self.base.find_allocation_indexes(offset)
    }
}
394
impl FreeOnlyAllocator {
    /// Free a block of memory previously allocated by this allocator.
    ///
    /// # Safety
    /// - The `ptr` must be a valid pointer to a block of memory allocated by this allocator.
    /// - The `ptr` must not have been freed before.
    pub unsafe fn free(&self, ptr: NonNull<u8>) {
        // SAFETY: The pointer is assumed to be valid and allocated by this allocator.
        let offset = unsafe { self.offset(ptr) };
        self.free_offset(offset);
    }

    /// Free a block of memory previously allocated by this allocator.
    ///
    /// Since this handle owns no worker slot, the block is always pushed onto
    /// the slab's remote free list (never freed locally).
    ///
    /// # Safety
    /// - The `offset` must be a valid offset within the memory allocated by this allocator.
    /// - The `offset` must not have been freed before.
    pub unsafe fn free_offset(&self, offset: usize) {
        let allocation_indexes = self.find_allocation_indexes(offset);
        // SAFETY: The allocation indexes are guaranteed to be valid by the caller.
        unsafe {
            self.base
                .remote_free_list(allocation_indexes.slab_index)
                .push(allocation_indexes.index_within_slab);
        }
    }

    /// Find the offset given a pointer.
    ///
    /// # Safety
    /// - The `ptr` must be a valid pointer in the allocator's address space.
    pub unsafe fn offset(&self, ptr: NonNull<u8>) -> usize {
        self.base.offset(ptr)
    }

    /// Return a ptr given a shareable offset - calculated by `offset`.
    ///
    /// # Safety
    ///
    /// - Caller must ensure the offset is valid for this allocator.
    pub unsafe fn ptr_from_offset(&self, offset: usize) -> NonNull<u8> {
        self.base.ptr_from_offset(offset)
    }

    /// Find the slab index and index within the slab for a given offset.
    fn find_allocation_indexes(&self, offset: usize) -> AllocationIndexes {
        self.base.find_allocation_indexes(offset)
    }
}
444
impl AllocatorBase {
    /// # Safety
    /// - `header` must be a valid pointer to an initialized mapping of `file_size` bytes.
    /// - `file_size` must be the size of the mapping.
    unsafe fn from_mapping(header: NonNull<Header>, file_size: usize) -> Self {
        Self {
            region: Arc::new(MappedRegion { header, file_size }),
        }
    }

    /// Pointer to the header at the start of the mapping.
    #[inline]
    fn header(&self) -> NonNull<Header> {
        self.region.header
    }

    /// Find the offset given a pointer.
    ///
    /// # Safety
    /// - The `ptr` must be a valid pointer in the allocator's address space.
    unsafe fn offset(&self, ptr: NonNull<u8>) -> usize {
        ptr.byte_offset_from(self.header()) as usize
    }

    /// Return a ptr given a shareable offset - calculated by `offset`.
    ///
    /// # Safety
    ///
    /// - Caller must ensure the offset is valid for this allocator.
    unsafe fn ptr_from_offset(&self, offset: usize) -> NonNull<u8> {
        unsafe { self.header().byte_add(offset) }.cast()
    }

    /// Find the slab index and index within the slab for a given offset.
    fn find_allocation_indexes(&self, offset: usize) -> AllocationIndexes {
        let (slab_index, offset_within_slab) = {
            // SAFETY: The header is assumed to be valid and initialized.
            let header = unsafe { self.header().as_ref() };
            debug_assert!(offset >= header.slabs_offset as usize);
            let offset_from_slab_start = offset.wrapping_sub(header.slabs_offset as usize);
            let slab_index = (offset_from_slab_start / header.slab_size as usize) as u32;
            debug_assert!(slab_index < header.num_slabs, "slab index out of bounds");

            // SAFETY: The slab size is guaranteed to be a power of 2, for a valid header.
            let offset_within_slab =
                unsafe { Self::offset_within_slab(header.slab_size, offset_from_slab_start) };

            (slab_index, offset_within_slab)
        };

        let index_within_slab = {
            // SAFETY: The slab index is guaranteed to be valid by the above calculations.
            let size_class_index = unsafe { self.slab_meta(slab_index).as_ref() }
                .size_class_index
                .load(Ordering::Acquire);
            // SAFETY: The size class index is guaranteed to be valid by valid slab meta.
            let size_class = unsafe { size_class(size_class_index) };
            (offset_within_slab / size_class) as u16
        };

        AllocationIndexes {
            slab_index,
            index_within_slab,
        }
    }

    /// Return offset within a slab.
    ///
    /// # Safety
    /// - The `slab_size` must be a power of 2.
    const unsafe fn offset_within_slab(slab_size: u32, offset_from_slab_start: usize) -> u32 {
        debug_assert!(slab_size.is_power_of_two());
        // Power-of-two slab size lets us mask instead of taking a modulo.
        (offset_from_slab_start & (slab_size as usize - 1)) as u32
    }

    /// Returns an instance of `RemoteFreeList` for the given slab.
    ///
    /// # Safety
    /// - `slab_index` must be a valid slab index.
    unsafe fn remote_free_list<'a>(&'a self, slab_index: u32) -> RemoteFreeList<'a> {
        let (head, slab_item_size) = {
            // SAFETY: The slab index is guaranteed to be valid by the caller.
            let slab_meta = unsafe { self.slab_meta(slab_index).as_ref() };
            // SAFETY: The slab meta is guaranteed to be valid by the caller.
            let size_class =
                unsafe { size_class(slab_meta.size_class_index.load(Ordering::Acquire)) };
            (&slab_meta.remote_free_stack_head, size_class)
        };
        // SAFETY: The slab index is guaranteed to be valid by the caller.
        let slab = unsafe { self.slab(slab_index) };

        // SAFETY:
        // - `slab_item_size` must be a valid size AND currently assigned to the slab.
        // - `head` must be a valid reference to a `CacheAlignedU16`
        //   that is the head of the remote free list.
        // - `slab` must be a valid pointer to the beginning of the slab.
        unsafe { RemoteFreeList::new(slab_item_size, head, slab) }
    }

    /// Returns a pointer to the slab meta for the given slab index.
    ///
    /// # Safety
    /// - The `slab_index` must be a valid index for the slabs.
    unsafe fn slab_meta(&self, slab_index: u32) -> NonNull<SlabMeta> {
        // SAFETY: The header is assumed to be valid and initialized.
        let offset = unsafe { self.header().as_ref() }.slab_shared_meta_offset;
        // SAFETY: The header is guaranteed to be valid and initialized.
        let slab_metas = unsafe { self.header().byte_add(offset as usize).cast::<SlabMeta>() };
        // SAFETY: The `slab_index` is guaranteed to be valid by the caller.
        unsafe { slab_metas.add(slab_index as usize) }
    }

    /// Return a pointer to a slab.
    ///
    /// # Safety
    /// - The `slab_index` must be a valid index for the slabs.
    unsafe fn slab(&self, slab_index: u32) -> NonNull<u8> {
        let (slab_size, offset) = {
            // SAFETY: The header is assumed to be valid and initialized.
            let header = unsafe { self.header().as_ref() };
            (header.slab_size, header.slabs_offset)
        };
        // SAFETY: The header is guaranteed to be valid and initialized.
        // The slabs are laid out sequentially after the free stacks.
        unsafe {
            self.header()
                .byte_add(offset as usize)
                .byte_add(slab_index as usize * slab_size as usize)
                .cast()
        }
    }

    /// Returns a pointer to the linked-list node array backing the free lists.
    fn free_list_elements(&self) -> NonNull<LinkedListNode> {
        // SAFETY: The header is assumed to be valid and initialized.
        let offset = unsafe { self.header().as_ref() }.free_list_elements_offset;
        // SAFETY: The header is guaranteed to be valid and initialized.
        unsafe { self.header().byte_add(offset as usize) }.cast()
    }
}
583
impl Allocator {
    /// Total bytes currently allocated (and not yet freed) by this worker.
    pub fn outstanding_allocation_bytes(&self) -> u64 {
        self.worker_meta()
            .outstanding_allocation_bytes
            .load(Ordering::Relaxed)
    }

    /// Frees all items in remote free lists.
    pub fn clean_remote_free_lists(&self) {
        // Do partial slabs before full slabs, because the act of freeing within
        // the full slabs may move them to partial slabs list, which would lead
        // to us double-iterating.
        for size_index in 0..NUM_SIZE_CLASSES {
            // SAFETY: The size index is guaranteed to be valid by the loop.
            let worker_local_list = unsafe { self.worker_local_list_partial(size_index) };
            self.clean_remote_free_lists_for_list(worker_local_list);

            // SAFETY: The size index is guaranteed to be valid by the loop.
            let worker_local_list = unsafe { self.worker_local_list_full(size_index) };
            self.clean_remote_free_lists_for_list(worker_local_list);
        }
    }

    /// Frees all items in the remote free list for the given worker local list.
    fn clean_remote_free_lists_for_list(&self, worker_local_list: WorkerLocalList) {
        for slab_index in worker_local_list.iterate() {
            // SAFETY: Slab indices come from worker-local lists and are valid.
            let (size_index, size) = unsafe { self.slab_size_class(slab_index) };
            let mut drained_items = 0u64;
            // SAFETY: The slab index is guaranteed to be valid by the iterator.
            let remote_free_list = unsafe { self.remote_free_list(slab_index) };
            for index_within_slab in remote_free_list.drain() {
                self.local_free_with_size_index(
                    AllocationIndexes {
                        slab_index,
                        index_within_slab,
                    },
                    size_index,
                );
                drained_items += 1;
            }
            // Adjust the outstanding-bytes counter once per slab rather than
            // per drained item.
            if drained_items != 0 {
                self.worker_meta()
                    .outstanding_allocation_bytes
                    .fetch_sub(drained_items * size as u64, Ordering::Relaxed);
            }
        }
    }
}
633
634impl Allocator {
635    /// Returns a pointer to the free list elements in allocator.
636    fn free_list_elements(&self) -> NonNull<LinkedListNode> {
637        self.base.free_list_elements()
638    }
639
640    /// Returns a `GlobalFreeList` to interact with the global free list.
641    fn global_free_list<'a>(&'a self) -> GlobalFreeList<'a> {
642        // SAFETY: The header is assumed to be valid and initialized.
643        let head = &unsafe { self.base.header().as_ref() }.global_free_list_head;
644        let list = self.free_list_elements();
645        // SAFETY:
646        // - `head` is a valid reference to the global free list head.
647        // - `list` is guaranteed to be valid wtih sufficient capacity.
648        unsafe { GlobalFreeList::new(head, list) }
649    }
650
651    /// Returns a `WorkerLocalList` for the current worker to interact with its
652    /// local free list of partially full slabs.
653    ///
654    /// # Safety
655    /// - The `size_index` must be a valid index for the size classes.
656    unsafe fn worker_local_list_partial<'a>(&'a self, size_index: usize) -> WorkerLocalList<'a> {
657        let head = &self.worker_head(size_index).partial;
658        let list = self.free_list_elements();
659
660        // SAFETY:
661        // - `head` is a valid reference to the worker's local list head.
662        // - `list` is guaranteed to be valid with sufficient capacity.
663        unsafe { WorkerLocalList::new(head, list) }
664    }
665
666    /// Returns a `WorkerLocalList` for the current worker to interact with its
667    /// local free list of full slabs.
668    ///
669    /// # Safety
670    /// - The `size_index` must be a valid index for the size classes.
671    unsafe fn worker_local_list_full<'a>(&'a self, size_index: usize) -> WorkerLocalList<'a> {
672        let head = &self.worker_head(size_index).full;
673        let list = self.free_list_elements();
674
675        // SAFETY:
676        // - `head` is a valid reference to the worker's local list head.
677        // - `list` is guaranteed to be valid with sufficient capacity.
678        unsafe { WorkerLocalList::new(head, list) }
679    }
680
681    fn worker_meta(&self) -> &WorkerLocalListHeads {
682        // SAFETY: The worker index is guaranteed to be valid by the constructor.
683        unsafe { worker_meta_ptr(self.base.header(), self.worker_index).as_ref() }
684    }
685
686    fn worker_head(&self, size_index: usize) -> &WorkerLocalListPartialFullHeads {
687        // SAFETY: The size index is guaranteed to be valid by the caller.
688        &self.worker_meta().heads[size_index]
689    }
690
691    /// Returns the slab's assigned size class index and class size in bytes.
692    ///
693    /// # Safety
694    /// - `slab_index` must be a valid slab index.
695    unsafe fn slab_size_class(&self, slab_index: u32) -> (usize, u32) {
696        let size_index = unsafe { self.slab_meta(slab_index).as_ref() }
697            .size_class_index
698            .load(Ordering::Relaxed);
699        // SAFETY: The slab meta stores a valid size class index while assigned.
700        let size = unsafe { size_class(size_index) };
701        (size_index, size)
702    }
703
704    /// Returns an instance of `RemoteFreeList` for the given slab.
705    ///
706    /// # Safety
707    /// - `slab_index` must be a valid slab index.
708    unsafe fn remote_free_list<'a>(&'a self, slab_index: u32) -> RemoteFreeList<'a> {
709        self.base.remote_free_list(slab_index)
710    }
711
712    /// Returns a pointer to the slab meta for the given slab index.
713    ///
714    /// # Safety
715    /// - The `slab_index` must be a valid index for the slabs.
716    unsafe fn slab_meta(&self, slab_index: u32) -> NonNull<SlabMeta> {
717        self.base.slab_meta(slab_index)
718    }
719
720    /// Return a mutable reference to a free stack for the given slab index.
721    ///
722    /// # Safety
723    /// - The `slab_index` must be a valid index for the slabs.
724    unsafe fn slab_free_stack<'a>(&'a self, slab_index: u32) -> FreeStack<'a> {
725        let (slab_size, offset) = {
726            // SAFETY: The header is assumed to be valid and initialized.
727            let header = unsafe { self.base.header().as_ref() };
728            (header.slab_size, header.slab_free_stacks_offset)
729        };
730        let free_stack_size = header::layout::single_free_stack_size(slab_size);
731
732        // SAFETY: The `FreeStack` layout is guaranteed to have enough room
733        // for top, capacity, and the trailing stack.
734        let mut top = unsafe {
735            self.base
736                .header()
737                .byte_add(offset as usize)
738                .byte_add(slab_index as usize * free_stack_size)
739                .cast()
740        };
741        let mut capacity = unsafe { top.add(1) };
742        let trailing_stack = unsafe { capacity.add(1) };
743        unsafe { FreeStack::new(top.as_mut(), capacity.as_mut(), trailing_stack) }
744    }
745
    /// Return a pointer to a slab.
    ///
    /// Thin wrapper over `AllocatorBase::slab`.
    ///
    /// # Safety
    /// - The `slab_index` must be a valid index for the slabs.
    unsafe fn slab(&self, slab_index: u32) -> NonNull<u8> {
        self.base.slab(slab_index)
    }
753}
754
/// Returns a pointer to the `WorkerLocalListHeads` entry for `worker_index`
/// inside the shared header.
///
/// # Safety
/// - `header` must point to a valid, initialized `Header`.
/// - `worker_index` must be in range for the header's worker heads array.
unsafe fn worker_meta_ptr(
    header: NonNull<Header>,
    worker_index: u32,
) -> NonNull<WorkerLocalListHeads> {
    // Locate the start of the per-worker heads array inside the header.
    let all_workers_heads = unsafe {
        header
            .byte_add(offset_of!(Header, worker_local_list_heads))
            .cast::<WorkerLocalListHeads>()
    };
    // SAFETY: The caller guarantees the worker index is in range.
    unsafe { all_workers_heads.add(worker_index as usize) }
}
767
768unsafe fn claim_any_worker_index(header: NonNull<Header>) -> Option<u32> {
769    let num_workers = unsafe { header.as_ref() }.num_workers;
770    for worker_index in 0..num_workers {
771        let claimed = unsafe { &worker_meta_ptr(header, worker_index).as_ref().claimed };
772        if claimed
773            .compare_exchange(0, 1, Ordering::AcqRel, Ordering::Acquire)
774            .is_ok()
775        {
776            return Some(worker_index);
777        }
778    }
779    None
780}
781
/// Location of an allocation inside the shared region.
struct AllocationIndexes {
    /// Index of the slab that contains the allocation.
    slab_index: u32,
    /// Index of the allocation's slot within that slab.
    index_within_slab: u16,
}
786
#[cfg(test)]
mod tests {
    use super::*;
    use crate::size_classes::{MAX_SIZE, SIZE_CLASSES};

    /// Size of the temporary backing file used by every test allocator.
    const TEST_BUFFER_SIZE: usize = 64 * 1024 * 1024; // 64 MiB
793
    /// Creates a unique temporary file to back a test allocator mapping.
    ///
    /// On Unix the file is unlinked right after opening so the OS reclaims it
    /// when the last handle closes; on Windows `FILE_FLAG_DELETE_ON_CLOSE`
    /// serves the same purpose. NOTE(review): on other platforms the file is
    /// left behind in the temp dir — acceptable for tests, but worth knowing.
    fn create_temp_shmem_file() -> Result<File, Error> {
        use std::fs::OpenOptions;
        use std::sync::atomic::{AtomicU64, Ordering};

        // Per-process counter makes each test's file name unique; `create_new`
        // below still guards against cross-process collisions by failing.
        static COUNTER: AtomicU64 = AtomicU64::new(0);
        let temp_dir = std::env::temp_dir();
        let n = COUNTER.fetch_add(1, Ordering::Relaxed);
        let path = temp_dir.join(format!("rts-alloc-{n}.tmp"));

        let mut open_options = OpenOptions::new();
        open_options.read(true).write(true).create_new(true);

        #[cfg(windows)]
        {
            use std::os::windows::fs::OpenOptionsExt;
            use windows_sys::Win32::Storage::FileSystem::{
                FILE_ATTRIBUTE_TEMPORARY, FILE_FLAG_DELETE_ON_CLOSE,
            };

            // Ask the OS to delete the file when the last handle closes.
            open_options
                .attributes(FILE_ATTRIBUTE_TEMPORARY)
                .custom_flags(FILE_FLAG_DELETE_ON_CLOSE);
        }

        let open_result = open_options.open(&path);

        match open_result {
            Ok(file) => {
                #[cfg(unix)]
                {
                    // Unlink immediately; the open descriptor keeps it alive.
                    std::fs::remove_file(&path)?;
                }
                Ok(file)
            }
            Err(err) => Err(Error::IoError(err)),
        }
    }
831
832    fn initialize_for_test(slab_size: u32, num_workers: u32) -> (File, Allocator) {
833        let file = create_temp_shmem_file().unwrap();
834        // SAFETY: Test helper creates allocator from a fresh temp shared-memory file.
835        let allocator =
836            unsafe { Allocator::create(&file, TEST_BUFFER_SIZE, num_workers, slab_size).unwrap() };
837        (file, allocator)
838    }
839
    /// End-to-end smoke test: allocate around every size-class boundary,
    /// verify byte accounting, then free everything and verify the worker
    /// lists drain back to empty.
    #[test]
    fn test_allocator() {
        let slab_size = 65536; // 64 KiB
        let num_workers = 4;
        let (_file, allocator) = initialize_for_test(slab_size, num_workers);
        assert_eq!(allocator.outstanding_allocation_bytes(), 0);

        let mut allocations = vec![];
        let mut total_allocated_bytes = 0u64;

        // Allocate just below, at, and just above each class boundary (except
        // the largest class); each request is accounted at its rounded-up
        // class size.
        for class_size in SIZE_CLASSES[..NUM_SIZE_CLASSES - 1].iter() {
            for size in [class_size - 1, *class_size, class_size + 1] {
                allocations.push(allocator.allocate(size).unwrap());
                total_allocated_bytes += size_class_index(size)
                    .map(|i| unsafe { size_class(i) as u64 })
                    .unwrap();
            }
        }
        // Cover the largest class's boundary as well (no `+ 1` case: that
        // would exceed MAX_SIZE).
        for size in [MAX_SIZE - 1, MAX_SIZE] {
            allocations.push(allocator.allocate(size).unwrap());
            total_allocated_bytes += size_class_index(size)
                .map(|i| unsafe { size_class(i) as u64 })
                .unwrap();
        }
        assert_eq!(
            allocator.outstanding_allocation_bytes(),
            total_allocated_bytes
        );
        // Requests above the maximum size class must be rejected.
        assert!(allocator.allocate(MAX_SIZE + 1).is_none());

        // The worker should have local lists for all size classes.
        for size_index in 0..NUM_SIZE_CLASSES {
            // SAFETY: The size index is guaranteed to be valid by the loop.
            let worker_local_list = unsafe { allocator.worker_local_list_partial(size_index) };
            assert!(worker_local_list.head().is_some());
        }

        for ptr in allocations {
            // SAFETY: ptr is valid allocation from the allocator.
            unsafe {
                allocator.free(ptr);
            }
        }
        assert_eq!(allocator.outstanding_allocation_bytes(), 0);

        // The worker local lists should be empty after freeing.
        for size_index in 0..NUM_SIZE_CLASSES {
            // SAFETY: The size index is guaranteed to be valid by the loop.
            let worker_local_list = unsafe { allocator.worker_local_list_partial(size_index) };
            assert_eq!(worker_local_list.head(), None);
        }
    }
892
    /// Verifies that slabs move between the worker's partial list, full list,
    /// and (when emptied) back off the worker lists as allocations are made
    /// and freed.
    #[test]
    fn test_slab_list_transitions() {
        let slab_size = 65536; // 64 KiB
        let num_workers = 4;
        let (_file, allocator) = initialize_for_test(slab_size, num_workers);

        let allocation_size = 2048;
        let size_index = size_class_index(allocation_size).unwrap();
        let allocations_per_slab = slab_size / allocation_size;

        // Asserts whether the worker's partial and full lists for
        // `size_index` currently hold any slab.
        fn check_worker_list_expectations(
            allocator: &Allocator,
            size_index: usize,
            expect_partial: bool,
            expect_full: bool,
        ) {
            unsafe {
                let partial_list = allocator.worker_local_list_partial(size_index);
                assert_eq!(
                    partial_list.head().is_some(),
                    expect_partial,
                    "{:?}",
                    partial_list.head()
                );

                let full_list = allocator.worker_local_list_full(size_index);
                assert_eq!(
                    full_list.head().is_some(),
                    expect_full,
                    "{:?}",
                    full_list.head()
                );
            }
        }

        // The partial list and full list should begin empty.
        check_worker_list_expectations(&allocator, size_index, false, false);

        let mut first_slab_allocations = vec![];
        for _ in 0..allocations_per_slab - 1 {
            first_slab_allocations.push(allocator.allocate(allocation_size).unwrap());
        }

        // The first slab should be partially full and the full list empty.
        check_worker_list_expectations(&allocator, size_index, true, false);

        // Allocate one more to fill the slab.
        first_slab_allocations.push(allocator.allocate(allocation_size).unwrap());

        // The first slab should now be full and moved to the full list.
        check_worker_list_expectations(&allocator, size_index, false, true);

        // Allocating again will give a new slab, which will be partially full.
        let second_slab_allocation = allocator.allocate(allocation_size).unwrap();

        // The second slab should be partially full and the first slab in the full list.
        check_worker_list_expectations(&allocator, size_index, true, true);

        let mut first_slab_allocations = first_slab_allocations.drain(..);
        unsafe {
            allocator.free(first_slab_allocations.next().unwrap());
        }
        // Both slabs should be partially full, and none are full.
        check_worker_list_expectations(&allocator, size_index, true, false);

        // Free the first slab allocation.
        for ptr in first_slab_allocations {
            unsafe {
                allocator.free(ptr);
            }
        }
        // The first slab is now empty and should be moved to the global free list,
        // but the second slab is still partially full.
        check_worker_list_expectations(&allocator, size_index, true, false);

        // Free the second slab allocation.
        unsafe {
            allocator.free(second_slab_allocation);
        }
        // Both slabs should now be empty and moved to the global free list.
        check_worker_list_expectations(&allocator, size_index, false, false);
    }
975
976    #[test]
977    fn test_out_of_slabs() {
978        let slab_size = 65536; // 64 KiB
979        let num_workers = 4;
980        let (_file, allocator) = initialize_for_test(slab_size, num_workers);
981
982        let num_slabs = unsafe { allocator.base.header().as_ref() }.num_slabs;
983        for index in 0..num_slabs {
984            let slab_index = unsafe { allocator.take_slab(0) }.unwrap();
985            assert_eq!(slab_index, index);
986        }
987        // The next slab allocation should fail, as all slabs are taken.
988        assert!(unsafe { allocator.take_slab(0) }.is_none());
989    }
990
    /// Verifies that frees performed by another worker land on the owning
    /// slab's remote free list, and that cleaning remote lists makes the slab
    /// usable again by its owner.
    #[test]
    fn test_remote_free_lists() {
        let slab_size = 65536; // 64 KiB
        let num_workers = 4;
        let (file, allocator_0) = initialize_for_test(slab_size, num_workers);
        let file_for_join = file.try_clone().unwrap();
        let allocator_1 = Allocator::join(&file_for_join).unwrap();

        let allocation_size = 2048;
        let size_index = size_class_index(allocation_size).unwrap();
        let allocations_per_slab = slab_size / allocation_size;

        // Allocate enough to fill the first slab.
        let mut allocations = vec![];
        for _ in 0..allocations_per_slab {
            allocations.push(allocator_0.allocate(allocation_size).unwrap());
        }

        // The first slab should be full.
        let slab_index = unsafe {
            let worker_local_list = allocator_0.worker_local_list_partial(size_index);
            assert!(worker_local_list.head().is_none());
            let worker_local_list = allocator_0.worker_local_list_full(size_index);
            assert!(worker_local_list.head().is_some());
            worker_local_list.head().unwrap()
        };

        // The slab's remote list should be empty.
        let remote_free_list = unsafe { allocator_0.remote_free_list(slab_index) };
        assert!(remote_free_list.iterate().next().is_none());

        // Free the allocations to the remote free list: allocator 1 frees
        // memory that belongs to allocator 0's slab.
        for ptr in allocations {
            unsafe {
                let offset = allocator_0.offset(ptr);
                allocator_1.free_offset(offset);
            }
        }
        // Remote frees are not yet reclaimed, so the bytes still count as
        // outstanding.
        assert_eq!(
            allocator_0.outstanding_allocation_bytes(),
            allocations_per_slab as u64 * allocation_size as u64
        );

        // Allocator 0 can NOT allocate in the same slab.
        let different_slab_allocation = allocator_0.allocate(allocation_size).unwrap();
        let allocation_indexes = unsafe {
            allocator_0.find_allocation_indexes(allocator_0.offset(different_slab_allocation))
        };
        assert_ne!(allocation_indexes.slab_index, slab_index);
        unsafe { allocator_0.free(different_slab_allocation) };

        // If we clean the remote free lists, the next allocation should succeed in the same slab.
        allocator_0.clean_remote_free_lists();
        assert_eq!(allocator_0.outstanding_allocation_bytes(), 0);
        let same_slab_allocation = allocator_0.allocate(allocation_size).unwrap();
        let allocation_indexes = unsafe {
            allocator_0.find_allocation_indexes(allocator_0.offset(same_slab_allocation))
        };
        assert_eq!(allocation_indexes.slab_index, slab_index);
    }
1051
1052    #[test]
1053    fn test_join_from_existing_reuses_mapping() {
1054        let slab_size = 65536; // 64 KiB
1055        let num_workers = 4;
1056        let (_file, allocator_0) = initialize_for_test(slab_size, num_workers);
1057
1058        let allocator_1 = Allocator::join_from_existing(&allocator_0).unwrap();
1059        assert_ne!(allocator_0.worker_index, allocator_1.worker_index);
1060        assert_eq!(
1061            allocator_0.base.header().as_ptr(),
1062            allocator_1.base.header().as_ptr()
1063        );
1064
1065        let free_only_allocator = FreeOnlyAllocator::join_from_existing(&allocator_0);
1066        assert_eq!(
1067            allocator_0.base.header().as_ptr(),
1068            free_only_allocator.base.header().as_ptr()
1069        );
1070    }
1071
1072    #[test]
1073    fn test_drop_original_mapping_stays_alive() {
1074        let slab_size = 65536; // 64 KiB
1075        let num_workers = 4;
1076        let (_file, allocator_0) = initialize_for_test(slab_size, num_workers);
1077
1078        // Join with a second allocator.
1079        let allocator_1 = Allocator::join_from_existing(&allocator_0).unwrap();
1080
1081        // Drop the original.
1082        drop(allocator_0);
1083
1084        // We can still allocate, read, and write through the shared mapping.
1085        let allocation_size = 2048;
1086        let allocation = allocator_1.allocate(allocation_size).unwrap();
1087        unsafe {
1088            allocation
1089                .as_ptr()
1090                .write_bytes(0xAB, allocation_size as usize);
1091            assert_eq!(allocation.as_ptr().read(), 0xAB);
1092            allocator_1.free(allocation);
1093        }
1094    }
1095
    /// Verifies worker-slot accounting: a `FreeOnlyAllocator` does not consume
    /// a slot, slots are exhaustible, and dropping an allocator releases its
    /// slot for reuse.
    #[test]
    fn test_worker_reuse_with_free_only() {
        let slab_size = 65536; // 64 KiB
        let num_workers = 4;
        let (_file, allocator_0) = initialize_for_test(slab_size, num_workers);
        // Re-read the slot count from the header (shadows the local above).
        let num_workers = unsafe { allocator_0.base.header().as_ref() }.num_workers;

        // Join with a free only allocator (doesn't consume a worker slot).
        let free_only_allocator = FreeOnlyAllocator::join_from_existing(&allocator_0);

        // Fill all worker slots (allocator_0 already holds one).
        let mut allocators = Vec::new();
        for _ in 0..(num_workers - 1) {
            allocators.push(Allocator::join_from_existing_free_only(&free_only_allocator).unwrap());
        }
        assert!(Allocator::join_from_existing_free_only(&free_only_allocator).is_err());

        // Drop original and take its worker spot with a new allocator.
        drop(allocator_0);
        allocators.push(Allocator::join_from_existing_free_only(&free_only_allocator).unwrap());
        assert!(Allocator::join_from_existing_free_only(&free_only_allocator).is_err());

        // Drop all allocators.
        drop(allocators);

        // Re-fill all the allocators from our free only observer.
        let mut allocators = Vec::new();
        for _ in 0..num_workers {
            allocators.push(Allocator::join_from_existing_free_only(&free_only_allocator).unwrap());
        }
        assert!(Allocator::join_from_existing_free_only(&free_only_allocator).is_err());

        // Verify we can allocate, write, and read through a re-joined allocator.
        let allocation_size = 2048u32;
        let allocation = allocators[0].allocate(allocation_size).unwrap();
        // SAFETY: `allocation` is a live allocation of `allocation_size` bytes.
        unsafe {
            allocation
                .as_ptr()
                .write_bytes(0xCD, allocation_size as usize);
            assert_eq!(allocation.as_ptr().read(), 0xCD);
            allocators[0].free(allocation);
        }
    }
1139
    /// A `FreeOnlyAllocator` joined via the backing file can free another
    /// worker's allocation; the free must land on the owning slab's remote
    /// free list.
    #[test]
    fn test_free_only_allocator() {
        let slab_size = 65536; // 64 KiB
        let num_workers = 4;
        let (file, allocator) = initialize_for_test(slab_size, num_workers);
        let file_for_join = file.try_clone().unwrap();
        let free_only_allocator = FreeOnlyAllocator::join(&file_for_join).unwrap();

        let allocation_size = 2048;
        let allocation = allocator.allocate(allocation_size).unwrap();

        // Record where the allocation lives before freeing it remotely.
        let allocation_indexes =
            unsafe { allocator.find_allocation_indexes(allocator.offset(allocation)) };

        // SAFETY: allocation is a valid pointer allocated by the allocator.
        unsafe {
            let offset = allocator.offset(allocation);
            free_only_allocator.free_offset(offset);
        }

        // Index should be in the remote free list for the slab.
        assert_eq!(
            unsafe { allocator.remote_free_list(allocation_indexes.slab_index) }
                .iterate()
                .next()
                .unwrap(),
            allocation_indexes.index_within_slab
        );
    }
1169}