use std::sync::{Arc, Mutex};

use slab::Slab;
use thiserror::Error;

use crate::double_mapped_buffer::DoubleMappedBuffer;

/// Errors that can occur when setting up a circular buffer.
#[derive(Error, Debug)]
pub enum CircularError {
    /// The underlying double mapped buffer could not be allocated.
    #[error("Failed to allocate double mapped buffer.")]
    Allocation,
}

/// Wakeup mechanism that connects a [`Writer`] and its [`Reader`]s.
pub trait Notifier {
    /// Arm the notifier; the caller wants to be woken on the next `notify`.
    fn arm(&mut self);
    /// Wake the counterpart, e.g., after items were produced or consumed.
    fn notify(&mut self);
}
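
// A minimal sketch of a possible `Notifier` implementation, assuming a
// `std::sync::mpsc` channel is an acceptable wakeup primitive; the name
// `ChannelNotifier` and its fields are illustrative and not part of the
// original API. `arm` records that a wakeup is wanted and `notify` sends a
// token only if armed, so the counterpart can block on the receiving end.
#[allow(dead_code)]
struct ChannelNotifier {
    armed: bool,
    tx: std::sync::mpsc::SyncSender<()>,
}

impl Notifier for ChannelNotifier {
    fn arm(&mut self) {
        self.armed = true;
    }
    fn notify(&mut self) {
        // Only wake the counterpart if it asked for it; a full or
        // disconnected channel is ignored, since a pending token suffices.
        if std::mem::take(&mut self.armed) {
            let _ = self.tx.try_send(());
        }
    }
}
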
/// Per-reader metadata (e.g. tags) attached to produced items.
pub trait Metadata {
    /// Metadata item type.
    type Item: Clone;
    /// Create an empty metadata container.
    fn new() -> Self;
    /// Add `tags` for items produced at `offset`, relative to the reader's read position.
    fn add(&mut self, offset: usize, tags: Vec<Self::Item>);
    /// Get the metadata for the currently readable items.
    fn get(&self) -> Vec<Self::Item>;
    /// Discard metadata for `items` consumed items.
    fn consume(&mut self, items: usize);
}

/// [`Metadata`] implementation that discards all tags.
pub struct NoMetadata;

impl Metadata for NoMetadata {
    type Item = ();

    fn new() -> Self {
        Self
    }
    fn add(&mut self, _offset: usize, _tags: Vec<Self::Item>) {}
    fn get(&self) -> Vec<Self::Item> {
        Vec::new()
    }
    fn consume(&mut self, _items: usize) {}
}
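
// A minimal sketch of a tag-carrying `Metadata` implementation, assuming tags
// are pairs of a position inside the newly produced batch and a value. The
// name `VecMetadata` is illustrative and not part of the original API; it
// rebases positions onto the reader's read position in `add` and drops or
// rebases them again in `consume`.
#[allow(dead_code)]
struct VecMetadata<T: Clone> {
    tags: Vec<(usize, T)>,
}

impl<T: Clone> Metadata for VecMetadata<T> {
    type Item = (usize, T);

    fn new() -> Self {
        Self { tags: Vec::new() }
    }
    fn add(&mut self, offset: usize, tags: Vec<Self::Item>) {
        // `offset` is where the new items start, as seen by the reader.
        for (pos, tag) in tags {
            self.tags.push((offset + pos, tag));
        }
    }
    fn get(&self) -> Vec<Self::Item> {
        self.tags.clone()
    }
    fn consume(&mut self, items: usize) {
        // Drop tags inside the consumed region and shift the remaining ones.
        self.tags.retain(|(pos, _)| *pos >= items);
        for (pos, _) in self.tags.iter_mut() {
            *pos -= items;
        }
    }
}
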
/// Factory for creating circular buffers.
pub struct Circular;

impl Circular {
    /// Create a buffer that can hold at least `min_items` items of type `T`.
    ///
    /// The actual capacity may be larger than `min_items`.
    pub fn with_capacity<T, N, M>(min_items: usize) -> Result<Writer<T, N, M>, CircularError>
    where
        N: Notifier,
        M: Metadata,
    {
        let buffer = match DoubleMappedBuffer::new(min_items) {
            Ok(buffer) => Arc::new(buffer),
            Err(_) => return Err(CircularError::Allocation),
        };

        let state = Arc::new(Mutex::new(State {
            writer_offset: 0,
            writer_ab: false,
            writer_done: false,
            readers: Slab::new(),
        }));

        let writer = Writer {
            buffer,
            state,
            last_space: 0,
        };

        Ok(writer)
    }
}
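
// A minimal usage sketch; the helper name `example_buffer` is an assumption,
// not part of the original API. The item, notifier, and metadata types are
// not constrained by `min_items`, so they are typically fixed via a turbofish
// or by annotating the result.
#[allow(dead_code)]
fn example_buffer<N: Notifier, M: Metadata>(
    min_items: usize,
) -> Result<Writer<f32, N, M>, CircularError> {
    Circular::with_capacity::<f32, N, M>(min_items)
}
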
/// State shared between the writer and all readers.
struct State<N, M>
where
    N: Notifier,
    M: Metadata,
{
    /// Current write offset into the buffer.
    writer_offset: usize,
    /// Wrap flag, toggled whenever the writer wraps around the buffer end.
    writer_ab: bool,
    /// Set when the writer is dropped.
    writer_done: bool,
    /// Per-reader state, indexed by the reader's slab id.
    readers: Slab<ReaderState<N, M>>,
}

/// Bookkeeping for a single reader.
struct ReaderState<N, M> {
    /// Wrap flag, toggled whenever the reader wraps around the buffer end.
    ab: bool,
    /// Current read offset into the buffer.
    offset: usize,
    /// Wakes the reader when items are produced or the writer is dropped.
    reader_notifier: N,
    /// Wakes the writer when items are consumed or the reader is dropped.
    writer_notifier: N,
    /// Metadata for the items readable by this reader.
    meta: M,
}

/// Writer half of the circular buffer.
pub struct Writer<T, N, M>
where
    N: Notifier,
    M: Metadata,
{
    /// Space reported by the last call to [`slice`](Writer::slice).
    last_space: usize,
    buffer: Arc<DoubleMappedBuffer<T>>,
    state: Arc<Mutex<State<N, M>>>,
}

impl<T, N, M> Writer<T, N, M>
where
    N: Notifier,
    M: Metadata,
{
    /// Add a reader that starts reading at the current write position.
    ///
    /// `reader_notifier` wakes the reader; `writer_notifier` wakes this writer.
    pub fn add_reader(&self, reader_notifier: N, writer_notifier: N) -> Reader<T, N, M> {
        let mut state = self.state.lock().unwrap();
        let reader_state = ReaderState {
            ab: state.writer_ab,
            offset: state.writer_offset,
            reader_notifier,
            writer_notifier,
            meta: M::new(),
        };
        let id = state.readers.insert(reader_state);

        Reader {
            id,
            last_space: 0,
            buffer: self.buffer.clone(),
            state: self.state.clone(),
        }
    }

    /// Determine the writable space (the minimum free space over all readers)
    /// and the current write offset. If `arm` is set and a reader leaves no
    /// space, that reader's writer notifier is armed.
    fn space_and_offset(&self, arm: bool) -> (usize, usize) {
        let mut state = self.state.lock().unwrap();
        let capacity = self.buffer.capacity();
        let w_off = state.writer_offset;
        let w_ab = state.writer_ab;

        let mut space = capacity;
        for (_, reader) in state.readers.iter_mut() {
            let r_off = reader.offset;
            let r_ab = reader.ab;

            // Free space w.r.t. this reader. Equal offsets mean either an
            // empty buffer (equal wrap flags) or a full one (different flags).
            let s = if w_off > r_off {
                r_off + capacity - w_off
            } else if w_off < r_off {
                r_off - w_off
            } else if r_ab == w_ab {
                capacity
            } else {
                0
            };
            space = std::cmp::min(space, s);

            if s == 0 && arm {
                reader.writer_notifier.arm();
                break;
            }
            if s == 0 {
                break;
            }
        }

        (space, w_off)
    }

    /// Get the writable slice. If `arm` is set and there is currently no
    /// space, a notifier is armed so the writer can be woken once a reader
    /// consumes items.
    pub fn slice(&mut self, arm: bool) -> &mut [T] {
        let (space, offset) = self.space_and_offset(arm);
        self.last_space = space;
        unsafe { &mut self.buffer.slice_with_offset_mut(offset)[0..space] }
    }

    /// Commit `n` items, making them visible to all readers together with the
    /// given metadata.
    ///
    /// # Panics
    ///
    /// Panics if `n` exceeds the space reported by the last call to
    /// [`slice`](Writer::slice).
    pub fn produce(&mut self, n: usize, meta: Vec<M::Item>) {
        if n == 0 {
            return;
        }
        debug_assert!(self.space_and_offset(false).0 >= n);
        assert!(n <= self.last_space, "vmcircbuffer: produced too much");
        self.last_space -= n;

        let mut state = self.state.lock().unwrap();
        let w_off = state.writer_offset;
        let w_ab = state.writer_ab;
        let capacity = self.buffer.capacity();

        for (_, r) in state.readers.iter_mut() {
            let r_off = r.offset;
            let r_ab = r.ab;

            // Items that were already readable by this reader; the newly
            // produced items start at this offset in the reader's view.
            let space = if r_off > w_off {
                w_off + capacity - r_off
            } else if r_off < w_off {
                w_off - r_off
            } else if r_ab == w_ab {
                0
            } else {
                capacity
            };
            r.meta.add(space, meta.clone());
            r.reader_notifier.notify();
        }

        // Advance the write offset, toggling the wrap flag on wrap-around.
        if state.writer_offset + n >= self.buffer.capacity() {
            state.writer_ab = !state.writer_ab;
        }
        state.writer_offset = (state.writer_offset + n) % self.buffer.capacity();
    }
}
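
// A minimal sketch of the writer-side usage pattern, assuming `u8` items and
// `NoMetadata`; the helper name `copy_into` is illustrative, not part of the
// original API. Ask for the writable slice, copy as much input as fits, then
// commit it with `produce`.
#[allow(dead_code)]
fn copy_into<N: Notifier>(w: &mut Writer<u8, N, NoMetadata>, input: &[u8]) -> usize {
    let out = w.slice(false);
    let n = std::cmp::min(out.len(), input.len());
    out[..n].copy_from_slice(&input[..n]);
    w.produce(n, Vec::new());
    n
}
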
impl<T, N, M> Drop for Writer<T, N, M>
where
    N: Notifier,
    M: Metadata,
{
    fn drop(&mut self) {
        // Mark the writer as done and wake all readers so they can observe it.
        let mut state = self.state.lock().unwrap();
        state.writer_done = true;
        for (_, r) in state.readers.iter_mut() {
            r.reader_notifier.notify();
        }
    }
}

/// Reader half of the circular buffer, created via [`Writer::add_reader`].
pub struct Reader<T, N, M>
where
    N: Notifier,
    M: Metadata,
{
    /// Slab id of this reader in the shared state.
    id: usize,
    /// Space reported by the last call to [`slice`](Reader::slice).
    last_space: usize,
    buffer: Arc<DoubleMappedBuffer<T>>,
    state: Arc<Mutex<State<N, M>>>,
}

impl<T, N, M> Reader<T, N, M>
where
    N: Notifier,
    M: Metadata,
{
    /// Determine the readable space, the read offset, whether the writer was
    /// dropped, and the pending metadata. If `arm` is set and there is
    /// nothing to read, this reader's notifier is armed.
    fn space_and_offset_and_meta(&self, arm: bool) -> (usize, usize, bool, Vec<M::Item>) {
        let mut state = self.state.lock().unwrap();
        let capacity = self.buffer.capacity();
        let done = state.writer_done;
        let w_off = state.writer_offset;
        let w_ab = state.writer_ab;

        // This reader is only removed in its own `drop`, so the id is valid.
        let my = unsafe { state.readers.get_unchecked_mut(self.id) };
        let r_off = my.offset;
        let r_ab = my.ab;

        // Readable items. Equal offsets mean either an empty buffer (equal
        // wrap flags) or a full one (different flags).
        let space = if r_off > w_off {
            w_off + capacity - r_off
        } else if r_off < w_off {
            w_off - r_off
        } else if r_ab == w_ab {
            0
        } else {
            capacity
        };

        if space == 0 && arm {
            my.reader_notifier.arm();
        }

        (space, r_off, done, my.meta.get())
    }

    /// Get the readable slice together with its metadata.
    ///
    /// Returns `None` if the writer was dropped and all items were read. If
    /// `arm` is set and there is nothing to read, a notifier is armed so the
    /// reader can be woken once items are produced or the writer is dropped.
    pub fn slice(&mut self, arm: bool) -> Option<(&[T], Vec<M::Item>)> {
        let (space, offset, done, tags) = self.space_and_offset_and_meta(arm);
        self.last_space = space;

        if space == 0 && done {
            None
        } else {
            unsafe { Some((&self.buffer.slice_with_offset(offset)[0..space], tags)) }
        }
    }

    /// Mark `n` items as read.
    ///
    /// # Panics
    ///
    /// Panics if `n` exceeds the space reported by the last call to
    /// [`slice`](Reader::slice).
    pub fn consume(&mut self, n: usize) {
        if n == 0 {
            return;
        }
        debug_assert!(self.space_and_offset_and_meta(false).0 >= n);
        assert!(n <= self.last_space, "vmcircbuffer: consumed too much!");
        self.last_space -= n;

        let mut state = self.state.lock().unwrap();
        let my = unsafe { state.readers.get_unchecked_mut(self.id) };
        my.meta.consume(n);

        // Advance the read offset, toggling the wrap flag on wrap-around,
        // and wake the writer since there is new space.
        if my.offset + n >= self.buffer.capacity() {
            my.ab = !my.ab;
        }
        my.offset = (my.offset + n) % self.buffer.capacity();
        my.writer_notifier.notify();
    }
}
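
// A minimal sketch of the reader-side usage pattern, assuming `u8` items and
// `NoMetadata`; the helper name `drain_into` is illustrative, not part of the
// original API. `None` means the writer is gone and everything was read; an
// empty slice only means that no data is available right now.
#[allow(dead_code)]
fn drain_into<N: Notifier>(r: &mut Reader<u8, N, NoMetadata>, out: &mut Vec<u8>) -> bool {
    let n = match r.slice(false) {
        None => return false,
        Some((data, _tags)) => {
            out.extend_from_slice(data);
            data.len()
        }
    };
    r.consume(n);
    true
}
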
impl<T, N, M> Drop for Reader<T, N, M>
where
    N: Notifier,
    M: Metadata,
{
    fn drop(&mut self) {
        // Remove this reader from the shared state and wake the writer, since
        // dropping a reader may free up space.
        let mut state = self.state.lock().unwrap();
        let mut s = state.readers.remove(self.id);
        s.writer_notifier.notify();
    }
}
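
// An end-to-end sketch, assuming the illustrative `ChannelNotifier` above and
// that the double mapped buffer can be allocated on the test platform; it is
// not part of the original crate and only shows the produce/consume round trip.
#[cfg(test)]
mod sketch {
    use super::*;

    #[test]
    fn produce_then_consume() {
        let (tx, _rx) = std::sync::mpsc::sync_channel(1);
        let mut w = Circular::with_capacity::<u32, ChannelNotifier, NoMetadata>(128)
            .expect("failed to allocate buffer");
        let mut r = w.add_reader(
            ChannelNotifier { armed: false, tx: tx.clone() },
            ChannelNotifier { armed: false, tx },
        );

        // Write three items and commit them.
        let s = w.slice(false);
        s[0] = 1;
        s[1] = 2;
        s[2] = 3;
        w.produce(3, Vec::new());

        // The reader now sees exactly those three items.
        let (data, _tags) = r.slice(false).unwrap();
        assert_eq!(&data[..3], &[1, 2, 3]);
        r.consume(3);
    }
}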