embassy_hal_internal/
atomic_ring_buffer.rs

1//! Atomic reusable ringbuffer.
2use core::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
3use core::{ptr, slice};
4
/// Atomic reusable ringbuffer
///
/// This ringbuffer implementation is designed to be stored in a `static`,
/// therefore all methods take `&self` and not `&mut self`.
///
/// It is "reusable": when created it has no backing buffer, you can give it
/// one with `init` and take it back with `deinit`, and init it again in the
/// future if needed. This is very non-idiomatic, but helps a lot when storing
/// it in a `static`.
///
/// One concurrent writer and one concurrent reader are supported, even at
/// different execution priorities (like main and irq).
pub struct RingBuffer {
    // Pointer to the backing storage. Null while no buffer is attached
    // (after `new()` or `deinit()`).
    #[doc(hidden)]
    pub buf: AtomicPtr<u8>,
    // Size of the backing storage in bytes. Zero while no buffer is attached.
    len: AtomicUsize,

    // `start` is the read index (advanced by `Reader::pop_done`) and `end` is
    // the write index (advanced by `Writer::push_done`).
    //
    // Both indices wrap at len*2, not at len.
    // This allows distinguishing "full" and "empty":
    // - full is when start+len == end (modulo len*2)
    // - empty is when start == end
    //
    // This avoids having to consider the ringbuffer "full" at len-1 instead of len.
    // The usual solution is adding a "full" flag, but that can't be made atomic.
    #[doc(hidden)]
    pub start: AtomicUsize,
    #[doc(hidden)]
    pub end: AtomicUsize,
}
34
/// A type which can only read from a ring buffer.
///
/// Obtained from [`RingBuffer::reader`] or [`RingBuffer::try_reader`]; it
/// borrows the ring buffer for as long as it lives.
pub struct Reader<'a>(&'a RingBuffer);
37
/// A type which can only write to a ring buffer.
///
/// Obtained from [`RingBuffer::writer`] or [`RingBuffer::try_writer`]; it
/// borrows the ring buffer for as long as it lives.
pub struct Writer<'a>(&'a RingBuffer);
40
41impl RingBuffer {
42    /// Create a new empty ringbuffer.
43    pub const fn new() -> Self {
44        Self {
45            buf: AtomicPtr::new(core::ptr::null_mut()),
46            len: AtomicUsize::new(0),
47            start: AtomicUsize::new(0),
48            end: AtomicUsize::new(0),
49        }
50    }
51
52    /// Initialize the ring buffer with a buffer.
53    ///
54    /// # Safety
55    /// - The buffer (`buf .. buf+len`) must be valid memory until `deinit` is called.
56    /// - Must not be called concurrently with any other methods.
57    pub unsafe fn init(&self, buf: *mut u8, len: usize) {
58        // Ordering: it's OK to use `Relaxed` because this is not called
59        // concurrently with other methods.
60        self.buf.store(buf, Ordering::Relaxed);
61        self.len.store(len, Ordering::Relaxed);
62        self.start.store(0, Ordering::Relaxed);
63        self.end.store(0, Ordering::Relaxed);
64    }
65
66    /// Deinitialize the ringbuffer.
67    ///
68    /// After calling this, the ringbuffer becomes empty, as if it was
69    /// just created with `new()`.
70    ///
71    /// # Safety
72    /// - Must not be called concurrently with any other methods.
73    pub unsafe fn deinit(&self) {
74        // Ordering: it's OK to use `Relaxed` because this is not called
75        // concurrently with other methods.
76        self.buf.store(ptr::null_mut(), Ordering::Relaxed);
77        self.len.store(0, Ordering::Relaxed);
78        self.start.store(0, Ordering::Relaxed);
79        self.end.store(0, Ordering::Relaxed);
80    }
81
82    /// Create a reader.
83    ///
84    /// # Safety
85    ///
86    /// - Only one reader can exist at a time.
87    /// - Ringbuffer must be initialized.
88    pub unsafe fn reader(&self) -> Reader<'_> {
89        Reader(self)
90    }
91
92    /// Try creating a reader, fails if not initialized.
93    ///
94    /// # Safety
95    ///
96    /// Only one reader can exist at a time.
97    pub unsafe fn try_reader(&self) -> Option<Reader<'_>> {
98        if self.buf.load(Ordering::Relaxed).is_null() {
99            return None;
100        }
101        Some(Reader(self))
102    }
103
104    /// Create a writer.
105    ///
106    /// # Safety
107    ///
108    /// - Only one writer can exist at a time.
109    /// - Ringbuffer must be initialized.
110    pub unsafe fn writer(&self) -> Writer<'_> {
111        Writer(self)
112    }
113
114    /// Try creating a writer, fails if not initialized.
115    ///
116    /// # Safety
117    ///
118    /// Only one writer can exist at a time.
119    pub unsafe fn try_writer(&self) -> Option<Writer<'_>> {
120        if self.buf.load(Ordering::Relaxed).is_null() {
121            return None;
122        }
123        Some(Writer(self))
124    }
125
126    /// Return if buffer is available.
127    pub fn is_available(&self) -> bool {
128        !self.buf.load(Ordering::Relaxed).is_null() && self.len.load(Ordering::Relaxed) != 0
129    }
130
131    /// Return length of buffer.
132    pub fn len(&self) -> usize {
133        self.len.load(Ordering::Relaxed)
134    }
135
136    /// Check if buffer is full.
137    pub fn is_full(&self) -> bool {
138        let len = self.len.load(Ordering::Relaxed);
139        let start = self.start.load(Ordering::Relaxed);
140        let end = self.end.load(Ordering::Relaxed);
141
142        self.wrap(start + len) == end
143    }
144
145    /// Check if buffer is empty.
146    pub fn is_empty(&self) -> bool {
147        let start = self.start.load(Ordering::Relaxed);
148        let end = self.end.load(Ordering::Relaxed);
149
150        start == end
151    }
152
153    fn wrap(&self, mut n: usize) -> usize {
154        let len = self.len.load(Ordering::Relaxed);
155
156        if n >= len * 2 {
157            n -= len * 2
158        }
159        n
160    }
161}
162
163impl<'a> Writer<'a> {
164    /// Push data into the buffer in-place.
165    ///
166    /// The closure `f` is called with a free part of the buffer, it must write
167    /// some data to it and return the amount of bytes written.
168    pub fn push(&mut self, f: impl FnOnce(&mut [u8]) -> usize) -> usize {
169        let (p, n) = self.push_buf();
170        let buf = unsafe { slice::from_raw_parts_mut(p, n) };
171        let n = f(buf);
172        self.push_done(n);
173        n
174    }
175
176    /// Push one data byte.
177    ///
178    /// Returns true if pushed successfully.
179    pub fn push_one(&mut self, val: u8) -> bool {
180        let n = self.push(|f| match f {
181            [] => 0,
182            [x, ..] => {
183                *x = val;
184                1
185            }
186        });
187        n != 0
188    }
189
190    /// Get a buffer where data can be pushed to.
191    ///
192    /// Equivalent to [`Self::push_buf`] but returns a slice.
193    pub fn push_slice(&mut self) -> &mut [u8] {
194        let (data, len) = self.push_buf();
195        unsafe { slice::from_raw_parts_mut(data, len) }
196    }
197
198    /// Get up to two buffers where data can be pushed to.
199    ///
200    /// Equivalent to [`Self::push_bufs`] but returns slices.
201    pub fn push_slices(&mut self) -> [&mut [u8]; 2] {
202        let [(d0, l0), (d1, l1)] = self.push_bufs();
203        unsafe { [slice::from_raw_parts_mut(d0, l0), slice::from_raw_parts_mut(d1, l1)] }
204    }
205
206    /// Get a buffer where data can be pushed to.
207    ///
208    /// Write data to the start of the buffer, then call `push_done` with
209    /// however many bytes you've pushed.
210    ///
211    /// The buffer is suitable to DMA to.
212    ///
213    /// If the ringbuf is full, size=0 will be returned.
214    ///
215    /// The buffer stays valid as long as no other `Writer` method is called
216    /// and `init`/`deinit` aren't called on the ringbuf.
217    pub fn push_buf(&mut self) -> (*mut u8, usize) {
218        // Ordering: popping writes `start` last, so we read `start` first.
219        // Read it with Acquire ordering, so that the next accesses can't be reordered up past it.
220        let mut start = self.0.start.load(Ordering::Acquire);
221        let buf = self.0.buf.load(Ordering::Relaxed);
222        let len = self.0.len.load(Ordering::Relaxed);
223        let mut end = self.0.end.load(Ordering::Relaxed);
224
225        let empty = start == end;
226
227        if start >= len {
228            start -= len
229        }
230        if end >= len {
231            end -= len
232        }
233
234        if start == end && !empty {
235            // full
236            return (buf, 0);
237        }
238        let n = if start > end { start - end } else { len - end };
239
240        trace!("  ringbuf: push_buf {:?}..{:?}", end, end + n);
241        (unsafe { buf.add(end) }, n)
242    }
243
244    /// Get up to two buffers where data can be pushed to.
245    ///
246    /// Write data starting at the beginning of the first buffer, then call
247    /// `push_done` with however many bytes you've pushed.
248    ///
249    /// The buffers are suitable to DMA to.
250    ///
251    /// If the ringbuf is full, both buffers will be zero length.
252    /// If there is only area available, the second buffer will be zero length.
253    ///
254    /// The buffer stays valid as long as no other `Writer` method is called
255    /// and `init`/`deinit` aren't called on the ringbuf.
256    pub fn push_bufs(&mut self) -> [(*mut u8, usize); 2] {
257        // Ordering: as per push_buf()
258        let mut start = self.0.start.load(Ordering::Acquire);
259        let buf = self.0.buf.load(Ordering::Relaxed);
260        let len = self.0.len.load(Ordering::Relaxed);
261        let mut end = self.0.end.load(Ordering::Relaxed);
262
263        let empty = start == end;
264
265        if start >= len {
266            start -= len
267        }
268        if end >= len {
269            end -= len
270        }
271
272        if start == end && !empty {
273            // full
274            return [(buf, 0), (buf, 0)];
275        }
276        let n0 = if start > end { start - end } else { len - end };
277        let n1 = if start <= end { start } else { 0 };
278
279        trace!("  ringbuf: push_bufs [{:?}..{:?}, {:?}..{:?}]", end, end + n0, 0, n1);
280        [(unsafe { buf.add(end) }, n0), (buf, n1)]
281    }
282
283    /// Mark n bytes as written and advance the write index.
284    pub fn push_done(&mut self, n: usize) {
285        trace!("  ringbuf: push {:?}", n);
286        let end = self.0.end.load(Ordering::Relaxed);
287
288        // Ordering: write `end` last, with Release ordering.
289        // The ordering ensures no preceding memory accesses (such as writing
290        // the actual data in the buffer) can be reordered down past it, which
291        // will guarantee the reader sees them after reading from `end`.
292        self.0.end.store(self.0.wrap(end + n), Ordering::Release);
293    }
294}
295
296impl<'a> Reader<'a> {
297    /// Pop data from the buffer in-place.
298    ///
299    /// The closure `f` is called with the next data, it must process
300    /// some data from it and return the amount of bytes processed.
301    pub fn pop(&mut self, f: impl FnOnce(&[u8]) -> usize) -> usize {
302        let (p, n) = self.pop_buf();
303        let buf = unsafe { slice::from_raw_parts(p, n) };
304        let n = f(buf);
305        self.pop_done(n);
306        n
307    }
308
309    /// Pop one data byte.
310    ///
311    /// Returns true if popped successfully.
312    pub fn pop_one(&mut self) -> Option<u8> {
313        let mut res = None;
314        self.pop(|f| match f {
315            &[] => 0,
316            &[x, ..] => {
317                res = Some(x);
318                1
319            }
320        });
321        res
322    }
323
324    /// Get a buffer where data can be popped from.
325    ///
326    /// Equivalent to [`Self::pop_buf`] but returns a slice.
327    pub fn pop_slice(&mut self) -> &mut [u8] {
328        let (data, len) = self.pop_buf();
329        unsafe { slice::from_raw_parts_mut(data, len) }
330    }
331
332    /// Get a buffer where data can be popped from.
333    ///
334    /// Read data from the start of the buffer, then call `pop_done` with
335    /// however many bytes you've processed.
336    ///
337    /// The buffer is suitable to DMA from.
338    ///
339    /// If the ringbuf is empty, size=0 will be returned.
340    ///
341    /// The buffer stays valid as long as no other `Reader` method is called
342    /// and `init`/`deinit` aren't called on the ringbuf.
343    pub fn pop_buf(&mut self) -> (*mut u8, usize) {
344        // Ordering: pushing writes `end` last, so we read `end` first.
345        // Read it with Acquire ordering, so that the next accesses can't be reordered up past it.
346        // This is needed to guarantee we "see" the data written by the writer.
347        let mut end = self.0.end.load(Ordering::Acquire);
348        let buf = self.0.buf.load(Ordering::Relaxed);
349        let len = self.0.len.load(Ordering::Relaxed);
350        let mut start = self.0.start.load(Ordering::Relaxed);
351
352        if start == end {
353            return (buf, 0);
354        }
355
356        if start >= len {
357            start -= len
358        }
359        if end >= len {
360            end -= len
361        }
362
363        let n = if end > start { end - start } else { len - start };
364
365        trace!("  ringbuf: pop_buf {:?}..{:?}", start, start + n);
366        (unsafe { buf.add(start) }, n)
367    }
368
369    /// Mark n bytes as read and allow advance the read index.
370    pub fn pop_done(&mut self, n: usize) {
371        trace!("  ringbuf: pop {:?}", n);
372
373        let start = self.0.start.load(Ordering::Relaxed);
374
375        // Ordering: write `start` last, with Release ordering.
376        // The ordering ensures no preceding memory accesses (such as reading
377        // the actual data) can be reordered down past it. This is necessary
378        // because writing to `start` is effectively freeing the read part of the
379        // buffer, which "gives permission" to the writer to write to it again.
380        // Therefore, all buffer accesses must be completed before this.
381        self.0.start.store(self.0.wrap(start + n), Ordering::Release);
382    }
383}
384
#[cfg(test)]
mod tests {
    use super::*;

    /// Walks a 4-byte ring through fill, partial drain, full drain and refill,
    /// checking the offered slice sizes and the empty/full state at each step.
    #[test]
    fn push_pop() {
        let mut b = [0; 4];
        let rb = RingBuffer::new();
        unsafe {
            rb.init(b.as_mut_ptr(), 4);

            assert!(rb.is_empty());
            assert!(!rb.is_full());

            rb.writer().push(|buf| {
                assert_eq!(4, buf.len());
                buf.copy_from_slice(&[1, 2, 3, 4]);
                4
            });

            assert!(!rb.is_empty());
            assert!(rb.is_full());

            rb.writer().push(|buf| {
                // If it's full, we can push 0 bytes.
                assert_eq!(0, buf.len());
                0
            });

            assert!(!rb.is_empty());
            assert!(rb.is_full());

            rb.reader().pop(|buf| {
                assert_eq!(4, buf.len());
                assert_eq!(1, buf[0]);
                1
            });

            assert!(!rb.is_empty());
            assert!(!rb.is_full());

            // Consuming nothing must leave the readable region untouched.
            rb.reader().pop(|buf| {
                assert_eq!(3, buf.len());
                0
            });

            assert!(!rb.is_empty());
            assert!(!rb.is_full());

            rb.reader().pop(|buf| {
                assert_eq!(3, buf.len());
                assert_eq!(2, buf[0]);
                assert_eq!(3, buf[1]);
                2
            });
            rb.reader().pop(|buf| {
                assert_eq!(1, buf.len());
                assert_eq!(4, buf[0]);
                1
            });

            assert!(rb.is_empty());
            assert!(!rb.is_full());

            rb.reader().pop(|buf| {
                assert_eq!(0, buf.len());
                0
            });

            rb.writer().push(|buf| {
                assert_eq!(4, buf.len());
                buf[0] = 10;
                1
            });

            rb.writer().push(|buf| {
                assert_eq!(3, buf.len());
                buf[0] = 11;
                buf[1] = 12;
                2
            });

            assert!(!rb.is_empty());
            assert!(!rb.is_full());

            rb.writer().push(|buf| {
                assert_eq!(1, buf.len());
                buf[0] = 13;
                1
            });

            assert!(!rb.is_empty());
            assert!(rb.is_full());
        }
    }

    /// A zero-length buffer is simultaneously empty and full, and both ends
    /// only ever see empty slices.
    #[test]
    fn zero_len() {
        let mut b = [0; 0];

        let rb = RingBuffer::new();
        unsafe {
            rb.init(b.as_mut_ptr(), b.len());

            assert!(rb.is_empty());
            assert!(rb.is_full());

            rb.writer().push(|buf| {
                assert_eq!(0, buf.len());
                0
            });

            rb.reader().pop(|buf| {
                assert_eq!(0, buf.len());
                0
            });
        }
    }

    /// Exercises the two-slice push API, including the wrap-around cases.
    ///
    /// Every explicitly bound writer is dropped before the next one is
    /// created, so the "only one writer at a time" contract is respected.
    #[test]
    fn push_slices() {
        let mut b = [0; 4];
        let rb = RingBuffer::new();
        unsafe {
            rb.init(b.as_mut_ptr(), 4);

            // push 3 -> [1 2 3 x]
            let mut w = rb.writer();
            let ps = w.push_slices();
            assert_eq!(4, ps[0].len());
            assert_eq!(0, ps[1].len());
            ps[0][0] = 1;
            ps[0][1] = 2;
            ps[0][2] = 3;
            w.push_done(3);
            drop(w);

            // pop 2 -> [x x 3 x]
            rb.reader().pop(|buf| {
                assert_eq!(3, buf.len());
                assert_eq!(1, buf[0]);
                assert_eq!(2, buf[1]);
                assert_eq!(3, buf[2]);
                2
            });

            // push 3 -> [5 6 3 4]
            let mut w = rb.writer();
            let ps = w.push_slices();
            assert_eq!(1, ps[0].len());
            assert_eq!(2, ps[1].len());
            ps[0][0] = 4;
            ps[1][0] = 5;
            ps[1][1] = 6;
            w.push_done(3);
            drop(w);

            // buf is now full
            let mut w = rb.writer();
            let ps = w.push_slices();
            assert_eq!(0, ps[0].len());
            assert_eq!(0, ps[1].len());
            // Release this writer before another one is created further down.
            drop(w);

            // pop 2 -> [5 6 x x]
            rb.reader().pop(|buf| {
                assert_eq!(2, buf.len());
                assert_eq!(3, buf[0]);
                assert_eq!(4, buf[1]);
                2
            });

            // should now have one push slice again
            let mut w = rb.writer();
            let ps = w.push_slices();
            assert_eq!(2, ps[0].len());
            assert_eq!(0, ps[1].len());
            drop(w);

            // pop 2 -> [x x x x]
            rb.reader().pop(|buf| {
                assert_eq!(2, buf.len());
                assert_eq!(5, buf[0]);
                assert_eq!(6, buf[1]);
                2
            });

            // should now have two push slices
            let mut w = rb.writer();
            let ps = w.push_slices();
            assert_eq!(2, ps[0].len());
            assert_eq!(2, ps[1].len());
            drop(w);

            // make sure we exercise all wrap around cases properly
            for _ in 0..10 {
                // should be empty, push 1
                let mut w = rb.writer();
                let ps = w.push_slices();
                assert_eq!(4, ps[0].len() + ps[1].len());
                w.push_done(1);
                drop(w);

                // should have 1 element
                let mut w = rb.writer();
                let ps = w.push_slices();
                assert_eq!(3, ps[0].len() + ps[1].len());
                drop(w);

                // pop 1
                rb.reader().pop(|buf| {
                    assert_eq!(1, buf.len());
                    1
                });
            }
        }
    }
}