use std::{
cell::UnsafeCell,
sync::{
atomic::{AtomicI16, AtomicUsize, Ordering},
Arc,
},
};
#[cfg(test)]
mod test;
/// One slot of the ring buffer.
struct Item<T> {
    /// Per-slot lock word: `-1` means the writer holds the slot,
    /// `0` means it is free, and `>0` counts the readers currently
    /// copying out of it.
    use_count: AtomicI16,
    /// Lap (wrap-around) number of the write that last filled `data`.
    /// Readers compare it against their own lap counter to detect
    /// empty slots and writer overruns. Guarded by `use_count`.
    lap_count: UnsafeCell<u16>,
    /// The stored value. Guarded by `use_count`.
    data: UnsafeCell<T>,
}
/// Consuming half of the ring buffer. Cursor state (`read_index`,
/// `lap_count`) is per-reader; cloning a `Reader` yields an independent
/// cursor over the same shared storage.
pub struct Reader<T> {
    /// Shared slot storage, co-owned with the `Writer` and other readers.
    data: Arc<[Item<T>]>,
    /// The writer's next slot index, shared with the `Writer`.
    write_index: Arc<AtomicUsize>,
    /// Index of the next slot this reader will read.
    read_index: usize,
    /// Lap number this reader expects to find in the next slot.
    lap_count: u16,
}
unsafe impl<T> Send for Reader<T> where T: Send {}
/// Producing half of the ring buffer. `write` always succeeds, overwriting
/// the oldest slot; readers observe overwrites as dropouts.
pub struct Writer<T> {
    /// Shared slot storage, co-owned with the reader(s).
    data: Arc<[Item<T>]>,
    /// Index of the next slot to write; published to readers after each write.
    write_index: Arc<AtomicUsize>,
    /// Current lap number, stamped into each slot on write.
    lap_count: u16,
}
unsafe impl<T> Send for Writer<T> where T: Send {}
/// Creates a ring buffer with `capacity` slots and returns its connected
/// reader and writer halves.
///
/// Every slot is pre-filled with `T::default()` and stamped with lap `0`,
/// while both halves start at lap `1`, so untouched slots read back as
/// "empty" until the writer reaches them.
///
/// # Panics
///
/// Panics if `capacity < 2`.
pub fn ring_buffer<T>(capacity: usize) -> (Reader<T>, Writer<T>)
where
    T: Default,
{
    assert!(capacity >= 2);
    // Materialise the slots straight into the shared allocation.
    let slots: Arc<[Item<T>]> = (0..capacity)
        .map(|_| Item {
            use_count: AtomicI16::new(0),
            lap_count: UnsafeCell::new(0),
            data: UnsafeCell::new(T::default()),
        })
        .collect();
    let cursor = Arc::new(AtomicUsize::new(0));
    (
        Reader {
            data: Arc::clone(&slots),
            write_index: Arc::clone(&cursor),
            read_index: 0,
            lap_count: 1,
        },
        Writer {
            data: slots,
            write_index: cursor,
            lap_count: 1,
        },
    )
}
/// Outcome of a single [`Reader::read`] call.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum ReadResult<T> {
    /// The next value in sequence was read.
    Ok(T),
    /// A value was read, but the writer lapped the reader, so one or more
    /// intermediate values were lost.
    Dropout(T),
    /// No new value has been written yet.
    Empty,
}

impl<T> ReadResult<T> {
    /// Returns `true` for an in-sequence read (`Ok`).
    pub fn is_ok(&self) -> bool {
        matches!(self, ReadResult::Ok(_))
    }

    /// Returns `true` if a value was read after the writer overtook the
    /// reader (`Dropout`).
    pub fn is_dropout(&self) -> bool {
        matches!(self, ReadResult::Dropout(_))
    }

    /// Returns `true` if no value was available (`Empty`).
    pub fn is_empty(&self) -> bool {
        matches!(self, ReadResult::Empty)
    }

    /// Extracts the value, if any, discarding the `Ok`/`Dropout`
    /// distinction.
    pub fn value(self) -> Option<T> {
        match self {
            ReadResult::Ok(v) | ReadResult::Dropout(v) => Some(v),
            ReadResult::Empty => None,
        }
    }
}
impl<T> Reader<T>
where
    T: Copy,
{
    /// Reads the value at the reader's current slot.
    ///
    /// Returns [`ReadResult::Empty`] when the writer has not yet filled the
    /// slot on the lap this reader expects, [`ReadResult::Ok`] when the
    /// value is the next one in sequence, and [`ReadResult::Dropout`] when
    /// the writer has lapped the reader (the returned value is then the
    /// value currently stored at this slot, and the reader resynchronises).
    ///
    /// Spins while the writer holds the slot.
    pub fn read(&mut self) -> ReadResult<T> {
        let item = &self.data[self.read_index];
        // Acquire the slot as a reader by bumping use_count above zero:
        // -1 = writer holds it, 0 = free, >0 = number of readers.
        let mut expected_use_count = 0;
        while let Err(actual_use_count) = item.use_count.compare_exchange(
            expected_use_count,
            expected_use_count + 1,
            Ordering::SeqCst,
            Ordering::SeqCst,
        ) {
            debug_assert!(actual_use_count >= -1, "Invalid use count");
            debug_assert!(actual_use_count < i16::MAX, "Reader overflow");
            // If the writer holds the slot (-1), retry from a free slot (0);
            // otherwise join the other readers at the observed count.
            expected_use_count = actual_use_count.max(0);
            std::hint::spin_loop();
        }
        // SAFETY: a positive use_count keeps the writer out of this slot
        // (its CAS 0 -> -1 cannot succeed), so the cell contents are stable
        // while we copy them out.
        let value = unsafe { *item.data.get() };
        let value_lap_count = unsafe { *item.lap_count.get() };
        // Release our hold on the slot.
        let final_use_count = item.use_count.fetch_sub(1, Ordering::SeqCst);
        debug_assert!(final_use_count >= 0);
        let expected_lap_count = self.lap_count;
        if value_lap_count.wrapping_add(1) == expected_lap_count {
            // Slot still carries the previous lap's stamp: the writer has
            // not reached it yet this lap, so there is nothing new. Do NOT
            // advance the cursor — retry this slot on the next call.
            return ReadResult::Empty;
        } else if value_lap_count != expected_lap_count {
            // The writer is ahead of us: data at this slot was overwritten.
            // Resynchronise to the lap actually stored in the slot.
            self.lap_count = value_lap_count;
        }
        // Advance the cursor, wrapping index and lap together.
        self.read_index += 1;
        if self.read_index == self.data.len() {
            self.read_index = 0;
            self.lap_count = self.lap_count.wrapping_add(1);
        }
        if value_lap_count == expected_lap_count {
            ReadResult::Ok(value)
        } else {
            ReadResult::Dropout(value)
        }
    }

    /// Jumps the reader to the most recently written slot, abandoning any
    /// unread backlog.
    pub fn skip_ahead(&mut self) {
        // The freshest data lives one slot behind the writer's next
        // position (wrapping around the buffer).
        self.read_index = self.write_index.load(Ordering::SeqCst);
        self.read_index = if self.read_index == 0 {
            self.data.len()
        } else {
            self.read_index
        } - 1;
        // Back the expected lap off by one — presumably so the next `read`
        // never classifies the target slot as Empty and instead returns its
        // value (resynchronising laps via the Dropout path).
        // NOTE(review): confirm lap reconciliation against `read`'s
        // lap-mismatch branch.
        self.lap_count = self.lap_count.wrapping_sub(1);
    }
}
impl<T> Clone for Reader<T> {
fn clone(&self) -> Self {
Self {
data: Arc::clone(&self.data),
write_index: Arc::clone(&self.write_index),
read_index: self.read_index,
lap_count: self.lap_count,
}
}
}
impl<T> Writer<T> {
    /// Writes `value` into the slot at the current write position and
    /// advances the position, wrapping the index and bumping the lap
    /// counter at the end of the buffer.
    ///
    /// Never fails and never blocks on a full buffer — old data is simply
    /// overwritten. Spins while readers still hold the target slot.
    pub fn write(&mut self, value: T) {
        let index = self.write_index.load(Ordering::SeqCst);
        let item = &self.data[index];
        // Claim exclusive access: CAS use_count 0 -> -1. Spin while readers
        // (use_count > 0) are still copying out of the slot.
        while let Err(actual_use_count) =
            item.use_count
                .compare_exchange(0, -1, Ordering::SeqCst, Ordering::SeqCst)
        {
            // Only this writer ever stores -1, so a failed CAS must have
            // observed readers (a positive count).
            debug_assert!(actual_use_count > 0, "Invalid use count");
            std::hint::spin_loop();
        }
        // SAFETY: use_count == -1 excludes every reader from this slot
        // (their CAS from a non-negative expected value cannot succeed),
        // so we have exclusive access to both cells.
        unsafe {
            *item.data.get() = value;
            *item.lap_count.get() = self.lap_count;
        }
        let mut next_index = index + 1;
        if next_index == self.data.len() {
            next_index = 0;
            self.lap_count = self.lap_count.wrapping_add(1);
        }
        // Publish the new write position before releasing the slot.
        self.write_index.store(next_index, Ordering::SeqCst);
        // Release the slot. This CAS cannot fail: nothing else modifies a
        // slot whose use_count is -1, so it must still be -1 here; a panic
        // would indicate a broken invariant.
        item.use_count
            .compare_exchange(-1, 0, Ordering::SeqCst, Ordering::SeqCst)
            .unwrap();
    }
}