1#![no_std]
7
8use core::{
9 cell::UnsafeCell,
10 cmp,
11 mem::MaybeUninit,
12 ptr,
13 sync::atomic::{self, AtomicUsize, Ordering},
14};
15
/// Total number of slots in the ring. One slot is always kept empty to
/// distinguish "full" from "empty" (`head == tail`), so at most
/// `CAPACITY - 1` elements can be stored at a time.
pub const CAPACITY: usize = 1024;
22
/// Errors returned by the ring buffer's `Writer`.
///
/// The enum is fieldless, so it is trivially `Copy`; `Eq` accompanies
/// the derived `PartialEq` since equality is total.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Error {
    /// The buffer already holds `CAPACITY - 1` elements; the value was
    /// not stored (and, for `unshift`, is dropped).
    BufferFull,
}
31
/// A fixed-capacity single-producer / single-consumer ring buffer.
///
/// `head` is only ever advanced by the `Reader`, `tail` only by the
/// `Writer`; both always lie in `0..CAPACITY`. `head == tail` means
/// "empty", so one slot is permanently sacrificed as a sentinel.
pub struct RingBuffer<T> {
    // Index of the next slot to read; written only by the Reader.
    head: AtomicUsize,
    // Index of the next slot to write; written only by the Writer.
    tail: AtomicUsize,

    // Backing storage. `MaybeUninit` because slots outside the live
    // `head..tail` window may never have been written; `UnsafeCell`
    // because reader and writer both mutate it through a shared
    // `&RingBuffer`.
    buf: UnsafeCell<MaybeUninit<[T; CAPACITY]>>,
}
42
impl<T> RingBuffer<T> {
    /// Creates an empty ring buffer. `const` so it can back a `static`.
    pub const fn new() -> Self {
        Self {
            head: AtomicUsize::new(0),
            tail: AtomicUsize::new(0),
            buf: UnsafeCell::new(MaybeUninit::uninit()),
        }
    }

    /// Splits the buffer into its consuming (`Reader`) and producing
    /// (`Writer`) halves.
    ///
    /// NOTE(review): this takes `&self`, so nothing stops a caller from
    /// invoking `split` twice and obtaining two `Writer`s (or two
    /// `Reader`s) over the same storage — sending both to different
    /// threads would race. Confirm all callers split exactly once, or
    /// consider a `&mut self` / consuming signature.
    pub const fn split(&self) -> (Reader<T>, Writer<T>) {
        let rbr = Reader { rb: &self };
        let rbw = Writer { rb: &self };
        (rbr, rbw)
    }
}
// SAFETY: the buffer only stores and hands out values of type `T`, so
// moving the whole buffer across threads is fine whenever `T: Send`.
unsafe impl<T> Send for RingBuffer<T> where T: Send {}
70
/// The consuming half of a [`RingBuffer`]: pops elements in FIFO order.
pub struct Reader<'a, T> {
    rb: &'a RingBuffer<T>,
}
// SAFETY: a Reader only moves `T` values out of the shared buffer and
// only ever advances `head`, so it may migrate to another thread when
// `T: Send`. NOTE(review): soundness also relies on there being at most
// one Reader — see the note on `RingBuffer::split`.
unsafe impl<T> Send for Reader<'_, T> where T: Send {}
76
/// The producing half of a [`RingBuffer`]: appends elements at the tail.
pub struct Writer<'a, T> {
    rb: &'a RingBuffer<T>,
}
// SAFETY: a Writer only moves `T` values into the shared buffer and
// only ever advances `tail`, so it may migrate to another thread when
// `T: Send`. NOTE(review): soundness also relies on there being at most
// one Writer — see the note on `RingBuffer::split`.
unsafe impl<T> Send for Writer<'_, T> where T: Send {}
82
impl<T> Reader<'_, T> {
    /// Returns the number of elements currently readable.
    ///
    /// From the reader's side `head` is exact while `tail` may lag the
    /// writer, so the result is a lower bound that can only grow.
    pub fn len(&self) -> usize {
        let h = self.rb.head.load(Ordering::Relaxed);
        let t = self.rb.tail.load(Ordering::Relaxed);
        // Pairs with the writer's Release fence before its `tail` store.
        atomic::fence(Ordering::Acquire);

        // Wrap-safe distance from head to tail around the ring.
        (t + CAPACITY - h) % CAPACITY
    }

    /// True when no elements are currently readable.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Removes and returns the oldest element, or `None` when empty.
    pub fn shift(&mut self) -> Option<T> {
        let h = self.rb.head.load(Ordering::Relaxed);
        let t = self.rb.tail.load(Ordering::Relaxed);

        if h == t {
            None
        } else {
            // Pairs with the writer's Release fence: the value the
            // writer stored at slot `h` is visible before we read it.
            atomic::fence(Ordering::Acquire);
            let nh = (h + 1) % CAPACITY;
            let rc = unsafe {
                // SAFETY: `h != t`, so slot `h` was initialized by the
                // writer and will not be rewritten until `head` advances
                // past it (published below).
                let buf: &MaybeUninit<[T; CAPACITY]> = &*self.rb.buf.get();
                Some(Self::load_val_at(h, buf))
            };
            // Ensure the slot read completes before the writer can
            // observe the new `head` and reuse the slot.
            atomic::fence(Ordering::Release);
            self.rb.head.store(nh, Ordering::Relaxed);
            rc
        }
    }

    /// Pops up to `buf.len()` elements into `buf`, oldest first, and
    /// returns how many were copied.
    ///
    /// Each popped value is moved out of the ring and assigned over
    /// `buf[i]`, dropping `buf[i]`'s previous contents.
    pub fn shift_into(&mut self, buf: &mut [T]) -> usize {
        let mut h = self.rb.head.load(Ordering::Relaxed);
        let t = self.rb.tail.load(Ordering::Relaxed);
        // Pairs with the writer's Release fence for every slot in `h..t`.
        atomic::fence(Ordering::Acquire);

        // Number of readable elements vs. destination space.
        let mylen = (t + CAPACITY - h) % CAPACITY;
        let buflen = buf.len();
        let len = cmp::min(mylen, buflen);

        unsafe {
            // SAFETY: `i < len <= buf.len()`, and the `len` slots
            // starting at `h` are initialized and owned by the reader
            // until `head` is advanced below.
            let rbuf: &MaybeUninit<[T; CAPACITY]> = &*self.rb.buf.get();
            for i in 0..len {
                *buf.get_unchecked_mut(i) = Self::load_val_at(h, rbuf);
                h = (h + 1) % CAPACITY;
            }
        }

        // All slot reads complete before the writer may reuse them.
        atomic::fence(Ordering::Release);
        self.rb.head.store(h, Ordering::Relaxed);
        len
    }

    /// Moves the value out of slot `i` via a bitwise copy; the slot is
    /// logically uninitialized afterwards and must not be read again.
    ///
    /// SAFETY: caller must guarantee `i < CAPACITY` and that slot `i`
    /// currently holds an initialized value.
    /// NOTE(review): `&*buf.as_ptr()` forms a `&[T; CAPACITY]` over a
    /// partly-uninitialized array, which is formally UB; consider raw
    /// pointer arithmetic (`(buf.as_ptr() as *const T).add(i)`) instead.
    #[inline(always)]
    unsafe fn load_val_at(i: usize, buf: &MaybeUninit<[T; CAPACITY]>) -> T {
        let b: &[T; CAPACITY] = &*buf.as_ptr();
        ptr::read(b.get_unchecked(i))
    }
}
157
// Draining iterator: each `next` pops the oldest element via `shift`,
// ending once the buffer is observed empty (more items may still arrive
// from the writer afterwards).
impl<T> Iterator for Reader<'_, T> {
    type Item = T;
    fn next(&mut self) -> Option<Self::Item> {
        self.shift()
    }
}
164
impl<T> Writer<'_, T> {
    /// Appends `v` at the tail of the buffer.
    ///
    /// Returns `Err(Error::BufferFull)` when only the sentinel slot
    /// remains free; note the rejected value is consumed and dropped,
    /// not handed back to the caller.
    pub fn unshift(&mut self, v: T) -> Result<(), Error> {
        let h = self.rb.head.load(Ordering::Relaxed);
        let t = self.rb.tail.load(Ordering::Relaxed);

        let nt = (t + 1) % CAPACITY;
        if nt == h {
            // `head` may be stale (only the reader advances it), so
            // this check is conservative: we may spuriously report
            // full, but we never overwrite unread data.
            Err(Error::BufferFull)
        } else {
            // Pairs with the reader's Release fence before its `head`
            // store: the reader is done with slot `t` before we reuse it.
            atomic::fence(Ordering::Acquire);
            unsafe {
                // SAFETY: slot `t` lies outside the readable window
                // `head..tail`, so the reader will not touch it until
                // `tail` is advanced below.
                let buf = &mut *self.rb.buf.get();
                Self::store_val_at(t, buf, v);
            }
            // Publish the slot contents before the reader can observe
            // the new `tail`.
            atomic::fence(Ordering::Release);
            self.rb.tail.store(nt, Ordering::Relaxed);
            Ok(())
        }
    }

    /// Writes `val` into slot `i` without reading or dropping any
    /// previous occupant (the slot is treated as uninitialized).
    ///
    /// SAFETY: caller must guarantee `i < CAPACITY` and that the slot
    /// is not being read concurrently.
    /// NOTE(review): `&mut *buf.as_mut_ptr()` forms a reference over a
    /// partly-uninitialized array, which is formally UB; consider
    /// `(buf.as_mut_ptr() as *mut T).add(i)` instead.
    #[inline(always)]
    unsafe fn store_val_at(i: usize, buf: &mut MaybeUninit<[T; CAPACITY]>, val: T) {
        let b: &mut [T; CAPACITY] = &mut *buf.as_mut_ptr();
        ptr::write(b.get_unchecked_mut(i), val);
    }
}
203
impl<T> Writer<'_, T>
where
    T: Copy,
{
    /// Copies as many leading elements of `buf` as fit into the ring
    /// and returns how many were stored (possibly 0).
    pub fn unshift_from(&mut self, buf: &[T]) -> usize {
        let h = self.rb.head.load(Ordering::Relaxed);
        let mut t = self.rb.tail.load(Ordering::Relaxed);
        // Pairs with the reader's Release fence: slots the reader has
        // freed are safe to reuse below.
        atomic::fence(Ordering::Acquire);

        // Occupied count; `CAPACITY - mylen - 1` keeps one slot free as
        // the empty/full sentinel. `mylen <= CAPACITY - 1`, so the
        // subtraction cannot underflow.
        let mylen = (t + CAPACITY - h) % CAPACITY;
        let buflen = buf.len();
        let len = cmp::min(CAPACITY - mylen - 1, buflen);

        unsafe {
            // SAFETY: the `len` slots starting at `t` lie outside the
            // readable window, and `i < len <= buf.len()`.
            let rbuf = &mut *self.rb.buf.get();
            for i in 0..len {
                Self::store_val_at(t, rbuf, *buf.get_unchecked(i));
                t = (t + 1) % CAPACITY;
            }
        }

        // Publish all written slots before exposing the new `tail`.
        atomic::fence(Ordering::Release);
        self.rb.tail.store(t, Ordering::Relaxed);
        len
    }
}
236
#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn detects_empty() {
        let rb = RingBuffer::<bool>::new();
        let (mut rbr, mut rbw) = rb.split();
        assert!(rbr.is_empty());
        // Was `.ok()`, silently discarding a possible failure.
        rbw.unshift(true).expect("buffer should accept one element");
        assert!(!rbr.is_empty());
        rbr.shift();
        assert!(rbr.is_empty());
    }

    #[test]
    fn len_matches() {
        let rb = RingBuffer::<bool>::new();
        let (mut rbr, mut rbw) = rb.split();

        // Usable capacity is CAPACITY - 1: one sentinel slot stays free.
        for i in 0..CAPACITY - 1 {
            assert_eq!(rbr.len(), i);
            assert_eq!(rbw.unshift(true), Ok(()));
        }

        for i in 0..CAPACITY - 1 {
            assert_eq!(rbr.len(), CAPACITY - 1 - i);
            rbr.shift();
        }

        assert_eq!(rbr.len(), 0);
    }

    #[test]
    fn can_wrap() {
        let rb = RingBuffer::<usize>::new();
        let (mut rbr, mut rbw) = rb.split();

        for i in 0..CAPACITY - 1 {
            assert_eq!(rbw.unshift(i), Ok(()))
        }

        for i in 0..CAPACITY - 1 {
            assert_eq!(rbr.shift(), Some(i))
        }
    }

    #[test]
    fn cannot_overwrite() {
        let rb = RingBuffer::<usize>::new();
        let (mut rbr, mut rbw) = rb.split();

        for i in 0..CAPACITY - 1 {
            assert_eq!(rbw.unshift(i), Ok(()));
        }
        assert_eq!(rbw.unshift(0xffff), Err(Error::BufferFull));

        // Freeing one slot makes exactly one unshift succeed again.
        rbr.shift();
        assert_eq!(rbw.unshift(0xffff), Ok(()));
    }

    #[test]
    fn can_iter() {
        let rb = RingBuffer::<usize>::new();
        let (rbr, mut rbw) = rb.split();

        for i in 0..CAPACITY - 1 {
            assert_eq!(rbw.unshift(i), Ok(()));
        }

        let mut i = 0;
        for e in rbr {
            assert_eq!(e, i);
            i += 1;
        }
        // New: verify the iterator actually yielded every element.
        assert_eq!(i, CAPACITY - 1);
    }

    #[test]
    fn shift_into_smaller() {
        let rb = RingBuffer::<usize>::new();
        let (mut rbr, mut rbw) = rb.split();
        for i in 0..CAPACITY - 1 {
            assert_eq!(rbw.unshift(i), Ok(()));
        }

        let mut buf: [usize; CAPACITY / 2] = [0; CAPACITY / 2];
        assert_eq!(rbr.shift_into(&mut buf), CAPACITY / 2, "return len wrong");
        for i in 0..CAPACITY / 2 {
            assert_eq!(buf[i], i, "slot {} wrong", i)
        }

        // Elements beyond the drained prefix are still in the buffer.
        // (Was the double-negative `assert!(!rbr.shift().is_none())`.)
        assert!(rbr.shift().is_some());
    }

    #[test]
    fn shift_into_bigger() {
        let rb = RingBuffer::<usize>::new();
        let (mut rbr, mut rbw) = rb.split();
        for i in 0..CAPACITY - 1 {
            assert_eq!(rbw.unshift(i), Ok(()));
        }

        let mut buf: [usize; CAPACITY * 2] = [0; CAPACITY * 2];
        assert_eq!(rbr.shift_into(&mut buf), CAPACITY - 1, "return len wrong");
        for i in 0..CAPACITY - 1 {
            assert_eq!(buf[i], i, "first half")
        }
        for i in CAPACITY - 1..CAPACITY * 2 {
            assert_eq!(buf[i], 0, "second half")
        }

        assert!(rbr.shift().is_none());
    }

    #[test]
    fn unshift_from_smaller() {
        let rb = RingBuffer::<usize>::new();
        let (mut rbr, mut rbw) = rb.split();

        let buf: [usize; CAPACITY / 2] = [0xdead; CAPACITY / 2];
        assert_eq!(rbw.unshift_from(&buf), CAPACITY / 2);
        for i in 0..CAPACITY / 2 {
            assert_eq!(rbr.shift(), Some(0xdead), "wrong value at index {}", i);
        }
        assert!(rbr.shift().is_none());
    }

    #[test]
    fn unshift_from_bigger() {
        let rb = RingBuffer::<usize>::new();
        let (mut rbr, mut rbw) = rb.split();

        let buf: [usize; CAPACITY * 2] = [0xdead; CAPACITY * 2];
        // Only CAPACITY - 1 elements fit; the remainder is ignored.
        assert_eq!(rbw.unshift_from(&buf), CAPACITY - 1);
        assert_eq!(rbw.unshift(0xbeef), Err(Error::BufferFull));
        for i in 0..CAPACITY - 1 {
            assert_eq!(rbr.shift(), Some(0xdead), "wrong value at index {}", i);
        }
        assert!(rbr.shift().is_none());
    }

    #[test]
    fn ownership_passes_through() {
        use core::sync::atomic::AtomicBool;

        // AtomicBool instead of `static mut`: no unsafe access needed,
        // and it sidesteps the static_mut_refs pitfalls of newer
        // editions.
        static DROPPED: AtomicBool = AtomicBool::new(false);
        struct DropTest {}
        impl DropTest {
            fn i_own_it_now(self) {}
        }
        impl Drop for DropTest {
            fn drop(&mut self) {
                DROPPED.store(true, Ordering::Relaxed);
            }
        }

        let rb = RingBuffer::<DropTest>::new();
        let (mut rbr, mut rbw) = rb.split();

        let mut cl = |dt| {
            rbw.unshift(dt).expect("couldn't store item");
        };
        cl(DropTest {});
        assert!(!DROPPED.load(Ordering::Relaxed), "dropped while stored");

        let dt = rbr.shift().expect("buffer was empty");
        assert!(!DROPPED.load(Ordering::Relaxed), "dropped on shift");

        dt.i_own_it_now();
        assert!(DROPPED.load(Ordering::Relaxed), "not dropped after use");
    }
}