use slab::Slab;
use spin::Mutex;
use std::sync::Arc;
use thiserror::Error;
use crate::double_mapped_buffer::{DoubleMappedBuffer, DoubleMappedBufferError};
#[derive(Error, Debug)]
pub enum CircularError {
#[error("Failed to allocate double mapped buffer.")]
Allocation(DoubleMappedBufferError),
}
pub use crate::{Metadata, NoMetadata, Notifier};
/// Factory for creating the writer end of a circular buffer.
pub struct Circular;

impl Circular {
    /// Create a buffer that can hold at least `min_items` items of type `T`
    /// and return its [`Writer`].
    ///
    /// Readers are attached afterwards via [`Writer::add_reader`]. The
    /// backing storage may end up larger than `min_items` (the exact
    /// capacity is decided by `DoubleMappedBuffer`).
    ///
    /// # Errors
    ///
    /// Returns [`CircularError::Allocation`] if the double-mapped buffer
    /// cannot be created.
    pub fn with_capacity<T, N, M>(min_items: usize) -> Result<Writer<T, N, M>, CircularError>
    where
        N: Notifier,
        M: Metadata,
    {
        // `map_err` + `?` replaces the manual match-and-return-Err dance.
        let buffer =
            Arc::new(DoubleMappedBuffer::new(min_items).map_err(CircularError::Allocation)?);

        // Fresh shared state: writer at the start of the buffer, no wrap
        // yet, no readers registered.
        let state = Arc::new(Mutex::new(State {
            writer_offset: 0,
            writer_ab: false,
            writer_done: false,
            readers: Slab::new(),
        }));

        Ok(Writer {
            buffer,
            state,
            last_space: 0,
        })
    }
}
/// State shared between the writer and all readers, protected by a mutex.
struct State<N, M>
where
    N: Notifier,
    M: Metadata,
{
    // Current write position in items; kept in `0..capacity` by the modulo
    // update in `Writer::produce`.
    writer_offset: usize,
    // Wrap ("a/b") flag, toggled each time the writer wraps past the end of
    // the buffer. Comparing it with a reader's flag disambiguates the
    // full-vs-empty case when both offsets are equal.
    writer_ab: bool,
    // Set when the `Writer` is dropped; readers use it to detect
    // end-of-stream.
    writer_done: bool,
    // Per-reader bookkeeping, keyed by the slab id stored in each `Reader`.
    readers: Slab<ReaderState<N, M>>,
}
/// Bookkeeping for a single reader registered with the writer.
struct ReaderState<N, M> {
    // Reader-side wrap flag, counterpart of `State::writer_ab`.
    ab: bool,
    // Current read position in items; kept in `0..capacity` by the modulo
    // update in `Reader::consume`.
    offset: usize,
    // Notified when the writer produces items or is dropped.
    reader_notifier: N,
    // Notified when this reader consumes items or is dropped, so a waiting
    // writer can re-check for free space.
    writer_notifier: N,
    // Reader-local metadata; appended on produce, trimmed on consume.
    meta: M,
}
/// Writing end of the circular buffer.
///
/// Created via [`Circular::with_capacity`]; readers are attached with
/// [`Writer::add_reader`]. Dropping the writer marks the stream as done and
/// wakes all readers.
pub struct Writer<T, N, M>
where
    N: Notifier,
    M: Metadata,
{
    // Free space reported by the most recent `slice()` call; `produce()`
    // asserts against this to catch over-production.
    last_space: usize,
    // Backing storage shared with all readers. `slice_with_offset_mut` can
    // hand out a region that extends past the wrap point as one contiguous
    // slice (the point of the double mapping).
    buffer: Arc<DoubleMappedBuffer<T>>,
    // State shared with all readers.
    state: Arc<Mutex<State<N, M>>>,
}
impl<T, N, M> Writer<T, N, M>
where
    N: Notifier,
    M: Metadata,
{
    /// Number of items the writer may produce before overwriting data that
    /// the reader at `r_off`/`r_ab` has not yet consumed.
    ///
    /// Equal offsets are ambiguous (buffer completely empty or completely
    /// full); the wrap flags resolve this: equal flags mean the reader has
    /// caught up, so the full capacity is writable.
    #[inline(always)]
    fn writer_space(capacity: usize, w_off: usize, w_ab: bool, r_off: usize, r_ab: bool) -> usize {
        if w_off > r_off {
            r_off + capacity - w_off
        } else if w_off < r_off {
            r_off - w_off
        } else if r_ab == w_ab {
            capacity
        } else {
            0
        }
    }
    /// Register a new reader with this writer.
    ///
    /// The reader starts at the writer's current position (same offset and
    /// wrap flag), so it only observes items produced after this call.
    /// `reader_notifier` is fired when data is produced or the writer is
    /// dropped; `writer_notifier` is fired when the reader consumes or is
    /// dropped.
    pub fn add_reader(&self, reader_notifier: N, writer_notifier: N) -> Reader<T, N, M> {
        let mut state = self.state.lock();
        let reader_state = ReaderState {
            ab: state.writer_ab,
            offset: state.writer_offset,
            reader_notifier,
            writer_notifier,
            meta: M::new(),
        };
        // The slab id is the reader's handle into the shared state.
        let id = state.readers.insert(reader_state);
        Reader {
            id,
            last_space: 0,
            buffer: self.buffer.clone(),
            state: self.state.clone(),
        }
    }
    /// Compute the writable space (minimum over all readers) and the current
    /// write offset. Must be called with the state lock held.
    ///
    /// If some reader leaves zero space and `arm` is set, that reader's
    /// `writer_notifier` is armed so the writer gets woken once the reader
    /// consumes; the scan stops early since the minimum cannot improve.
    #[inline(always)]
    fn space_and_offset_locked(
        state: &mut State<N, M>,
        capacity: usize,
        arm: bool,
    ) -> (usize, usize) {
        let w_off = state.writer_offset;
        let w_ab = state.writer_ab;
        // With no readers the whole capacity is writable.
        let mut space = capacity;
        for (_, reader) in state.readers.iter_mut() {
            let s = Self::writer_space(capacity, w_off, w_ab, reader.offset, reader.ab);
            space = std::cmp::min(space, s);
            if s == 0 && arm {
                reader.writer_notifier.arm();
                break;
            }
            if s == 0 {
                break;
            }
        }
        (space, w_off)
    }
    /// Return the currently writable region as a mutable slice.
    ///
    /// The slice length is the free space (possibly 0). With `arm` set, a
    /// zero-space blocking reader's notifier is armed so the caller can wait
    /// for space to open up. The reported length is remembered in
    /// `last_space` as the bound for the next `produce` call.
    pub fn slice(&mut self, arm: bool) -> &mut [T] {
        let mut state = self.state.lock();
        let (space, offset) =
            Self::space_and_offset_locked(&mut state, self.buffer.capacity(), arm);
        self.last_space = space;
        // Thanks to the double mapping, `offset..offset + space` is
        // contiguous even when it crosses the wrap point.
        unsafe { &mut self.buffer.slice_with_offset_mut(offset)[0..space] }
    }
    /// Commit `n` items previously written into the slice returned by
    /// [`Writer::slice`], attaching `meta` for every reader.
    ///
    /// # Panics
    ///
    /// Panics if `n` exceeds the space reported by the last `slice()` call.
    pub fn produce(&mut self, n: usize, meta: &[M::Item]) {
        if n == 0 {
            return;
        }
        assert!(n <= self.last_space, "vmcircbuffer: produced too much");
        self.last_space -= n;
        let mut state = self.state.lock();
        let capacity = self.buffer.capacity();
        debug_assert!(Self::space_and_offset_locked(&mut state, capacity, false).0 >= n);
        let w_off = state.writer_offset;
        let w_ab = state.writer_ab;
        for (_, r) in state.readers.iter_mut() {
            // `space` is this reader's pre-existing backlog; the metadata is
            // attached at that distance from the reader's current position.
            let space = Reader::<T, N, M>::reader_space(capacity, w_off, w_ab, r.offset, r.ab);
            if !meta.is_empty() {
                r.meta.add_from_slice(space, meta);
            }
            // Notifying before the offset update below is fine: we still
            // hold the lock, so a woken reader cannot observe the state
            // until it is fully updated.
            r.reader_notifier.notify();
        }
        // Crossing (or exactly reaching) the end of the buffer toggles the
        // wrap flag; `n <= capacity`, so at most one wrap per produce.
        if state.writer_offset + n >= self.buffer.capacity() {
            state.writer_ab = !state.writer_ab;
        }
        state.writer_offset = (state.writer_offset + n) % self.buffer.capacity();
    }
}
impl<T, N, M> Drop for Writer<T, N, M>
where
    N: Notifier,
    M: Metadata,
{
    /// Flag the stream as finished and wake every attached reader so a
    /// blocked `Reader::slice` can observe the end-of-stream condition.
    fn drop(&mut self) {
        let mut guard = self.state.lock();
        guard.writer_done = true;
        guard
            .readers
            .iter_mut()
            .for_each(|(_, reader)| reader.reader_notifier.notify());
    }
}
/// Reading end of the circular buffer.
///
/// Created with [`Writer::add_reader`]. Dropping the reader detaches it from
/// the shared state and wakes the writer.
pub struct Reader<T, N, M>
where
    N: Notifier,
    M: Metadata,
{
    // Slab key of this reader's `ReaderState` in the shared state.
    id: usize,
    // Readable space reported by the most recent `slice()` call;
    // `consume()` asserts against this to catch over-consumption.
    last_space: usize,
    // Backing storage shared with the writer.
    buffer: Arc<DoubleMappedBuffer<T>>,
    // State shared with the writer and the other readers.
    state: Arc<Mutex<State<N, M>>>,
}
impl<T, N, M> Reader<T, N, M>
where
    N: Notifier,
    M: Metadata,
{
    /// Number of items available to a reader at `r_off`/`r_ab` given the
    /// writer position `w_off`/`w_ab`.
    ///
    /// Mirror image of `Writer::writer_space`: equal offsets with equal wrap
    /// flags mean the reader has caught up (nothing to read); differing
    /// flags mean the writer has lapped it by exactly one buffer (full
    /// capacity readable).
    #[inline(always)]
    fn reader_space(capacity: usize, w_off: usize, w_ab: bool, r_off: usize, r_ab: bool) -> usize {
        if r_off > w_off {
            w_off + capacity - r_off
        } else if r_off < w_off {
            w_off - r_off
        } else if r_ab == w_ab {
            0
        } else {
            capacity
        }
    }
    /// Compute this reader's readable space, its offset, and whether the
    /// writer has been dropped. Must be called with the state lock held.
    ///
    /// With `arm` set and no data available, the reader's notifier is armed
    /// so it gets woken on the next produce (or writer drop).
    #[inline(always)]
    fn space_and_offset_locked(
        state: &mut State<N, M>,
        id: usize,
        capacity: usize,
        arm: bool,
    ) -> (usize, usize, bool) {
        let done = state.writer_done;
        let w_off = state.writer_offset;
        let w_ab = state.writer_ab;
        // SAFETY: `id` was returned by `readers.insert` in
        // `Writer::add_reader` and is only removed in this `Reader`'s Drop,
        // so it is a valid slab key for the lifetime of the reader.
        let my = unsafe { state.readers.get_unchecked_mut(id) };
        let space = Self::reader_space(capacity, w_off, w_ab, my.offset, my.ab);
        if space == 0 && arm {
            my.reader_notifier.arm();
        }
        (space, my.offset, done)
    }
    /// Return the currently readable region, or `None` if the writer is gone
    /// and all data has been consumed (end of stream).
    ///
    /// An empty slice (writer still alive, no data yet) is returned as
    /// `Some`. The reported length is remembered in `last_space` as the
    /// bound for the next `consume` call.
    pub fn slice(&mut self, arm: bool) -> Option<&[T]> {
        let mut state = self.state.lock();
        let (space, offset, done) =
            Self::space_and_offset_locked(&mut state, self.id, self.buffer.capacity(), arm);
        self.last_space = space;
        if space == 0 && done {
            return None;
        }
        // The double mapping makes `offset..offset + space` contiguous even
        // across the wrap point.
        unsafe { Some(&self.buffer.slice_with_offset(offset)[0..space]) }
    }
    /// Like [`Reader::slice`], but additionally copies the pending metadata
    /// for the readable region into `out`.
    ///
    /// On end of stream (`None`), `out` is cleared so no stale metadata is
    /// handed to the caller.
    pub fn slice_with_metadata_into(&mut self, arm: bool, out: &mut Vec<M::Item>) -> Option<&[T]> {
        let mut state = self.state.lock();
        let (space, offset, done) =
            Self::space_and_offset_locked(&mut state, self.id, self.buffer.capacity(), arm);
        // SAFETY: `self.id` is a valid slab key until this Reader's Drop
        // removes it (see `space_and_offset_locked`).
        let my = unsafe { state.readers.get_unchecked_mut(self.id) };
        my.meta.get_into(out);
        self.last_space = space;
        if space == 0 && done {
            out.clear();
            return None;
        }
        unsafe { Some(&self.buffer.slice_with_offset(offset)[0..space]) }
    }
    /// Mark `n` items from the last `slice()` as consumed, freeing them for
    /// the writer, and notify a possibly waiting writer.
    ///
    /// # Panics
    ///
    /// Panics if `n` exceeds the space reported by the last `slice()` call.
    pub fn consume(&mut self, n: usize) {
        if n == 0 {
            return;
        }
        assert!(n <= self.last_space, "vmcircbuffer: consumed too much!");
        self.last_space -= n;
        let mut state = self.state.lock();
        debug_assert!(
            Self::space_and_offset_locked(&mut state, self.id, self.buffer.capacity(), false).0
                >= n
        );
        // SAFETY: `self.id` is a valid slab key until this Reader's Drop
        // removes it (see `space_and_offset_locked`).
        let my = unsafe { state.readers.get_unchecked_mut(self.id) };
        my.meta.consume(n);
        // Crossing (or exactly reaching) the end of the buffer toggles the
        // wrap flag, mirroring the update in `Writer::produce`.
        if my.offset + n >= self.buffer.capacity() {
            my.ab = !my.ab;
        }
        my.offset = (my.offset + n) % self.buffer.capacity();
        my.writer_notifier.notify();
    }
}
impl<T, N, M> Drop for Reader<T, N, M>
where
    N: Notifier,
    M: Metadata,
{
    /// Detach this reader from the shared state and wake the writer, which
    /// may have been waiting for this reader to free up space.
    fn drop(&mut self) {
        let mut guard = self.state.lock();
        let mut detached = guard.readers.remove(self.id);
        // Still holding the lock here, matching the notification discipline
        // used everywhere else in this file.
        detached.writer_notifier.notify();
    }
}