#![no_std]
use core::{
cell::UnsafeCell,
cmp,
mem::MaybeUninit,
ptr,
sync::atomic::{self, AtomicUsize, Ordering},
};
/// Number of slots in the backing array of every ring buffer.
///
/// One slot is always left unused so that a full buffer can be told apart
/// from an empty one, which means at most `CAPACITY - 1` values are stored
/// at any time.
pub const CAPACITY: usize = 1 << 10;
/// Errors reported when storing a value into the ring buffer.
///
/// The enum is a plain fieldless type, so it is cheap to copy and supports
/// full equality comparison.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Error {
    /// Every usable slot is occupied; the value was not stored.
    BufferFull,
}
/// Fixed-size ring buffer holding up to `CAPACITY - 1` values of type `T`.
///
/// The buffer has no read or write methods of its own; `split` hands out the
/// `Reader` and `Writer` halves that operate on it.
///
/// NOTE(review): no `Drop` impl is visible in this file, so values still
/// stored when the buffer goes away are presumably never dropped — confirm
/// whether that is intended.
pub struct RingBuffer<T> {
// Index of the next slot to read. Only the `Reader` half stores to it.
head: AtomicUsize,
// Index of the next slot to write. Only the `Writer` half stores to it.
tail: AtomicUsize,
// Backing storage. Only the slots from `head` up to (not including) `tail`,
// modulo `CAPACITY`, hold initialised values.
buf: UnsafeCell<MaybeUninit<[T; CAPACITY]>>,
}
impl<T> RingBuffer<T> {
    /// Creates an empty buffer.
    ///
    /// Both indices start at zero and the backing storage is left
    /// uninitialised. Being `const`, this can also initialise a `static`.
    pub const fn new() -> Self {
        Self {
            buf: UnsafeCell::new(MaybeUninit::uninit()),
            tail: AtomicUsize::new(0),
            head: AtomicUsize::new(0),
        }
    }

    /// Returns the reading and writing halves of this buffer.
    ///
    /// Both halves borrow the buffer, so it must outlive them.
    ///
    /// NOTE(review): this takes `&self`, so nothing stops a caller from
    /// splitting the same buffer more than once. The halves update `head`
    /// and `tail` with separate load and store steps, which is only correct
    /// with a single reader and a single writer — callers should split once.
    pub const fn split(&self) -> (Reader<T>, Writer<T>) {
        (Reader { rb: self }, Writer { rb: self })
    }
}
// SAFETY (review note): the buffer owns the `T` values it stores, so it may
// only be moved to another thread when `T: Send`. Every field already looks
// `Send` under that bound, so this explicit impl is presumably redundant —
// TODO confirm.
unsafe impl<T> Send for RingBuffer<T> where T: Send {}
/// Consuming half of a `RingBuffer`, obtained from `RingBuffer::split`.
///
/// Takes values out of the front of the buffer and is the only party that
/// advances the buffer's `head` index.
pub struct Reader<'a, T> {
// Buffer this reader drains.
rb: &'a RingBuffer<T>,
}
// SAFETY (review note): a `Reader` only holds a shared reference to the
// buffer, and the buffer is not `Sync` because it contains an `UnsafeCell`,
// so `Send` has to be asserted by hand. This presumably relies on there being
// at most one `Reader` and one `Writer` per buffer, which `split(&self)` does
// not enforce — confirm that callers split only once.
unsafe impl<T> Send for Reader<'_, T> where T: Send {}
/// Producing half of a `RingBuffer`, obtained from `RingBuffer::split`.
///
/// Appends values at the back of the buffer and is the only party that
/// advances the buffer's `tail` index.
pub struct Writer<'a, T> {
// Buffer this writer fills.
rb: &'a RingBuffer<T>,
}
// SAFETY (review note): a `Writer` only holds a shared reference to the
// buffer, and the buffer is not `Sync` because it contains an `UnsafeCell`,
// so `Send` has to be asserted by hand. This presumably relies on there being
// at most one `Reader` and one `Writer` per buffer, which `split(&self)` does
// not enforce — confirm that callers split only once.
unsafe impl<T> Send for Writer<'_, T> where T: Send {}
impl<T> Reader<'_, T> {
    /// Returns the number of values that are currently available to read.
    ///
    /// This is a snapshot: the writer may store more values right after the
    /// indices were sampled.
    pub fn len(&self) -> usize {
        let h = self.rb.head.load(Ordering::Relaxed);
        let t = self.rb.tail.load(Ordering::Relaxed);
        atomic::fence(Ordering::Acquire);
        (t + CAPACITY - h) % CAPACITY
    }

    /// Returns `true` when there is nothing to read at the moment.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Removes and returns the oldest value, or `None` if the buffer is empty.
    pub fn shift(&mut self) -> Option<T> {
        let h = self.rb.head.load(Ordering::Relaxed);
        let t = self.rb.tail.load(Ordering::Relaxed);
        if h == t {
            return None;
        }
        // Pairs with the writer's release fence before it publishes `tail`,
        // so the slot contents written by the writer are visible here.
        atomic::fence(Ordering::Acquire);
        // SAFETY: `h != t`, so slot `h` was initialised by the writer and has
        // not been read yet; `h` is always kept below `CAPACITY`.
        let value = unsafe { Self::load_val_at(h, self.slots()) };
        // The read above must be complete before the slot is handed back to
        // the writer by advancing `head`.
        atomic::fence(Ordering::Release);
        self.rb.head.store((h + 1) % CAPACITY, Ordering::Relaxed);
        Some(value)
    }

    /// Moves as many values as possible into the front of `buf`.
    ///
    /// Transfers `min(self.len(), buf.len())` values in FIFO order, replacing
    /// (and thereby dropping) the previous contents of those elements of
    /// `buf`, and returns how many values were moved. Elements of `buf` past
    /// the returned count are left untouched.
    pub fn shift_into(&mut self, buf: &mut [T]) -> usize {
        let mut h = self.rb.head.load(Ordering::Relaxed);
        let t = self.rb.tail.load(Ordering::Relaxed);
        atomic::fence(Ordering::Acquire);
        let available = (t + CAPACITY - h) % CAPACITY;
        let len = cmp::min(available, buf.len());
        let slots = self.slots();
        for dst in &mut buf[..len] {
            // SAFETY: fewer than `available` slots have been consumed so far,
            // so slot `h` still holds an initialised, unread value; `h` is
            // always kept below `CAPACITY`.
            *dst = unsafe { Self::load_val_at(h, slots) };
            h = (h + 1) % CAPACITY;
        }
        atomic::fence(Ordering::Release);
        self.rb.head.store(h, Ordering::Relaxed);
        len
    }

    /// Raw pointer to the first slot of the backing array.
    ///
    /// A raw pointer is used instead of a reference because most of the array
    /// may be uninitialised and the writer may be storing into other slots at
    /// the same time; a reference to the whole array would be invalid in both
    /// respects.
    fn slots(&self) -> *const T {
        self.rb.buf.get() as *const T
    }

    /// Moves the value out of slot `i`.
    ///
    /// # Safety
    ///
    /// `slots` must point to the first of `CAPACITY` slots, `i` must be below
    /// `CAPACITY`, and slot `i` must hold an initialised value that is not
    /// read again afterwards.
    #[inline(always)]
    unsafe fn load_val_at(i: usize, slots: *const T) -> T {
        ptr::read(slots.add(i))
    }
}
impl<T> Iterator for Reader<'_, T> {
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
self.shift()
}
}
impl<T> Writer<'_, T> {
pub fn unshift(&mut self, v: T) -> Result<(), Error> {
let h = self.rb.head.load(Ordering::Relaxed);
let t = self.rb.tail.load(Ordering::Relaxed);
let nt = (t + 1) % CAPACITY;
if nt == h {
Err(Error::BufferFull)
} else {
atomic::fence(Ordering::Acquire);
unsafe {
let buf = &mut *self.rb.buf.get();
Self::store_val_at(t, buf, v);
}
atomic::fence(Ordering::Release);
self.rb.tail.store(nt, Ordering::Relaxed);
Ok(())
}
}
#[inline(always)]
unsafe fn store_val_at(i: usize, buf: &mut MaybeUninit<[T; CAPACITY]>, val: T) {
let b: &mut [T; CAPACITY] = &mut *buf.as_mut_ptr();
ptr::write(b.get_unchecked_mut(i), val);
}
}
impl<T> Writer<'_, T>
where
T: Copy,
{
pub fn unshift_from(&mut self, buf: &[T]) -> usize {
let h = self.rb.head.load(Ordering::Relaxed);
let mut t = self.rb.tail.load(Ordering::Relaxed);
atomic::fence(Ordering::Acquire);
let mylen = (t + CAPACITY - h) % CAPACITY;
let buflen = buf.len();
let len = cmp::min(CAPACITY - mylen - 1, buflen);
unsafe {
let rbuf = &mut *self.rb.buf.get();
for i in 0..len {
Self::store_val_at(t, rbuf, *buf.get_unchecked(i));
t = (t + 1) % CAPACITY;
}
}
atomic::fence(Ordering::Release);
self.rb.tail.store(t, Ordering::Relaxed);
len
}
}
#[cfg(test)]
mod test {
    //! Single-threaded tests driving both halves of a ring buffer through
    //! the public API.

    use super::*;

    /// The reader reports emptiness before, between and after a write/read.
    #[test]
    fn detects_empty() {
        let rb = RingBuffer::<bool>::new();
        let (mut rbr, mut rbw) = rb.split();
        assert!(rbr.is_empty());
        assert_eq!(rbw.unshift(true), Ok(()));
        assert!(!rbr.is_empty());
        assert_eq!(rbr.shift(), Some(true));
        assert!(rbr.is_empty());
    }

    /// `len` tracks every single write and read.
    #[test]
    fn len_matches() {
        let rb = RingBuffer::<bool>::new();
        let (mut rbr, mut rbw) = rb.split();
        for i in 0..CAPACITY - 1 {
            assert_eq!(rbr.len(), i);
            assert_eq!(rbw.unshift(true), Ok(()));
        }
        for i in 0..CAPACITY - 1 {
            assert_eq!(rbr.len(), CAPACITY - 1 - i);
            assert_eq!(rbr.shift(), Some(true));
        }
        assert_eq!(rbr.len(), 0);
    }

    /// Values survive, in order, when the indices wrap around the end of the
    /// backing array.
    #[test]
    fn can_wrap() {
        let rb = RingBuffer::<usize>::new();
        let (mut rbr, mut rbw) = rb.split();
        // Move both indices to the middle of the array first, so that the
        // full fill below has to cross the end of the array.
        for i in 0..CAPACITY / 2 {
            assert_eq!(rbw.unshift(i), Ok(()));
            assert_eq!(rbr.shift(), Some(i));
        }
        for i in 0..CAPACITY - 1 {
            assert_eq!(rbw.unshift(i), Ok(()));
        }
        assert_eq!(rbw.unshift(0xffff), Err(Error::BufferFull));
        for i in 0..CAPACITY - 1 {
            assert_eq!(rbr.shift(), Some(i));
        }
        assert_eq!(rbr.shift(), None);
    }

    /// A full buffer rejects writes until the reader frees a slot.
    #[test]
    fn cannot_overwrite() {
        let rb = RingBuffer::<usize>::new();
        let (mut rbr, mut rbw) = rb.split();
        for i in 0..CAPACITY - 1 {
            assert_eq!(rbw.unshift(i), Ok(()));
        }
        assert_eq!(rbw.unshift(0xffff), Err(Error::BufferFull));
        assert_eq!(rbr.shift(), Some(0));
        assert_eq!(rbw.unshift(0xffff), Ok(()));
    }

    /// Iterating the reader yields every stored value exactly once, in order.
    #[test]
    fn can_iter() {
        let rb = RingBuffer::<usize>::new();
        let (rbr, mut rbw) = rb.split();
        for i in 0..CAPACITY - 1 {
            assert_eq!(rbw.unshift(i), Ok(()));
        }
        let mut seen = 0;
        for (i, e) in rbr.enumerate() {
            assert_eq!(e, i);
            seen += 1;
        }
        assert_eq!(seen, CAPACITY - 1);
    }

    /// `shift_into` fills a destination smaller than the stored data and
    /// leaves the remainder in the buffer.
    #[test]
    fn shift_into_smaller() {
        let rb = RingBuffer::<usize>::new();
        let (mut rbr, mut rbw) = rb.split();
        for i in 0..CAPACITY - 1 {
            assert_eq!(rbw.unshift(i), Ok(()));
        }
        let mut buf = [0usize; CAPACITY / 2];
        assert_eq!(rbr.shift_into(&mut buf), CAPACITY / 2, "return len wrong");
        for (i, v) in buf.iter().enumerate() {
            assert_eq!(*v, i, "slot {} wrong", i);
        }
        assert_eq!(rbr.shift(), Some(CAPACITY / 2));
    }

    /// `shift_into` drains everything into a larger destination and leaves
    /// the rest of the destination untouched.
    #[test]
    fn shift_into_bigger() {
        let rb = RingBuffer::<usize>::new();
        let (mut rbr, mut rbw) = rb.split();
        for i in 0..CAPACITY - 1 {
            assert_eq!(rbw.unshift(i), Ok(()));
        }
        let mut buf = [0usize; CAPACITY * 2];
        assert_eq!(rbr.shift_into(&mut buf), CAPACITY - 1, "return len wrong");
        let (filled, untouched) = buf.split_at(CAPACITY - 1);
        for (i, v) in filled.iter().enumerate() {
            assert_eq!(*v, i, "first half");
        }
        assert!(untouched.iter().all(|&v| v == 0), "second half");
        assert!(rbr.shift().is_none());
    }

    /// `unshift_from` stores a whole source slice that fits.
    #[test]
    fn unshift_from_smaller() {
        let rb = RingBuffer::<usize>::new();
        let (mut rbr, mut rbw) = rb.split();
        let buf = [0xdead_usize; CAPACITY / 2];
        assert_eq!(rbw.unshift_from(&buf), CAPACITY / 2);
        for i in 0..CAPACITY / 2 {
            assert_eq!(rbr.shift(), Some(0xdead), "wrong value at index {}", i);
        }
        assert!(rbr.shift().is_none());
    }

    /// `unshift_from` stops once the buffer is full.
    #[test]
    fn unshift_from_bigger() {
        let rb = RingBuffer::<usize>::new();
        let (mut rbr, mut rbw) = rb.split();
        let buf = [0xdead_usize; CAPACITY * 2];
        assert_eq!(rbw.unshift_from(&buf), CAPACITY - 1);
        assert_eq!(rbw.unshift(0xbeef), Err(Error::BufferFull));
        for i in 0..CAPACITY - 1 {
            assert_eq!(rbr.shift(), Some(0xdead), "wrong value at index {}", i);
        }
        assert!(rbr.shift().is_none());
    }

    /// A value travelling through the buffer is dropped exactly once, and
    /// only after the receiver lets go of it.
    #[test]
    fn ownership_passes_through() {
        // Counts drops instead of using a `static mut` flag, so no `unsafe`
        // is needed and a double drop would be detected as well.
        static DROPS: AtomicUsize = AtomicUsize::new(0);

        struct DropTest;

        impl Drop for DropTest {
            fn drop(&mut self) {
                DROPS.fetch_add(1, Ordering::SeqCst);
            }
        }

        let rb = RingBuffer::<DropTest>::new();
        let (mut rbr, mut rbw) = rb.split();

        assert!(rbw.unshift(DropTest).is_ok(), "couldn't store item");
        assert_eq!(DROPS.load(Ordering::SeqCst), 0);

        let dt = rbr.shift().expect("buffer was empty");
        assert_eq!(DROPS.load(Ordering::SeqCst), 0);

        drop(dt);
        assert_eq!(DROPS.load(Ordering::SeqCst), 1);
    }
}