embassy_hal_internal/atomic_ring_buffer.rs
1//! Atomic reusable ringbuffer.
2use core::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
3use core::{ptr, slice};
4
/// Atomic reusable ringbuffer
///
/// This ringbuffer implementation is designed to be stored in a `static`,
/// therefore all methods take `&self` and not `&mut self`.
///
/// It is "reusable": when created it has no backing buffer, you can give it
/// one with `init` and take it back with `deinit`, and init it again in the
/// future if needed. This is very non-idiomatic, but helps a lot when storing
/// it in a `static`.
///
/// One concurrent writer and one concurrent reader are supported, even at
/// different execution priorities (like main and irq).
pub struct RingBuffer {
    /// Pointer to the backing storage. Null after `new()` and `deinit()`.
    #[doc(hidden)]
    pub buf: AtomicPtr<u8>,
    /// Capacity of the backing storage in bytes. Zero after `new()` and `deinit()`.
    len: AtomicUsize,

    // `start` and `end` wrap at `len * 2`, not at `len`.
    // This allows distinguishing "full" and "empty":
    // - full is when `start + len == end` (modulo `len * 2`)
    // - empty is when `start == end`
    //
    // This avoids having to consider the ringbuffer "full" at `len - 1` instead of `len`.
    // The usual solution is adding a "full" flag, but that can't be made atomic.
    /// Read index; advanced by the reader when data is popped.
    #[doc(hidden)]
    pub start: AtomicUsize,
    /// Write index; advanced by the writer when data is pushed.
    #[doc(hidden)]
    pub end: AtomicUsize,
}

/// A type which can only read from a ring buffer.
pub struct Reader<'a>(&'a RingBuffer);

/// A type which can only write to a ring buffer.
pub struct Writer<'a>(&'a RingBuffer);

impl Default for RingBuffer {
    /// Equivalent to [`RingBuffer::new`]: an empty ringbuffer with no backing buffer.
    fn default() -> Self {
        Self::new()
    }
}

impl RingBuffer {
    /// Create a new empty ringbuffer.
    ///
    /// The returned ringbuffer has no backing buffer (null pointer, length 0)
    /// until [`Self::init`] is called.
    pub const fn new() -> Self {
        Self {
            buf: AtomicPtr::new(ptr::null_mut()),
            len: AtomicUsize::new(0),
            start: AtomicUsize::new(0),
            end: AtomicUsize::new(0),
        }
    }

    /// Initialize the ring buffer with a buffer.
    ///
    /// Both indices are reset to 0, so the ringbuffer starts out empty.
    ///
    /// # Safety
    /// - The buffer (`buf .. buf+len`) must be valid memory until `deinit` is called.
    /// - Must not be called concurrently with any other methods.
    pub unsafe fn init(&self, buf: *mut u8, len: usize) {
        // Ordering: it's OK to use `Relaxed` because this is not called
        // concurrently with other methods.
        self.buf.store(buf, Ordering::Relaxed);
        self.len.store(len, Ordering::Relaxed);
        self.start.store(0, Ordering::Relaxed);
        self.end.store(0, Ordering::Relaxed);
    }

    /// Deinitialize the ringbuffer.
    ///
    /// After calling this, the ringbuffer becomes empty, as if it was
    /// just created with `new()`.
    ///
    /// # Safety
    /// - Must not be called concurrently with any other methods.
    pub unsafe fn deinit(&self) {
        // Ordering: it's OK to use `Relaxed` because this is not called
        // concurrently with other methods.
        self.buf.store(ptr::null_mut(), Ordering::Relaxed);
        self.len.store(0, Ordering::Relaxed);
        self.start.store(0, Ordering::Relaxed);
        self.end.store(0, Ordering::Relaxed);
    }

    /// Create a reader.
    ///
    /// # Safety
    ///
    /// - Only one reader can exist at a time.
    /// - Ringbuffer must be initialized.
    pub unsafe fn reader(&self) -> Reader<'_> {
        Reader(self)
    }

    /// Try creating a reader, fails if not initialized.
    ///
    /// Returns `None` when the buffer pointer is null.
    ///
    /// # Safety
    ///
    /// Only one reader can exist at a time.
    pub unsafe fn try_reader(&self) -> Option<Reader<'_>> {
        if self.buf.load(Ordering::Relaxed).is_null() {
            return None;
        }
        Some(Reader(self))
    }

    /// Create a writer.
    ///
    /// # Safety
    ///
    /// - Only one writer can exist at a time.
    /// - Ringbuffer must be initialized.
    pub unsafe fn writer(&self) -> Writer<'_> {
        Writer(self)
    }

    /// Try creating a writer, fails if not initialized.
    ///
    /// Returns `None` when the buffer pointer is null.
    ///
    /// # Safety
    ///
    /// Only one writer can exist at a time.
    pub unsafe fn try_writer(&self) -> Option<Writer<'_>> {
        if self.buf.load(Ordering::Relaxed).is_null() {
            return None;
        }
        Some(Writer(self))
    }

    /// Return if buffer is available.
    ///
    /// The buffer counts as available when its pointer is non-null and its
    /// length is non-zero.
    pub fn is_available(&self) -> bool {
        !self.buf.load(Ordering::Relaxed).is_null() && self.len.load(Ordering::Relaxed) != 0
    }

    /// Return length of buffer.
    pub fn len(&self) -> usize {
        self.len.load(Ordering::Relaxed)
    }

    /// Check if buffer is full.
    ///
    /// A zero-length buffer is reported as both full and empty.
    pub fn is_full(&self) -> bool {
        let len = self.len.load(Ordering::Relaxed);
        let start = self.start.load(Ordering::Relaxed);
        let end = self.end.load(Ordering::Relaxed);

        // Full means the write index is exactly `len` ahead of the read index
        // in the doubled index space.
        self.wrap(start + len) == end
    }

    /// Check if buffer is empty.
    pub fn is_empty(&self) -> bool {
        let start = self.start.load(Ordering::Relaxed);
        let end = self.end.load(Ordering::Relaxed);

        start == end
    }

    /// Fold an index back into the `0..len*2` index space.
    ///
    /// Only a single subtraction is performed, so the result is in range
    /// only when `n < len * 4`.
    fn wrap(&self, mut n: usize) -> usize {
        let len = self.len.load(Ordering::Relaxed);

        if n >= len * 2 {
            n -= len * 2
        }
        n
    }
}
162
163impl<'a> Writer<'a> {
164 /// Push data into the buffer in-place.
165 ///
166 /// The closure `f` is called with a free part of the buffer, it must write
167 /// some data to it and return the amount of bytes written.
168 pub fn push(&mut self, f: impl FnOnce(&mut [u8]) -> usize) -> usize {
169 let (p, n) = self.push_buf();
170 let buf = unsafe { slice::from_raw_parts_mut(p, n) };
171 let n = f(buf);
172 self.push_done(n);
173 n
174 }
175
176 /// Push one data byte.
177 ///
178 /// Returns true if pushed successfully.
179 pub fn push_one(&mut self, val: u8) -> bool {
180 let n = self.push(|f| match f {
181 [] => 0,
182 [x, ..] => {
183 *x = val;
184 1
185 }
186 });
187 n != 0
188 }
189
190 /// Get a buffer where data can be pushed to.
191 ///
192 /// Equivalent to [`Self::push_buf`] but returns a slice.
193 pub fn push_slice(&mut self) -> &mut [u8] {
194 let (data, len) = self.push_buf();
195 unsafe { slice::from_raw_parts_mut(data, len) }
196 }
197
198 /// Get up to two buffers where data can be pushed to.
199 ///
200 /// Equivalent to [`Self::push_bufs`] but returns slices.
201 pub fn push_slices(&mut self) -> [&mut [u8]; 2] {
202 let [(d0, l0), (d1, l1)] = self.push_bufs();
203 unsafe { [slice::from_raw_parts_mut(d0, l0), slice::from_raw_parts_mut(d1, l1)] }
204 }
205
206 /// Get a buffer where data can be pushed to.
207 ///
208 /// Write data to the start of the buffer, then call `push_done` with
209 /// however many bytes you've pushed.
210 ///
211 /// The buffer is suitable to DMA to.
212 ///
213 /// If the ringbuf is full, size=0 will be returned.
214 ///
215 /// The buffer stays valid as long as no other `Writer` method is called
216 /// and `init`/`deinit` aren't called on the ringbuf.
217 pub fn push_buf(&mut self) -> (*mut u8, usize) {
218 // Ordering: popping writes `start` last, so we read `start` first.
219 // Read it with Acquire ordering, so that the next accesses can't be reordered up past it.
220 let mut start = self.0.start.load(Ordering::Acquire);
221 let buf = self.0.buf.load(Ordering::Relaxed);
222 let len = self.0.len.load(Ordering::Relaxed);
223 let mut end = self.0.end.load(Ordering::Relaxed);
224
225 let empty = start == end;
226
227 if start >= len {
228 start -= len
229 }
230 if end >= len {
231 end -= len
232 }
233
234 if start == end && !empty {
235 // full
236 return (buf, 0);
237 }
238 let n = if start > end { start - end } else { len - end };
239
240 trace!(" ringbuf: push_buf {:?}..{:?}", end, end + n);
241 (unsafe { buf.add(end) }, n)
242 }
243
244 /// Get up to two buffers where data can be pushed to.
245 ///
246 /// Write data starting at the beginning of the first buffer, then call
247 /// `push_done` with however many bytes you've pushed.
248 ///
249 /// The buffers are suitable to DMA to.
250 ///
251 /// If the ringbuf is full, both buffers will be zero length.
252 /// If there is only area available, the second buffer will be zero length.
253 ///
254 /// The buffer stays valid as long as no other `Writer` method is called
255 /// and `init`/`deinit` aren't called on the ringbuf.
256 pub fn push_bufs(&mut self) -> [(*mut u8, usize); 2] {
257 // Ordering: as per push_buf()
258 let mut start = self.0.start.load(Ordering::Acquire);
259 let buf = self.0.buf.load(Ordering::Relaxed);
260 let len = self.0.len.load(Ordering::Relaxed);
261 let mut end = self.0.end.load(Ordering::Relaxed);
262
263 let empty = start == end;
264
265 if start >= len {
266 start -= len
267 }
268 if end >= len {
269 end -= len
270 }
271
272 if start == end && !empty {
273 // full
274 return [(buf, 0), (buf, 0)];
275 }
276 let n0 = if start > end { start - end } else { len - end };
277 let n1 = if start <= end { start } else { 0 };
278
279 trace!(" ringbuf: push_bufs [{:?}..{:?}, {:?}..{:?}]", end, end + n0, 0, n1);
280 [(unsafe { buf.add(end) }, n0), (buf, n1)]
281 }
282
283 /// Mark n bytes as written and advance the write index.
284 pub fn push_done(&mut self, n: usize) {
285 trace!(" ringbuf: push {:?}", n);
286 let end = self.0.end.load(Ordering::Relaxed);
287
288 // Ordering: write `end` last, with Release ordering.
289 // The ordering ensures no preceding memory accesses (such as writing
290 // the actual data in the buffer) can be reordered down past it, which
291 // will guarantee the reader sees them after reading from `end`.
292 self.0.end.store(self.0.wrap(end + n), Ordering::Release);
293 }
294}
295
296impl<'a> Reader<'a> {
297 /// Pop data from the buffer in-place.
298 ///
299 /// The closure `f` is called with the next data, it must process
300 /// some data from it and return the amount of bytes processed.
301 pub fn pop(&mut self, f: impl FnOnce(&[u8]) -> usize) -> usize {
302 let (p, n) = self.pop_buf();
303 let buf = unsafe { slice::from_raw_parts(p, n) };
304 let n = f(buf);
305 self.pop_done(n);
306 n
307 }
308
309 /// Pop one data byte.
310 ///
311 /// Returns true if popped successfully.
312 pub fn pop_one(&mut self) -> Option<u8> {
313 let mut res = None;
314 self.pop(|f| match f {
315 &[] => 0,
316 &[x, ..] => {
317 res = Some(x);
318 1
319 }
320 });
321 res
322 }
323
324 /// Get a buffer where data can be popped from.
325 ///
326 /// Equivalent to [`Self::pop_buf`] but returns a slice.
327 pub fn pop_slice(&mut self) -> &mut [u8] {
328 let (data, len) = self.pop_buf();
329 unsafe { slice::from_raw_parts_mut(data, len) }
330 }
331
332 /// Get a buffer where data can be popped from.
333 ///
334 /// Read data from the start of the buffer, then call `pop_done` with
335 /// however many bytes you've processed.
336 ///
337 /// The buffer is suitable to DMA from.
338 ///
339 /// If the ringbuf is empty, size=0 will be returned.
340 ///
341 /// The buffer stays valid as long as no other `Reader` method is called
342 /// and `init`/`deinit` aren't called on the ringbuf.
343 pub fn pop_buf(&mut self) -> (*mut u8, usize) {
344 // Ordering: pushing writes `end` last, so we read `end` first.
345 // Read it with Acquire ordering, so that the next accesses can't be reordered up past it.
346 // This is needed to guarantee we "see" the data written by the writer.
347 let mut end = self.0.end.load(Ordering::Acquire);
348 let buf = self.0.buf.load(Ordering::Relaxed);
349 let len = self.0.len.load(Ordering::Relaxed);
350 let mut start = self.0.start.load(Ordering::Relaxed);
351
352 if start == end {
353 return (buf, 0);
354 }
355
356 if start >= len {
357 start -= len
358 }
359 if end >= len {
360 end -= len
361 }
362
363 let n = if end > start { end - start } else { len - start };
364
365 trace!(" ringbuf: pop_buf {:?}..{:?}", start, start + n);
366 (unsafe { buf.add(start) }, n)
367 }
368
369 /// Mark n bytes as read and allow advance the read index.
370 pub fn pop_done(&mut self, n: usize) {
371 trace!(" ringbuf: pop {:?}", n);
372
373 let start = self.0.start.load(Ordering::Relaxed);
374
375 // Ordering: write `start` last, with Release ordering.
376 // The ordering ensures no preceding memory accesses (such as reading
377 // the actual data) can be reordered down past it. This is necessary
378 // because writing to `start` is effectively freeing the read part of the
379 // buffer, which "gives permission" to the writer to write to it again.
380 // Therefore, all buffer accesses must be completed before this.
381 self.0.start.store(self.0.wrap(start + n), Ordering::Release);
382 }
383}
384
#[cfg(test)]
mod tests {
    use super::*;

    /// Walks a 4-byte ringbuffer through fill, partial drain, full drain and
    /// refill, checking the region sizes and the empty/full flags at each step.
    #[test]
    fn push_pop() {
        let mut storage = [0u8; 4];
        let rb = RingBuffer::new();
        unsafe {
            rb.init(storage.as_mut_ptr(), 4);

            assert!(rb.is_empty());
            assert!(!rb.is_full());

            // Fill the whole buffer in one go.
            rb.writer().push(|buf| {
                assert_eq!(buf.len(), 4);
                buf.copy_from_slice(&[1, 2, 3, 4]);
                4
            });

            assert!(!rb.is_empty());
            assert!(rb.is_full());

            rb.writer().push(|buf| {
                // If it's full, we can push 0 bytes.
                assert_eq!(buf.len(), 0);
                0
            });

            assert!(!rb.is_empty());
            assert!(rb.is_full());

            // Consume a single byte.
            rb.reader().pop(|buf| {
                assert_eq!(buf.len(), 4);
                assert_eq!(buf[0], 1);
                1
            });

            assert!(!rb.is_empty());
            assert!(!rb.is_full());

            // Consuming nothing must leave the state untouched.
            rb.reader().pop(|buf| {
                assert_eq!(buf.len(), 3);
                0
            });

            assert!(!rb.is_empty());
            assert!(!rb.is_full());

            rb.reader().pop(|buf| {
                assert_eq!(buf.len(), 3);
                assert_eq!(buf[0], 2);
                assert_eq!(buf[1], 3);
                2
            });
            rb.reader().pop(|buf| {
                assert_eq!(buf.len(), 1);
                assert_eq!(buf[0], 4);
                1
            });

            assert!(rb.is_empty());
            assert!(!rb.is_full());

            rb.reader().pop(|buf| {
                assert_eq!(buf.len(), 0);
                0
            });

            // Refill in three steps: 1 + 2 + 1 bytes.
            rb.writer().push(|buf| {
                assert_eq!(buf.len(), 4);
                buf[0] = 10;
                1
            });

            rb.writer().push(|buf| {
                assert_eq!(buf.len(), 3);
                buf[..2].copy_from_slice(&[11, 12]);
                2
            });

            assert!(!rb.is_empty());
            assert!(!rb.is_full());

            rb.writer().push(|buf| {
                assert_eq!(buf.len(), 1);
                buf[0] = 13;
                1
            });

            assert!(!rb.is_empty());
            assert!(rb.is_full());
        }
    }

    /// A zero-length buffer is both empty and full, and hands out empty regions.
    #[test]
    fn zero_len() {
        let mut storage = [0u8; 0];

        let rb = RingBuffer::new();
        unsafe {
            rb.init(storage.as_mut_ptr(), storage.len());

            assert!(rb.is_empty());
            assert!(rb.is_full());

            rb.writer().push(|buf| {
                assert_eq!(buf.len(), 0);
                0
            });

            rb.reader().pop(|buf| {
                assert_eq!(buf.len(), 0);
                0
            });
        }
    }

    /// Exercises `push_slices`, including the case where free space is split
    /// across the end of the backing buffer.
    #[test]
    fn push_slices() {
        let mut storage = [0u8; 4];
        let rb = RingBuffer::new();
        unsafe {
            rb.init(storage.as_mut_ptr(), 4);

            // push 3 -> [1 2 3 x]
            let mut w = rb.writer();
            let ps = w.push_slices();
            assert_eq!(ps[0].len(), 4);
            assert_eq!(ps[1].len(), 0);
            ps[0][0] = 1;
            ps[0][1] = 2;
            ps[0][2] = 3;
            w.push_done(3);
            drop(w);

            // pop 2 -> [x x 3 x]
            rb.reader().pop(|buf| {
                assert_eq!(buf.len(), 3);
                assert_eq!(buf[0], 1);
                assert_eq!(buf[1], 2);
                assert_eq!(buf[2], 3);
                2
            });

            // push 3 -> [5 6 3 4]
            let mut w = rb.writer();
            let ps = w.push_slices();
            assert_eq!(ps[0].len(), 1);
            assert_eq!(ps[1].len(), 2);
            ps[0][0] = 4;
            ps[1][0] = 5;
            ps[1][1] = 6;
            w.push_done(3);
            drop(w);

            // buf is now full
            let mut w = rb.writer();
            let ps = w.push_slices();
            assert_eq!(ps[0].len(), 0);
            assert_eq!(ps[1].len(), 0);

            // pop 2 -> [5 6 x x]
            rb.reader().pop(|buf| {
                assert_eq!(buf.len(), 2);
                assert_eq!(buf[0], 3);
                assert_eq!(buf[1], 4);
                2
            });

            // should now have one push slice again
            let mut w = rb.writer();
            let ps = w.push_slices();
            assert_eq!(ps[0].len(), 2);
            assert_eq!(ps[1].len(), 0);
            drop(w);

            // pop 2 -> [x x x x]
            rb.reader().pop(|buf| {
                assert_eq!(buf.len(), 2);
                assert_eq!(buf[0], 5);
                assert_eq!(buf[1], 6);
                2
            });

            // should now have two push slices
            let mut w = rb.writer();
            let ps = w.push_slices();
            assert_eq!(ps[0].len(), 2);
            assert_eq!(ps[1].len(), 2);
            drop(w);

            // make sure we exercise all wrap around cases properly
            for _ in 0..10 {
                // should be empty, push 1
                let mut w = rb.writer();
                let ps = w.push_slices();
                assert_eq!(ps[0].len() + ps[1].len(), 4);
                w.push_done(1);
                drop(w);

                // should have 1 element
                let mut w = rb.writer();
                let ps = w.push_slices();
                assert_eq!(ps[0].len() + ps[1].len(), 3);
                drop(w);

                // pop 1
                rb.reader().pop(|buf| {
                    assert_eq!(buf.len(), 1);
                    1
                });
            }
        }
    }
}