starb/
lib.rs

1//! An implementation of STAtically allocated Ring Buffers.
2//!
3//! This is a simple ring-buffer structure that lives on the stack,
4//! rather than the heap, so that it can be used in `no-std`
5//! environments, such as embedded.
6#![no_std]
7
8use core::{
9    cell::UnsafeCell,
10    cmp,
11    mem::MaybeUninit,
12    ptr,
13    sync::atomic::{self, AtomicUsize, Ordering},
14};
15
/// Underlying buffer capacity. Needs to be hard-coded for now,
/// because the size of the structure needs to be known at compile
/// time so it can be statically allocated or created on the stack.
///
/// Note that one slot is always kept free so that a full buffer can
/// be told apart from an empty one (`head == tail` means empty), so
/// at most `CAPACITY - 1` elements are usable at any time.
///
/// This will disappear when const generics appear.
pub const CAPACITY: usize = 1024;
22
/// Errors that can be made when interacting with the ring buffer.
//
// `Clone`, `Copy` and `Eq` are derived in addition to the original
// `Debug`/`PartialEq`: the enum carries no payload, so all three are
// free, and they let callers store, copy, and fully compare errors
// without borrowing.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Error {
    /// The buffer is at capacity and cannot fit any more
    /// elements. You need to `unshift` at least once to make some
    /// space.
    BufferFull,
}
31
/// A lock-free concurrent ring buffer that prevents overwriting data
/// before it is read.
///
/// `head` and `tail` are indices into `buf`: `head` is the next slot
/// to read, `tail` the next slot to write. Only slots in the
/// half-open range `[head, tail)` (modulo `CAPACITY`) hold
/// initialized values.
pub struct RingBuffer<T> {
    // Advanced only by the `Reader` half, after an element has been
    // moved out.
    head: AtomicUsize,
    // Advanced only by the `Writer` half, after an element has been
    // written.
    tail: AtomicUsize,

    // Backing store needs to be UnsafeCell to make sure it's not
    // stored in read-only data sections. It also provides the
    // interior mutability required to write through the shared
    // references handed out by `split`. `MaybeUninit` is used because
    // slots outside `[head, tail)` have never been written and must
    // not be treated as valid `T`s.
    buf: UnsafeCell<MaybeUninit<[T; CAPACITY]>>,
}
42
43impl<T> RingBuffer<T> {
44    /// Create a new RingBuffer.
45    pub const fn new() -> Self {
46        Self {
47            head: AtomicUsize::new(0),
48            tail: AtomicUsize::new(0),
49            buf: UnsafeCell::new(MaybeUninit::uninit()),
50        }
51    }
52
53    /// Splits the ring buffer into a consumer/producer pair
54    /// (`Reader`/`Writer` respectively). These structures can be used
55    /// safely across threads.
56    // Honestly, this should consume `self` or at least be `mut`, but
57    // it needs to be available in const static contexts, which
58    // prevents that. Basically all the rest of the unsafe stuff in
59    // here is a consequence of that.
60    //
61    // No, lazy_static is not an option, because it doesn't work on
62    // architectures where CAS atomics are missing.
63    pub const fn split(&self) -> (Reader<T>, Writer<T>) {
64        let rbr = Reader { rb: &self };
65        let rbw = Writer { rb: &self };
66        (rbr, rbw)
67    }
68}
69unsafe impl<T> Send for RingBuffer<T> where T: Send {}
70
/// Consumer of `RingBuffer`.
///
/// Holds a shared reference back to the buffer it was split from;
/// only the `head` index is advanced through this handle.
pub struct Reader<'a, T> {
    rb: &'a RingBuffer<T>,
}
// The Reader moves values of `T` out of the buffer, so it is only
// safe to send to another thread when `T` itself is `Send`.
unsafe impl<T> Send for Reader<'_, T> where T: Send {}
76
/// Producer for `RingBuffer`.
///
/// Holds a shared reference back to the buffer it was split from;
/// only the `tail` index is advanced through this handle.
pub struct Writer<'a, T> {
    rb: &'a RingBuffer<T>,
}
// The Writer moves values of `T` into the buffer, so it is only safe
// to send to another thread when `T` itself is `Send`.
unsafe impl<T> Send for Writer<'_, T> where T: Send {}
82
impl<T> Reader<'_, T> {
    /// The number of elements currently available for reading.
    ///
    /// NB: Because the `Writer` half of the ring buffer may be adding
    /// elements in another thread, the value read from `len` is a
    /// *minimum* of what may actually be available by the time the
    /// reading takes place.
    pub fn len(&self) -> usize {
        let h = self.rb.head.load(Ordering::Relaxed);
        let t = self.rb.tail.load(Ordering::Relaxed);
        // Pairs with the writer's Release fence so the index values
        // loaded above are coherent with the writer's updates.
        atomic::fence(Ordering::Acquire);

        // Bias by CAPACITY before the modulo so the subtraction
        // cannot underflow when `tail` has wrapped behind `head`.
        (t + CAPACITY - h) % CAPACITY
    }

    /// Whether or not the ring buffer is empty.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns the value at the start of the buffer and advances the
    /// start of the buffer to the next element.
    ///
    /// If nothing is available in the buffer, returns `None`
    pub fn shift(&mut self) -> Option<T> {
        let h = self.rb.head.load(Ordering::Relaxed);
        let t = self.rb.tail.load(Ordering::Relaxed);

        if h == t {
            // `head == tail` is the encoding for "empty".
            None
        } else {
            // Pairs with the writer's Release fence: the element the
            // writer stored at `h` is visible before we read it.
            atomic::fence(Ordering::Acquire);
            let nh = (h + 1) % CAPACITY;
            // SAFETY: the buffer is non-empty, so slot `h` holds an
            // initialized value, and the writer will not touch that
            // slot again until `head` is advanced past it below.
            let rc = unsafe {
                let buf: &MaybeUninit<[T; CAPACITY]> = &*self.rb.buf.get();
                Some(Self::load_val_at(h, buf))
            };
            // Make the read happen-before the head update that hands
            // the slot back to the writer.
            atomic::fence(Ordering::Release);
            self.rb.head.store(nh, Ordering::Relaxed);
            rc
        }
    }

    /// Shift all available data into `buf` up to the size of `buf`.
    ///
    /// Returns the number of items written into `buf`.
    pub fn shift_into(&mut self, buf: &mut [T]) -> usize {
        let mut h = self.rb.head.load(Ordering::Relaxed);
        let t = self.rb.tail.load(Ordering::Relaxed);
        // Pairs with the writer's Release fence (see `shift`).
        atomic::fence(Ordering::Acquire);

        // Copy at most the number of initialized elements, bounded by
        // the destination's length.
        let mylen = (t + CAPACITY - h) % CAPACITY;
        let buflen = buf.len();
        let len = cmp::min(mylen, buflen);

        // SAFETY: `len <= mylen`, so every slot visited lies in the
        // initialized region `[head, tail)`; `len <= buflen`, so `i`
        // stays in bounds for `buf`.
        unsafe {
            let rbuf: &MaybeUninit<[T; CAPACITY]> = &*self.rb.buf.get();
            for i in 0..len {
                *buf.get_unchecked_mut(i) = Self::load_val_at(h, rbuf);
                h = (h + 1) % CAPACITY;
            }
        }

        // Hand the consumed slots back to the writer only after all
        // the reads above have completed.
        atomic::fence(Ordering::Release);
        self.rb.head.store(h, Ordering::Relaxed);
        len
    }

    /// Move the value out of slot `i`.
    ///
    /// # Safety
    ///
    /// `i` must be in bounds (`i < CAPACITY`), slot `i` must hold an
    /// initialized value, and that slot must not be read again as a
    /// live `T` afterwards, since `ptr::read` performs a bitwise copy
    /// without consuming the source.
    #[inline(always)]
    unsafe fn load_val_at(i: usize, buf: &MaybeUninit<[T; CAPACITY]>) -> T {
        let b: &[T; CAPACITY] = &*buf.as_ptr();
        ptr::read(b.get_unchecked(i))
    }
}
157
158impl<T> Iterator for Reader<'_, T> {
159    type Item = T;
160    fn next(&mut self) -> Option<Self::Item> {
161        self.shift()
162    }
163}
164
impl<T> Writer<'_, T> {
    /// Put `v` at the end of the buffer.
    ///
    /// Returns `BufferFull` if appending `v` would overlap with the
    /// start of the buffer.
    pub fn unshift(&mut self, v: T) -> Result<(), Error> {
        let h = self.rb.head.load(Ordering::Relaxed);
        let t = self.rb.tail.load(Ordering::Relaxed);

        let nt = (t + 1) % CAPACITY;
        // We can't allow overwrites of the head position, because it
        // would then be possible to write to the same memory location
        // that is being read. If reading a value of `T` takes more
        // than one memory read, then reading from the head would
        // produce garbage in this scenario.
        if nt == h {
            // FIXME: this comparison is wrong in the trivial
            // 1-element buffer case which would never allow an
            // `unshift`. In larger buffers it wastes a buffer slot.
            Err(Error::BufferFull)
        } else {
            // Pairs with the reader's Release fence: slot `t` has
            // been fully vacated before we overwrite it.
            atomic::fence(Ordering::Acquire);
            // SAFETY: `nt != h` guarantees slot `t` is outside the
            // initialized region `[head, tail)`, so this write cannot
            // race with a concurrent read of the same slot.
            unsafe {
                let buf = &mut *self.rb.buf.get();
                Self::store_val_at(t, buf, v);
            }
            // Make the element's bytes happen-before the tail update
            // that publishes the slot to the reader.
            atomic::fence(Ordering::Release);
            self.rb.tail.store(nt, Ordering::Relaxed);
            Ok(())
        }
    }

    /// Write `val` into slot `i`.
    ///
    /// # Safety
    ///
    /// `i` must be in bounds (`i < CAPACITY`) and slot `i` must not
    /// currently hold a live value, since `ptr::write` does not drop
    /// the destination's previous contents.
    #[inline(always)]
    unsafe fn store_val_at(i: usize, buf: &mut MaybeUninit<[T; CAPACITY]>, val: T) {
        let b: &mut [T; CAPACITY] = &mut *buf.as_mut_ptr();
        ptr::write(b.get_unchecked_mut(i), val);
    }
}
203
// TODO: this needs to be `Copy` because we're pulling data from a
// slice, and we can't just take stuff out of an index without
// replacing it, and there's no good value for that.
impl<T> Writer<'_, T>
where
    T: Copy,
{
    /// Copy as much of `buf` into the ring buffer as possible.
    ///
    /// Returns the number of items copied.
    pub fn unshift_from(&mut self, buf: &[T]) -> usize {
        let h = self.rb.head.load(Ordering::Relaxed);
        let mut t = self.rb.tail.load(Ordering::Relaxed);
        // Pairs with the reader's Release fence (see `unshift`).
        atomic::fence(Ordering::Acquire);

        // Free space is everything outside the occupied region, minus
        // the one slot that must stay empty so that "full" can be
        // distinguished from "empty".
        let mylen = (t + CAPACITY - h) % CAPACITY;
        let buflen = buf.len();
        let len = cmp::min(CAPACITY - mylen - 1, buflen);

        // SAFETY: `len` is bounded by the free space, so every slot
        // written below lies outside the initialized region `[head,
        // tail)` and cannot race with a concurrent read; `i < len <=
        // buflen` keeps the source index in bounds.
        unsafe {
            let rbuf = &mut *self.rb.buf.get();
            for i in 0..len {
                Self::store_val_at(t, rbuf, *buf.get_unchecked(i));
                t = (t + 1) % CAPACITY;
            }
        }

        // Make all the element writes happen-before the tail update
        // that publishes them to the reader.
        atomic::fence(Ordering::Release);
        self.rb.tail.store(t, Ordering::Relaxed);
        len
    }
}
236
#[cfg(test)]
mod test {
    use super::*;
    // Explicit imports shadow the glob above. `AtomicBool` replaces
    // the `static mut` flag previously used in
    // `ownership_passes_through`: forming references to a `static
    // mut` trips the `static_mut_refs` lint and is rejected in the
    // 2024 edition.
    use core::sync::atomic::{AtomicBool, Ordering};

    #[test]
    fn detects_empty() {
        let rb = RingBuffer::<bool>::new();
        let (mut rbr, mut rbw) = rb.split();
        assert!(rbr.is_empty());
        // Assert the write succeeded rather than discarding the
        // must-use `Result` with `.ok()`.
        assert_eq!(rbw.unshift(true), Ok(()));
        assert!(!rbr.is_empty());
        rbr.shift();
        assert!(rbr.is_empty());
    }

    #[test]
    fn len_matches() {
        let rb = RingBuffer::<bool>::new();
        let (mut rbr, mut rbw) = rb.split();

        // Count length up.
        for i in 0..CAPACITY - 1 {
            assert_eq!(rbr.len(), i);
            assert_eq!(rbw.unshift(true), Ok(()));
        }

        // ...and back down again.
        for i in 0..CAPACITY - 1 {
            assert_eq!(rbr.len(), CAPACITY - 1 - i);
            rbr.shift();
        }

        // Final check for empty.
        assert_eq!(rbr.len(), 0);
    }

    #[test]
    fn can_wrap() {
        let rb = RingBuffer::<usize>::new();
        let (mut rbr, mut rbw) = rb.split();

        // Make sure we can store n-1 elements.
        for i in 0..CAPACITY - 1 {
            assert_eq!(rbw.unshift(i), Ok(()))
        }

        // ...and that we can load them back again.
        for i in 0..CAPACITY - 1 {
            assert_eq!(rbr.shift(), Some(i))
        }
    }

    #[test]
    fn cannot_overwrite() {
        let rb = RingBuffer::<usize>::new();
        let (mut rbr, mut rbw) = rb.split();

        for i in 0..CAPACITY - 1 {
            assert_eq!(rbw.unshift(i), Ok(()));
        }
        assert_eq!(rbw.unshift(0xffff), Err(Error::BufferFull));

        // We can drop an element to allow a slot to write to again.
        rbr.shift();
        assert_eq!(rbw.unshift(0xffff), Ok(()));
    }

    #[test]
    fn can_iter() {
        let rb = RingBuffer::<usize>::new();
        let (rbr, mut rbw) = rb.split();

        for i in 0..CAPACITY - 1 {
            assert_eq!(rbw.unshift(i), Ok(()));
        }

        let mut i = 0;
        for e in rbr {
            assert_eq!(e, i);
            i += 1;
        }
    }

    #[test]
    fn shift_into_smaller() {
        let rb = RingBuffer::<usize>::new();
        let (mut rbr, mut rbw) = rb.split();
        for i in 0..CAPACITY - 1 {
            assert_eq!(rbw.unshift(i), Ok(()));
        }

        let mut buf: [usize; CAPACITY / 2] = [0; CAPACITY / 2];
        assert_eq!(rbr.shift_into(&mut buf), CAPACITY / 2, "return len wrong");
        for i in 0..CAPACITY / 2 {
            assert_eq!(buf[i], i, "slot {} wrong", i)
        }

        // We wrote CAPACITY - 1 elements and only drained CAPACITY /
        // 2, so at least one must remain. (Previously spelled as the
        // double negative `!shift().is_none()`.)
        assert!(rbr.shift().is_some());
    }

    #[test]
    fn shift_into_bigger() {
        let rb = RingBuffer::<usize>::new();
        let (mut rbr, mut rbw) = rb.split();
        for i in 0..CAPACITY - 1 {
            assert_eq!(rbw.unshift(i), Ok(()));
        }

        let mut buf: [usize; CAPACITY * 2] = [0; CAPACITY * 2];
        assert_eq!(rbr.shift_into(&mut buf), CAPACITY - 1, "return len wrong");
        for i in 0..CAPACITY - 1 {
            assert_eq!(buf[i], i, "first half")
        }
        for i in CAPACITY - 1..CAPACITY * 2 {
            assert_eq!(buf[i], 0, "second half")
        }

        assert!(rbr.shift().is_none());
    }

    #[test]
    fn unshift_from_smaller() {
        let rb = RingBuffer::<usize>::new();
        let (mut rbr, mut rbw) = rb.split();

        let buf: [usize; CAPACITY / 2] = [0xdead; CAPACITY / 2];
        assert_eq!(rbw.unshift_from(&buf), CAPACITY / 2);
        for i in 0..CAPACITY / 2 {
            assert_eq!(rbr.shift(), Some(0xdead), "wrong value at index {}", i);
        }
        assert!(rbr.shift().is_none());
    }

    #[test]
    fn unshift_from_bigger() {
        let rb = RingBuffer::<usize>::new();
        let (mut rbr, mut rbw) = rb.split();

        let buf: [usize; CAPACITY * 2] = [0xdead; CAPACITY * 2];
        assert_eq!(rbw.unshift_from(&buf), CAPACITY - 1);
        assert_eq!(rbw.unshift(0xbeef), Err(Error::BufferFull));
        for i in 0..CAPACITY - 1 {
            assert_eq!(rbr.shift(), Some(0xdead), "wrong value at index {}", i);
        }
        assert!(rbr.shift().is_none());
    }

    #[test]
    fn ownership_passes_through() {
        // A plain atomic static instead of `static mut`: same
        // observable behavior, no unsafe access required.
        static DROPPED: AtomicBool = AtomicBool::new(false);
        struct DropTest {}
        impl DropTest {
            fn i_own_it_now(self) {}
        }
        impl Drop for DropTest {
            fn drop(&mut self) {
                DROPPED.store(true, Ordering::Relaxed);
            }
        }

        let rb = RingBuffer::<DropTest>::new();
        let (mut rbr, mut rbw) = rb.split();

        // Create a closure to take ownership of a `DropTest` so we
        // can make sure it's not dropped when the closure is over.
        let mut cl = |dt| {
            rbw.unshift(dt).expect("couldn't store item");
        };
        cl(DropTest {});
        assert!(!DROPPED.load(Ordering::Relaxed));

        // Still, nothing should be dropped, since we now own the
        // value.
        let dt = rbr.shift().expect("buffer was empty");
        assert!(!DROPPED.load(Ordering::Relaxed));

        // And, finally, by giving ownership away, it'll get dropped.
        dt.i_own_it_now();
        assert!(DROPPED.load(Ordering::Relaxed));
    }
}
417}