#![cfg_attr(rustc_nightly, feature(test))]
#[cfg(test)]
#[cfg(rustc_nightly)]
extern crate test;
#[cfg(test)]
mod tests;
#[cfg(test)]
#[cfg(rustc_nightly)]
mod benchmarks;
use std::mem;
use std::cell::{UnsafeCell};
use std::sync::{Arc, atomic::{Ordering, AtomicUsize}};
use std::io::{self, Read, Write};
/// Error returned by `Producer::push`.
#[derive(Debug, PartialEq, Eq)]
pub enum PushError<T: Sized> {
/// The ring buffer had no free slot; the rejected element is handed back to the caller.
Full(T),
}
/// Error returned by `Consumer::pop`.
#[derive(Debug, PartialEq, Eq)]
pub enum PopError {
/// The ring buffer held no elements.
Empty,
}
/// Error returned by `Producer::push_slice`.
#[derive(Debug, PartialEq, Eq)]
pub enum PushSliceError {
/// The ring buffer had no free slot, so nothing was copied.
Full,
}
/// Error returned by `Consumer::pop_slice`.
#[derive(Debug, PartialEq, Eq)]
pub enum PopSliceError {
/// The ring buffer held no elements, so nothing was copied.
Empty,
}
/// Error returned by the `move_slice` methods, which transfer elements from a
/// `Consumer` into a `Producer`.
#[derive(Debug, PartialEq, Eq)]
pub enum MoveSliceError {
/// The source ring buffer (consumer side) held no elements.
Empty,
/// The destination ring buffer (producer side) had no free slot.
Full,
}
/// Error returned by `Producer::push_access`.
#[derive(Debug, PartialEq, Eq)]
pub enum PushAccessError {
/// The ring buffer had no free slot; the callback was not invoked.
Full,
/// The callback reported more elements than the two slices it was given can hold.
BadLen,
}
/// Error returned by `Consumer::pop_access`.
#[derive(Debug, PartialEq, Eq)]
pub enum PopAccessError {
/// The ring buffer held no elements; the callback was not invoked.
Empty,
/// The callback reported more elements than the two slices it was given contain.
BadLen,
}
/// Error returned by `Producer::<u8>::read_from`.
#[derive(Debug)]
pub enum ReadFromError {
/// The reader failed (or reported an impossible byte count); carries the I/O error.
Read(io::Error),
/// The ring buffer had no free slot, so the reader was not called.
RbFull,
}
/// Error returned by `Consumer::<u8>::write_into`.
#[derive(Debug)]
pub enum WriteIntoError {
/// The writer failed (or reported an impossible byte count); carries the I/O error.
Write(io::Error),
/// The ring buffer held no bytes, so the writer was not called.
RbEmpty,
}
/// Backing storage wrapper that allows a `Vec<T>` to be reached mutably
/// through a shared reference.
struct SharedVec<T: Sized> {
    cell: UnsafeCell<Vec<T>>,
}

// The wrapper is declared `Sync` for every `T`; nothing here enforces exclusive
// access, so callers of `get_ref` / `get_mut` carry that responsibility.
unsafe impl<T: Sized> Sync for SharedVec<T> {}

impl<T: Sized> SharedVec<T> {
    /// Wraps an already constructed vector.
    fn new(data: Vec<T>) -> Self {
        let cell = UnsafeCell::new(data);
        Self { cell }
    }

    /// Returns a shared reference to the wrapped vector.
    ///
    /// # Safety
    /// The caller must make sure the returned reference does not conflict
    /// with other live references into the vector.
    unsafe fn get_ref(&self) -> &Vec<T> {
        // `UnsafeCell::get` hands out the address of the cell's contents,
        // which is never null, so it can be dereferenced directly.
        &*self.cell.get()
    }

    /// Returns a mutable reference to the wrapped vector.
    ///
    /// # Safety
    /// The caller must make sure the returned reference does not conflict
    /// with other live references into the vector.
    unsafe fn get_mut(&self) -> &mut Vec<T> {
        &mut *self.cell.get()
    }
}
/// Fixed-capacity ring buffer that can be split into a `Producer` and a `Consumer`.
pub struct RingBuffer<T: Sized> {
// Slot storage; `new` sizes it to `capacity + 1` slots.
data: SharedVec<T>,
// Index of the next slot to pop; advanced by `Consumer::pop_access`.
head: AtomicUsize,
// Index of the next slot to push into; advanced by `Producer::push_access`.
tail: AtomicUsize,
}
/// Writing half of a `RingBuffer`, obtained from `RingBuffer::split`.
pub struct Producer<T> {
// Shared handle to the ring buffer; the matching `Consumer` holds the other clone.
rb: Arc<RingBuffer<T>>,
}
/// Reading half of a `RingBuffer`, obtained from `RingBuffer::split`.
pub struct Consumer<T> {
// Shared handle to the ring buffer; the matching `Producer` holds the other clone.
rb: Arc<RingBuffer<T>>,
}
impl<T: Sized> RingBuffer<T> {
    /// Creates an empty ring buffer able to hold `capacity` elements.
    ///
    /// One extra slot is allocated internally so that a full buffer
    /// (`tail + 1 == head`, modulo the slot count) can be told apart from an
    /// empty one (`head == tail`).
    ///
    /// # Panics
    /// Panics if `capacity + 1` does not fit in `usize`. Previously this
    /// wrapped to zero in release builds and produced a buffer whose
    /// `capacity()` underflowed.
    pub fn new(capacity: usize) -> Self {
        let vec_cap = capacity
            .checked_add(1)
            .expect("ring buffer capacity overflows usize");
        let mut data = Vec::with_capacity(vec_cap);
        // The slots are deliberately left uninitialized; `head`/`tail` track
        // which of them hold live values.
        unsafe { data.set_len(vec_cap); }
        Self {
            data: SharedVec::new(data),
            head: AtomicUsize::new(0),
            tail: AtomicUsize::new(0),
        }
    }

    /// Consumes the buffer and returns its producer and consumer halves,
    /// both sharing the same underlying storage through an `Arc`.
    pub fn split(self) -> (Producer<T>, Consumer<T>) {
        let arc = Arc::new(self);
        (
            Producer { rb: arc.clone() },
            Consumer { rb: arc },
        )
    }

    /// Returns the maximum number of elements the buffer can hold
    /// (the slot count minus the one reserved slot).
    pub fn capacity(&self) -> usize {
        unsafe { self.data.get_ref() }.len() - 1
    }

    /// Returns `true` when the buffer holds no elements.
    pub fn is_empty(&self) -> bool {
        let head = self.head.load(Ordering::SeqCst);
        let tail = self.tail.load(Ordering::SeqCst);
        head == tail
    }

    /// Returns `true` when no further element can be pushed.
    pub fn is_full(&self) -> bool {
        let head = self.head.load(Ordering::SeqCst);
        let tail = self.tail.load(Ordering::SeqCst);
        let slots = self.capacity() + 1;
        (tail + 1) % slots == head
    }

    /// Returns the number of elements currently stored.
    pub fn len(&self) -> usize {
        let head = self.head.load(Ordering::SeqCst);
        let tail = self.tail.load(Ordering::SeqCst);
        let slots = self.capacity() + 1;
        // Adding `slots` before subtracting keeps the intermediate value
        // non-negative when `tail` has wrapped around behind `head`.
        (tail + slots - head) % slots
    }

    /// Returns the number of elements that can still be pushed.
    pub fn remaining(&self) -> usize {
        self.capacity() - self.len()
    }
}
impl<T: Sized> Drop for RingBuffer<T> {
fn drop(&mut self) {
let data = unsafe { self.data.get_mut() };
let head = self.head.load(Ordering::SeqCst);
let tail = self.tail.load(Ordering::SeqCst);
let len = data.len();
let slices = if head <= tail {
(head..tail, 0..0)
} else {
(head..len, 0..tail)
};
let drop = |elem_ref: &mut T| {
mem::drop(mem::replace(elem_ref, unsafe { mem::uninitialized() }));
};
for elem in data[slices.0].iter_mut() {
drop(elem);
}
for elem in data[slices.1].iter_mut() {
drop(elem);
}
unsafe { data.set_len(0); }
}
}
impl<T: Sized> Producer<T> {
    /// Returns the maximum number of elements the ring buffer can hold.
    pub fn capacity(&self) -> usize {
        self.rb.capacity()
    }

    /// Returns `true` when the ring buffer holds no elements.
    pub fn is_empty(&self) -> bool {
        self.rb.is_empty()
    }

    /// Returns `true` when the ring buffer has no free slot.
    pub fn is_full(&self) -> bool {
        self.rb.is_full()
    }

    /// Returns the number of elements currently stored.
    pub fn len(&self) -> usize {
        self.rb.len()
    }

    /// Returns the number of elements that can still be pushed.
    pub fn remaining(&self) -> usize {
        self.rb.remaining()
    }

    /// Appends `elem` to the ring buffer.
    ///
    /// # Errors
    /// Returns `PushError::Full(elem)`, handing the element back, when the
    /// buffer has no free slot.
    pub fn push(&mut self, elem: T) -> Result<(), PushError<T>> {
        // Kept in an `Option` so the element can be recovered if the closure
        // is never invoked (buffer full).
        let mut elem_opt = Some(elem);
        let outcome = unsafe {
            self.push_access(|slice, _| {
                // The target slot is vacant, so write without reading or
                // dropping whatever bytes it currently contains. The earlier
                // `mem::forget(mem::replace(..))` produced a `T` from
                // uninitialized memory.
                std::ptr::write(&mut slice[0], elem_opt.take().unwrap());
                Ok((1, ()))
            })
        };
        match outcome {
            Ok(Ok((n, ()))) => {
                debug_assert_eq!(n, 1);
                Ok(())
            },
            // The closure above never returns an error.
            Ok(Err(())) => unreachable!(),
            Err(PushAccessError::Full) => Err(PushError::Full(elem_opt.unwrap())),
            // The closure reports exactly one element, which always fits.
            Err(PushAccessError::BadLen) => unreachable!(),
        }
    }
}
impl<T: Sized + Copy> Producer<T> {
    /// Copies as many elements of `elems` as fit into the ring buffer and
    /// returns how many were copied.
    ///
    /// # Errors
    /// Returns `PushSliceError::Full` when the buffer has no free slot.
    pub fn push_slice(&mut self, elems: &[T]) -> Result<usize, PushSliceError> {
        let copy_in = |left: &mut [T], right: &mut [T]| {
            // Fill the first free region, then spill the rest into the second.
            let into_left = elems.len().min(left.len());
            let (front, rest) = elems.split_at(into_left);
            left[..into_left].copy_from_slice(front);

            let into_right = rest.len().min(right.len());
            right[..into_right].copy_from_slice(&rest[..into_right]);

            Ok((into_left + into_right, ()))
        };
        match unsafe { self.push_access(copy_in) } {
            Ok(Ok((written, ()))) => Ok(written),
            Ok(Err(())) => unreachable!(),
            Err(PushAccessError::Full) => Err(PushSliceError::Full),
            Err(PushAccessError::BadLen) => unreachable!(),
        }
    }

    /// Moves elements out of `other` into this producer's ring buffer and
    /// returns how many were moved. `count`, when given, caps that number.
    ///
    /// # Errors
    /// Returns `MoveSliceError::Full` when this buffer has no free slot and
    /// `MoveSliceError::Empty` when `other` has nothing to pop.
    pub fn move_slice(&mut self, other: &mut Consumer<T>, count: Option<usize>)
    -> Result<usize, MoveSliceError> {
        let transfer = |left: &mut [T], right: &mut [T]|
        -> Result<(usize, ()), PopSliceError> {
            // Trim the two free regions so that together they hold at most
            // `count` elements.
            let (left, right) = match count {
                Some(limit) => {
                    let from_left = limit.min(left.len());
                    let from_right = (limit - from_left).min(right.len());
                    (&mut left[..from_left], &mut right[..from_right])
                },
                None => (left, right),
            };

            let popped_left = other.pop_slice(left)?;
            if popped_left != left.len() {
                // The source ran dry before the first region was filled.
                debug_assert!(popped_left < left.len());
                return Ok((popped_left, ()));
            }
            match other.pop_slice(right) {
                Ok(popped_right) => Ok((popped_left + popped_right, ())),
                // An empty source at this point still counts as success for
                // what was already moved.
                Err(PopSliceError::Empty) => Ok((popped_left, ())),
            }
        };
        match unsafe { self.push_access(transfer) } {
            Ok(Ok((moved, ()))) => Ok(moved),
            Ok(Err(PopSliceError::Empty)) => Err(MoveSliceError::Empty),
            Err(PushAccessError::Full) => Err(MoveSliceError::Full),
            Err(PushAccessError::BadLen) => unreachable!(),
        }
    }
}
impl Producer<u8> {
    /// Performs a single `read` call on `reader`, storing the bytes directly
    /// in the ring buffer, and returns how many bytes were stored.
    ///
    /// Only the first contiguous free region is offered to the reader;
    /// `count`, when given, further caps the number of bytes requested.
    ///
    /// # Errors
    /// Returns `ReadFromError::RbFull` when the buffer has no free slot and
    /// `ReadFromError::Read` when the reader fails or reports more bytes than
    /// it was given room for.
    pub fn read_from(&mut self, reader: &mut dyn Read, count: Option<usize>)
    -> Result<usize, ReadFromError> {
        let fill = |left: &mut [u8], _right: &mut [u8]|
        -> Result<(usize, ()), io::Error> {
            let window_len = match count {
                Some(limit) if limit < left.len() => limit,
                _ => left.len(),
            };
            let window = &mut left[..window_len];

            let got = reader.read(window)?;
            if got > window.len() {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidInput,
                    "Read operation returned invalid number",
                ));
            }
            Ok((got, ()))
        };
        match unsafe { self.push_access(fill) } {
            Ok(Ok((got, ()))) => Ok(got),
            Ok(Err(io_err)) => Err(ReadFromError::Read(io_err)),
            Err(PushAccessError::Full) => Err(ReadFromError::RbFull),
            Err(PushAccessError::BadLen) => unreachable!(),
        }
    }
}
impl Write for Producer<u8> {
fn write(&mut self, buffer: &[u8]) -> io::Result<usize> {
self.push_slice(buffer).or_else(|e| match e {
PushSliceError::Full => Err(io::Error::new(
io::ErrorKind::WouldBlock,
"Ring buffer is full",
))
})
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
impl<T: Sized> Producer<T> {
    /// Gives `f` direct access to the free slots of the ring buffer.
    ///
    /// `f` receives the free space as two slices (the second one is empty
    /// unless the free space wraps around the end of the storage) and returns
    /// `Ok((n, r))`, where `n` is the number of slots it filled starting from
    /// the front of the first slice. On success the tail index is advanced
    /// by `n`.
    ///
    /// # Errors
    /// * `PushAccessError::Full` - no free slot; `f` is not called.
    /// * `PushAccessError::BadLen` - `f` reported more slots than it was given.
    ///
    /// An `Err(e)` returned by `f` is passed through as `Ok(Err(e))` and the
    /// tail index is left untouched.
    ///
    /// # Safety
    /// The slices handed to `f` refer to vacant slots, and the first `n` of
    /// them are treated as live elements afterwards, so `f` must leave valid
    /// values in exactly those slots.
    pub unsafe fn push_access<R, E, F>(&mut self, f: F) -> Result<Result<(usize, R), E>, PushAccessError>
    where
        R: Sized,
        E: Sized,
        F: FnOnce(&mut [T], &mut [T]) -> Result<(usize, R), E>,
    {
        let head = self.rb.head.load(Ordering::SeqCst);
        let tail = self.rb.tail.load(Ordering::SeqCst);
        let len = self.rb.data.get_ref().len();

        // One slot always stays unused so that `head == tail` means "empty".
        let (first, second) = if tail < head {
            if tail >= head - 1 {
                return Err(PushAccessError::Full);
            }
            (tail..(head - 1), 0..0)
        } else if head > 0 {
            (tail..len, 0..(head - 1))
        } else {
            if tail >= len - 1 {
                return Err(PushAccessError::Full);
            }
            (tail..(len - 1), 0..0)
        };

        let left = &mut self.rb.data.get_mut()[first];
        let right = &mut self.rb.data.get_mut()[second];
        let available = left.len() + right.len();

        match f(left, right) {
            Err(e) => Ok(Err(e)),
            Ok((n, _)) if n > available => Err(PushAccessError::BadLen),
            Ok((n, r)) => {
                self.rb.tail.store((tail + n) % len, Ordering::SeqCst);
                Ok(Ok((n, r)))
            },
        }
    }
}
impl<T: Sized> Consumer<T> {
    /// Returns the maximum number of elements the ring buffer can hold.
    pub fn capacity(&self) -> usize {
        self.rb.capacity()
    }

    /// Returns `true` when the ring buffer holds no elements.
    pub fn is_empty(&self) -> bool {
        self.rb.is_empty()
    }

    /// Returns `true` when the ring buffer has no free slot.
    pub fn is_full(&self) -> bool {
        self.rb.is_full()
    }

    /// Returns the number of elements currently stored.
    pub fn len(&self) -> usize {
        self.rb.len()
    }

    /// Returns the number of elements that can still be pushed.
    pub fn remaining(&self) -> usize {
        self.rb.remaining()
    }

    /// Removes and returns the oldest element of the ring buffer.
    ///
    /// # Errors
    /// Returns `PopError::Empty` when the buffer holds no elements.
    pub fn pop(&mut self) -> Result<T, PopError> {
        let outcome = unsafe {
            self.pop_access(|slice, _| {
                // Move the value out bitwise. The slot counts as vacant once
                // `pop_access` advances the head, so nothing has to be
                // written back. The earlier `mem::replace(..,
                // mem::uninitialized())` is undefined behaviour for most `T`.
                let elem = std::ptr::read(&slice[0]);
                Ok((1, elem))
            })
        };
        match outcome {
            Ok(Ok((n, elem))) => {
                debug_assert_eq!(n, 1);
                Ok(elem)
            },
            // The closure above never returns an error.
            Ok(Err(())) => unreachable!(),
            Err(PopAccessError::Empty) => Err(PopError::Empty),
            // The closure reports exactly one element, which is always available.
            Err(PopAccessError::BadLen) => unreachable!(),
        }
    }
}
impl<T: Sized + Copy> Consumer<T> {
    /// Copies the oldest stored elements into `elems`, removing them from the
    /// ring buffer, and returns how many were copied.
    ///
    /// # Errors
    /// Returns `PopSliceError::Empty` when the buffer holds no elements.
    pub fn pop_slice(&mut self, elems: &mut [T]) -> Result<usize, PopSliceError> {
        let copy_out = |left: &mut [T], right: &mut [T]| {
            // Drain the first occupied region, then continue with the second.
            let from_left = elems.len().min(left.len());
            let (front, back) = elems.split_at_mut(from_left);
            front.copy_from_slice(&left[..from_left]);

            let from_right = back.len().min(right.len());
            back[..from_right].copy_from_slice(&right[..from_right]);

            Ok((from_left + from_right, ()))
        };
        match unsafe { self.pop_access(copy_out) } {
            Ok(Ok((taken, ()))) => Ok(taken),
            Ok(Err(())) => unreachable!(),
            Err(PopAccessError::Empty) => Err(PopSliceError::Empty),
            Err(PopAccessError::BadLen) => unreachable!(),
        }
    }

    /// Moves elements from this consumer's ring buffer into `other`,
    /// returning how many were moved. `count`, when given, caps that number.
    ///
    /// This forwards to `Producer::move_slice` with the roles swapped.
    pub fn move_slice(&mut self, other: &mut Producer<T>, count: Option<usize>)
    -> Result<usize, MoveSliceError> {
        let source = self;
        other.move_slice(source, count)
    }
}
impl Consumer<u8> {
    /// Performs a single `write` call on `writer` with bytes taken directly
    /// from the ring buffer, and returns how many bytes were consumed.
    ///
    /// Only the first contiguous occupied region is offered to the writer;
    /// `count`, when given, further caps the number of bytes offered.
    ///
    /// # Errors
    /// Returns `WriteIntoError::RbEmpty` when the buffer holds no bytes and
    /// `WriteIntoError::Write` when the writer fails or reports more bytes
    /// than it was given.
    pub fn write_into(&mut self, writer: &mut dyn Write, count: Option<usize>)
    -> Result<usize, WriteIntoError> {
        let drain = |left: &mut [u8], _right: &mut [u8]|
        -> Result<(usize, ()), io::Error> {
            let window_len = match count {
                Some(limit) if limit < left.len() => limit,
                _ => left.len(),
            };
            let window = &left[..window_len];

            let sent = writer.write(window)?;
            if sent > window.len() {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidInput,
                    "Write operation returned invalid number",
                ));
            }
            Ok((sent, ()))
        };
        match unsafe { self.pop_access(drain) } {
            Ok(Ok((sent, ()))) => Ok(sent),
            Ok(Err(io_err)) => Err(WriteIntoError::Write(io_err)),
            Err(PopAccessError::Empty) => Err(WriteIntoError::RbEmpty),
            Err(PopAccessError::BadLen) => unreachable!(),
        }
    }
}
impl Read for Consumer<u8> {
fn read(&mut self, buffer: &mut [u8]) -> io::Result<usize> {
self.pop_slice(buffer).or_else(|e| match e {
PopSliceError::Empty => Err(io::Error::new(
io::ErrorKind::WouldBlock,
"Ring buffer is empty",
))
})
}
}
impl<T: Sized> Consumer<T> {
    /// Gives `f` direct access to the occupied slots of the ring buffer.
    ///
    /// `f` receives the stored elements as two slices (the second one is
    /// empty unless the data wraps around the end of the storage) and returns
    /// `Ok((n, r))`, where `n` is the number of elements it consumed starting
    /// from the front of the first slice. On success the head index is
    /// advanced by `n`.
    ///
    /// # Errors
    /// * `PopAccessError::Empty` - nothing stored; `f` is not called.
    /// * `PopAccessError::BadLen` - `f` reported more elements than it was given.
    ///
    /// An `Err(e)` returned by `f` is passed through as `Ok(Err(e))` and the
    /// head index is left untouched.
    ///
    /// # Safety
    /// The first `n` slots are treated as vacant afterwards and will not be
    /// dropped by the buffer, so `f` must have taken ownership of exactly
    /// those elements.
    pub unsafe fn pop_access<R, E, F>(&mut self, f: F) -> Result<Result<(usize, R), E>, PopAccessError>
    where
        R: Sized,
        E: Sized,
        F: FnOnce(&mut [T], &mut [T]) -> Result<(usize, R), E>,
    {
        let head = self.rb.head.load(Ordering::SeqCst);
        let tail = self.rb.tail.load(Ordering::SeqCst);
        let len = self.rb.data.get_ref().len();

        if head == tail {
            return Err(PopAccessError::Empty);
        }
        // Occupied slots run from `head` up to `tail`, wrapping past the end
        // of the storage when `tail` is behind `head`.
        let (first, second) = if head < tail {
            (head..tail, 0..0)
        } else {
            (head..len, 0..tail)
        };

        let left = &mut self.rb.data.get_mut()[first];
        let right = &mut self.rb.data.get_mut()[second];
        let available = left.len() + right.len();

        match f(left, right) {
            Err(e) => Ok(Err(e)),
            Ok((n, _)) if n > available => Err(PopAccessError::BadLen),
            Ok((n, r)) => {
                self.rb.head.store((head + n) % len, Ordering::SeqCst);
                Ok(Ok((n, r)))
            },
        }
    }
}
/// Smoke test: a ring buffer can be created, split, and both halves dropped.
#[cfg(test)]
#[test]
fn dummy_test() {
    let (producer, consumer) = RingBuffer::<i32>::new(16).split();
    // Release the halves in the same order the discarded tuple would.
    drop(producer);
    drop(consumer);
}