#[cfg(test)]
mod tests;
use std::cmp;
use std::fmt;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;
pub trait RB<T: Clone + Copy + Default> {
fn clear(&self);
fn producer(&self) -> Producer<T>;
fn consumer(&self) -> Consumer<T>;
}
pub trait RbInspector {
fn is_empty(&self) -> bool;
fn is_full(&self) -> bool;
fn capacity(&self) -> usize;
fn slots_free(&self) -> usize;
fn count(&self) -> usize;
}
pub trait RbProducer<T> {
fn write(&self, data: &[T]) -> Result<usize>;
fn write_blocking(&self, data: &[T]) -> Option<usize>;
fn write_blocking_timeout(&self, data: &[T], timeout: Duration) -> Result<Option<usize>>;
}
pub trait RbConsumer<T> {
fn skip_pending(&self) -> Result<usize>;
fn skip(&self, cnt: usize) -> Result<usize>;
fn get(&self, data: &mut [T]) -> Result<usize>;
fn read(&self, data: &mut [T]) -> Result<usize>;
fn read_blocking(&self, data: &mut [T]) -> Option<usize>;
fn read_blocking_timeout(&self, data: &mut [T], timeout: Duration) -> Result<Option<usize>>;
}
#[derive(Debug)]
pub enum RbError {
Full,
Empty,
TimedOut,
}
impl fmt::Display for RbError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
RbError::Full => write!(f, "No free slots in the buffer"),
RbError::Empty => write!(f, "Buffer is empty"),
RbError::TimedOut => write!(f, "Timed out waiting for available slots"),
}
}
}
pub type Result<T> = ::std::result::Result<T, RbError>;
struct Inspector {
read_pos: Arc<AtomicUsize>,
write_pos: Arc<AtomicUsize>,
size: usize,
}
pub struct SpscRb<T> {
buf: Arc<Mutex<Vec<T>>>,
inspector: Arc<Inspector>,
slots_free: Arc<Condvar>,
data_available: Arc<Condvar>,
}
impl<T: Clone + Copy + Default> SpscRb<T> {
pub fn new(size: usize) -> Self {
let (read_pos, write_pos) = (Arc::new(AtomicUsize::new(0)), Arc::new(AtomicUsize::new(0)));
SpscRb {
buf: Arc::new(Mutex::new(vec![T::default(); size + 1])),
slots_free: Arc::new(Condvar::new()),
data_available: Arc::new(Condvar::new()),
inspector: Arc::new(Inspector {
read_pos: read_pos.clone(),
write_pos: write_pos.clone(),
size: size + 1,
}),
}
}
}
impl<T: Clone + Copy + Default> RB<T> for SpscRb<T> {
fn clear(&self) {
let mut buf = self.buf.lock().unwrap();
buf.iter_mut().map(|_| T::default()).count();
self.inspector.read_pos.store(0, Ordering::Relaxed);
self.inspector.write_pos.store(0, Ordering::Relaxed);
}
fn producer(&self) -> Producer<T> {
Producer {
buf: self.buf.clone(),
inspector: self.inspector.clone(),
slots_free: self.slots_free.clone(),
data_available: self.data_available.clone(),
}
}
fn consumer(&self) -> Consumer<T> {
Consumer {
buf: self.buf.clone(),
inspector: self.inspector.clone(),
slots_free: self.slots_free.clone(),
data_available: self.data_available.clone(),
}
}
}
impl<T: Clone + Copy + Default> RbInspector for SpscRb<T> {
fn is_empty(&self) -> bool {
self.inspector.is_empty()
}
fn is_full(&self) -> bool {
self.inspector.is_full()
}
fn capacity(&self) -> usize {
self.inspector.capacity()
}
fn slots_free(&self) -> usize {
self.inspector.slots_free()
}
fn count(&self) -> usize {
self.inspector.count()
}
}
impl RbInspector for Inspector {
#[inline(always)]
fn is_empty(&self) -> bool {
self.slots_free() == self.capacity()
}
#[inline(always)]
fn is_full(&self) -> bool {
self.slots_free() == 0
}
#[inline(always)]
fn capacity(&self) -> usize {
self.size - 1
}
#[inline(always)]
fn slots_free(&self) -> usize {
let wr_pos = self.write_pos.load(Ordering::Relaxed);
let re_pos = self.read_pos.load(Ordering::Relaxed);
if wr_pos < re_pos {
re_pos - wr_pos - 1
} else {
self.capacity() - wr_pos + re_pos
}
}
#[inline(always)]
fn count(&self) -> usize {
self.capacity() - self.slots_free()
}
}
pub struct Producer<T> {
buf: Arc<Mutex<Vec<T>>>,
inspector: Arc<Inspector>,
slots_free: Arc<Condvar>,
data_available: Arc<Condvar>,
}
pub struct Consumer<T> {
buf: Arc<Mutex<Vec<T>>>,
inspector: Arc<Inspector>,
slots_free: Arc<Condvar>,
data_available: Arc<Condvar>,
}
impl<T: Clone + Copy> RbProducer<T> for Producer<T> {
fn write(&self, data: &[T]) -> Result<usize> {
if data.is_empty() {
return Ok(0);
}
if self.inspector.is_full() {
return Err(RbError::Full);
}
let cnt = cmp::min(data.len(), self.inspector.slots_free());
let mut buf = self.buf.lock().unwrap();
let buf_len = buf.len();
let wr_pos = self.inspector.write_pos.load(Ordering::Relaxed);
if (wr_pos + cnt) < buf_len {
buf[wr_pos..wr_pos + cnt].copy_from_slice(&data[..cnt]);
} else {
let d = buf_len - wr_pos;
buf[wr_pos..].copy_from_slice(&data[..d]);
buf[..(cnt - d)].copy_from_slice(&data[d..cnt]);
}
self.inspector
.write_pos
.store((wr_pos + cnt) % buf_len, Ordering::Relaxed);
self.data_available.notify_one();
Ok(cnt)
}
fn write_blocking(&self, data: &[T]) -> Option<usize> {
self.write_blocking_timeout(data, Duration::MAX)
.expect("Max duration should not time out")
}
fn write_blocking_timeout(&self, data: &[T], timeout: Duration) -> Result<Option<usize>> {
if data.is_empty() {
return Ok(None);
}
let guard = self.buf.lock().unwrap();
let mut buf = if self.inspector.is_full() {
if timeout == Duration::MAX {
self.slots_free.wait(guard).unwrap()
} else {
let (guard, result) = self.slots_free.wait_timeout(guard, timeout).unwrap();
if result.timed_out() {
return Err(RbError::TimedOut);
}
guard
}
} else {
guard
};
let buf_len = buf.len();
let data_len = data.len();
let wr_pos = self.inspector.write_pos.load(Ordering::Relaxed);
let cnt = cmp::min(data_len, self.inspector.slots_free());
if (wr_pos + cnt) < buf_len {
buf[wr_pos..wr_pos + cnt].copy_from_slice(&data[..cnt]);
} else {
let d = buf_len - wr_pos;
buf[wr_pos..].copy_from_slice(&data[..d]);
buf[..(cnt - d)].copy_from_slice(&data[d..cnt]);
}
self.inspector
.write_pos
.store((wr_pos + cnt) % buf_len, Ordering::Relaxed);
self.data_available.notify_one();
Ok(Some(cnt))
}
}
impl<T: Clone + Copy> RbConsumer<T> for Consumer<T> {
fn skip_pending(&self) -> Result<usize> {
if self.inspector.is_empty() {
Err(RbError::Empty)
} else {
let write_pos = self.inspector.write_pos.load(Ordering::Relaxed);
let count = self.inspector.count();
self.inspector.read_pos.store(write_pos, Ordering::Relaxed);
Ok(count)
}
}
fn skip(&self, cnt: usize) -> Result<usize> {
if self.inspector.is_empty() {
Err(RbError::Empty)
} else {
let count = cmp::min(cnt, self.inspector.count());
let prev_read_pos = self.inspector.read_pos.load(Ordering::Relaxed);
self.inspector.read_pos.store(
(prev_read_pos + count) % self.inspector.size,
Ordering::Relaxed,
);
Ok(count)
}
}
fn get(&self, data: &mut [T]) -> Result<usize> {
if data.is_empty() {
return Ok(0);
}
if self.inspector.is_empty() {
return Err(RbError::Empty);
}
let cnt = cmp::min(data.len(), self.inspector.count());
let buf = self.buf.lock().unwrap();
let buf_len = buf.len();
let re_pos = self.inspector.read_pos.load(Ordering::Relaxed);
if (re_pos + cnt) < buf_len {
data[..cnt].copy_from_slice(&buf[re_pos..re_pos + cnt]);
} else {
let d = buf_len - re_pos;
data[..d].copy_from_slice(&buf[re_pos..]);
data[d..cnt].copy_from_slice(&buf[..(cnt - d)]);
}
Ok(cnt)
}
fn read(&self, data: &mut [T]) -> Result<usize> {
if data.is_empty() {
return Ok(0);
}
if self.inspector.is_empty() {
return Err(RbError::Empty);
}
let cnt = cmp::min(data.len(), self.inspector.count());
let buf = self.buf.lock().unwrap();
let buf_len = buf.len();
let re_pos = self.inspector.read_pos.load(Ordering::Relaxed);
if (re_pos + cnt) < buf_len {
data[..cnt].copy_from_slice(&buf[re_pos..re_pos + cnt]);
} else {
let d = buf_len - re_pos;
data[..d].copy_from_slice(&buf[re_pos..]);
data[d..cnt].copy_from_slice(&buf[..(cnt - d)]);
}
self.inspector
.read_pos
.store((re_pos + cnt) % buf_len, Ordering::Relaxed);
self.slots_free.notify_one();
Ok(cnt)
}
fn read_blocking(&self, data: &mut [T]) -> Option<usize> {
self.read_blocking_timeout(data, Duration::MAX)
.expect("Max duration shouldn't time out")
}
fn read_blocking_timeout(&self, data: &mut [T], timeout: Duration) -> Result<Option<usize>> {
if data.is_empty() {
return Ok(None);
}
let guard = self.buf.lock().unwrap();
let buf = if self.inspector.is_empty() {
if timeout == Duration::MAX {
self.data_available.wait(guard).unwrap()
} else {
let (guard, result) = self.data_available.wait_timeout(guard, timeout).unwrap();
if result.timed_out() {
return Err(RbError::TimedOut);
}
guard
}
} else {
guard
};
let buf_len = buf.len();
let cnt = cmp::min(data.len(), self.inspector.count());
let re_pos = self.inspector.read_pos.load(Ordering::Relaxed);
if (re_pos + cnt) < buf_len {
data[..cnt].copy_from_slice(&buf[re_pos..re_pos + cnt]);
} else {
let d = buf_len - re_pos;
data[..d].copy_from_slice(&buf[re_pos..]);
data[d..cnt].copy_from_slice(&buf[..(cnt - d)]);
}
self.inspector
.read_pos
.store((re_pos + cnt) % buf_len, Ordering::Relaxed);
self.slots_free.notify_one();
Ok(Some(cnt))
}
}