rtic/slab.rs

// Preallocated storage similar to the slab crate, but fixed size

use std::{
    cell::UnsafeCell,
    mem::MaybeUninit,
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    },
};

/// Handle to a queued item
#[derive(Debug)]
pub struct SlabHandle {
    index: usize,
    // Address of the owning Arc<Slab>, used to pin the handle to a specific instance
    ptr: usize,
}

// N must be a power of two so that the modular indexing below stays correct
// when the position counters wrap around, though a 64-bit usize is unlikely
// to ever overflow in practice.
// Note: as written, items still stored in the slab when it is dropped are leaked.
pub struct Slab<T, const N: usize> {
    // SPMC ring buffer of free-slot indices; consumed entries hold usize::MAX.
    // The SlabReceiver is the single producer here, since it pushes returned
    // indices; the senders are the consumers.
    free_queue: [AtomicUsize; N],
    // Number of consumed free_queue entries, i.e. occupied slots. Zero means
    // the Slab is empty; N means it is full.
    free_used: AtomicUsize,
    // Next pop position in the free queue (used modulo N)
    free_queue_tail: AtomicUsize,
    // Slots that store the actual data
    slots: UnsafeCell<[MaybeUninit<T>; N]>,
}
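
// A compile-time guard one could add (a sketch, not in the original code):
// tail % N and head % N only remain consistent across a wrap of the position
// counters when N divides usize::MAX + 1, i.e. when N is a power of two.
// The check only fires if it is referenced, e.g. with
// `let () = Self::ASSERT_N_POWER_OF_TWO;` at the top of `new`.
impl<T, const N: usize> Slab<T, N> {
    const ASSERT_N_POWER_OF_TWO: () = assert!(N.is_power_of_two(), "N must be a power of two");
}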

// SAFETY: senders and the receiver never access the same slot concurrently;
// hand-off of a slot is synchronized through the atomic free queue.
unsafe impl<T: Send, const N: usize> Send for Slab<T, N> {}
unsafe impl<T: Send, const N: usize> Sync for Slab<T, N> {}
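
// A Drop sketch one could add (an assumption, not in the original code): as
// written, items still in the slab at drop time are leaked, because
// MaybeUninit never runs destructors. With exclusive access, every index
// still present in free_queue marks a free slot; all other slots hold live
// values that can be dropped in place.
impl<T, const N: usize> Drop for Slab<T, N> {
    fn drop(&mut self) {
        let mut is_free = [false; N];
        for entry in &self.free_queue {
            let index = entry.load(Ordering::Relaxed);
            if index != usize::MAX {
                is_free[index] = true;
            }
        }
        for (index, slot) in self.slots.get_mut().iter_mut().enumerate() {
            if !is_free[index] {
                // SAFETY: slots not listed as free hold initialized values
                unsafe { slot.assume_init_drop() };
            }
        }
    }
}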

impl<T, const N: usize> Slab<T, N> {
    pub fn new() -> Self {
        // Initially every slot is free, so the queue holds all indices in order
        let free_queue = std::array::from_fn(|i| AtomicUsize::new(i));

        // An array of MaybeUninit needs no initialization; from_fn sidesteps
        // [MaybeUninit::uninit(); N], which would require T: Copy
        let slots = std::array::from_fn(|_| MaybeUninit::uninit());

        Self {
            free_queue,
            free_used: AtomicUsize::new(0),
            free_queue_tail: AtomicUsize::new(0),
            slots: UnsafeCell::new(slots),
        }
    }

    pub fn split(self) -> (SlabSender<T, N>, SlabReceiver<T, N>) {
        let inner = Arc::new(self);

        (
            SlabSender {
                inner: inner.clone(),
            },
            SlabReceiver {
                inner,
                free_queue_head: 0,
            },
        )
    }
}

#[derive(Clone)]
pub struct SlabSender<T, const N: usize> {
    inner: Arc<Slab<T, N>>,
}

impl<T, const N: usize> SlabSender<T, N> {
    pub fn insert(&self, item: T) -> Result<SlabHandle, T> {
        let index = match self.get_index() {
            Some(index) => index,
            // Slab is full; hand the item back to the caller
            None => return Err(item),
        };

        // SAFETY: get_index handed this index out exclusively, so nothing
        // else accesses the slot until its handle is returned
        unsafe {
            (*self.inner.slots.get())[index].write(item);
        }

        Ok(SlabHandle {
            index,
            ptr: Arc::as_ptr(&self.inner) as usize,
        })
    }

    fn get_index(&self) -> Option<usize> {
        // Reserve a slot first. Acquire pairs with the receiver's Release
        // decrement, so a freed slot is visible before we reuse it.
        let free_used = self.inner.free_used.fetch_add(1, Ordering::Acquire);
        if free_used >= N {
            // Slab is full; roll the reservation back
            self.inner.free_used.fetch_sub(1, Ordering::Release);
            return None;
        }

        let tail = self.inner.free_queue_tail.fetch_add(1, Ordering::Acquire);
        // Acquire (not Release) is needed here: it pairs with the receiver's
        // Release swap in return_index, so the receiver's read of the slot
        // happens-before our upcoming write to it
        let val = self.inner.free_queue[tail % N].swap(usize::MAX, Ordering::Acquire);
        assert!(val != usize::MAX);
        Some(val)
    }
}
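
// Hypothetical usage sketch (not part of the original API): on a full slab,
// insert hands the item back as Err(item), so a caller can yield and retry
// instead of losing it.
#[allow(dead_code)]
fn insert_blocking<T, const N: usize>(tx: &SlabSender<T, N>, mut item: T) -> SlabHandle {
    loop {
        match tx.insert(item) {
            Ok(handle) => return handle,
            Err(returned) => {
                item = returned;
                std::thread::yield_now();
            }
        }
    }
}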

pub struct SlabReceiver<T, const N: usize> {
    inner: Arc<Slab<T, N>>,
    // Next push position in the free queue (used modulo N)
    free_queue_head: usize,
}

impl<T, const N: usize> SlabReceiver<T, N> {
    pub fn remove(&mut self, handle: SlabHandle) -> T {
        // Ensure the handle belongs to this slab instance
        assert!(Arc::as_ptr(&self.inner) as usize == handle.ptr);

        // SAFETY: the handle is a unique token for an occupied slot, so the
        // value is initialized and nothing else is accessing it
        let item = unsafe { (*self.inner.slots.get())[handle.index].as_ptr().read() };
        self.return_index(handle.index);
        item
    }

    fn return_index(&mut self, index: usize) {
        // Release (not Acquire) is needed here: it pairs with the sender's
        // Acquire swap in get_index, so our read of the slot above
        // happens-before the sender's next write to it
        let old = self.inner.free_queue[self.free_queue_head % N].swap(index, Ordering::Release);
        assert!(old == usize::MAX);

        let count = self.inner.free_used.fetch_sub(1, Ordering::Release);
        // Every remove is preceded by an insert, so the previous count must
        // be at least one; anything else means the counter underflowed
        assert!(count != 0);

        self.free_queue_head += 1;
    }
}
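
// A minimal round-trip test sketch (assumed, not from the original file):
// fill the slab, observe the full-slab backpressure, then free a slot and
// check that it is reused.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn insert_remove_roundtrip() {
        let (tx, mut rx) = Slab::<u32, 2>::new().split();

        let a = tx.insert(10).unwrap();
        let b = tx.insert(20).unwrap();
        // Full: the rejected item is handed back rather than dropped
        assert_eq!(tx.insert(30).unwrap_err(), 30);

        assert_eq!(rx.remove(a), 10);
        // The freed slot is immediately reusable
        let c = tx.insert(30).unwrap();
        assert_eq!(rx.remove(b), 20);
        assert_eq!(rx.remove(c), 30);
    }
}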