use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;

use thiserror::Error;

use crate::double_mapped_buffer::{DoubleMappedBuffer, DoubleMappedBufferError};
use crate::Metadata;

#[derive(Error, Debug)]
pub enum CircularError {
    #[error("Failed to allocate double mapped buffer.")]
    Allocation(DoubleMappedBufferError),
    #[error("Failed to add reader: maximum number of readers reached.")]
    TooManyReaders,
}
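
/// Factory for a double-mapped circular buffer with a single [`Writer`] and
/// up to a fixed number of [`Reader`]s.
///
/// A minimal usage sketch; `NoMeta` stands in for whatever [`Metadata`]
/// implementation is used and is not defined in this module:
///
/// ```ignore
/// // `NoMeta` is a hypothetical no-op `Metadata` implementation.
/// let mut writer = Circular::with_capacity::<u8, NoMeta>(4096, 8)?;
/// let mut reader = writer.add_reader()?;
///
/// // Fill a prefix of the writable region, then commit it (no tags here).
/// writer.slice()[0] = 123;
/// writer.produce(1, &[]);
///
/// // Read the committed items, then release them back to the writer.
/// assert_eq!(reader.slice()[0], 123);
/// reader.consume(1);
/// ```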
pub struct Circular;
impl Circular {
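    /// Creates a [`Writer`] whose buffer holds at least `min_items` items
    /// and that accepts at most `max_readers` readers over its lifetime. The
    /// underlying [`DoubleMappedBuffer`] may round the capacity up, e.g. to
    /// a multiple of the page size.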
    pub fn with_capacity<T, M>(
        min_items: usize,
        max_readers: usize,
    ) -> Result<Writer<T, M>, CircularError>
    where
        M: Metadata,
    {
        let buffer = DoubleMappedBuffer::new(min_items).map_err(CircularError::Allocation)?;
        // Pre-allocate one slot per potential reader so that adding a reader
        // later never has to reallocate shared state.
        let mut readers = Vec::with_capacity(max_readers);
        for _ in 0..max_readers {
            readers.push(ReaderSlot::new());
        }
        let inner = Arc::new(Inner {
            buffer,
            meta_epoch: AtomicUsize::new(0),
            writer_pos: AtomicUsize::new(0),
            writer_done: AtomicBool::new(false),
            active_readers: AtomicUsize::new(0),
            readers,
        });
        Ok(Writer {
            inner,
            last_space: 0,
        })
    }
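
    /// Creates a [`Writer`] with the smallest capacity the underlying buffer
    /// supports; equivalent to `with_capacity(0, max_readers)`.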
    #[allow(clippy::new_ret_no_self)]
    pub fn new<T, M>(max_readers: usize) -> Result<Writer<T, M>, CircularError>
    where
        M: Metadata,
    {
        Self::with_capacity(0, max_readers)
    }
}
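
/// State shared by the [`Writer`] and all [`Reader`]s behind an [`Arc`].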
struct Inner<T, M>
where
    M: Metadata,
{
    buffer: DoubleMappedBuffer<T>,
    /// Seqlock generation counter; odd while the writer is publishing metadata.
    meta_epoch: AtomicUsize,
    /// Monotonically increasing (wrapping) absolute write position in items.
    writer_pos: AtomicUsize,
    writer_done: AtomicBool,
    /// High-water mark of claimed reader slots; never decremented.
    active_readers: AtomicUsize,
    readers: Vec<ReaderSlot<M>>,
}
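
// Values for `ReaderSlot::state`.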
const READER_INACTIVE: usize = 0;
const READER_ACTIVE: usize = 1;
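
/// Per-reader state, pre-allocated in `Inner::readers` and claimed by
/// [`Writer::add_reader`].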
struct ReaderSlot<M>
where
    M: Metadata,
{
    state: AtomicUsize,
    /// Absolute (wrapping) read position in items.
    pos: AtomicUsize,
    /// Set when the writer has queued metadata the reader has not drained yet.
    meta_dirty: AtomicBool,
    meta: spin::Mutex<M>,
}
impl<M> ReaderSlot<M>
where
    M: Metadata,
{
    fn new() -> Self {
        Self {
            state: AtomicUsize::new(READER_INACTIVE),
            pos: AtomicUsize::new(0),
            meta_dirty: AtomicBool::new(false),
            meta: spin::Mutex::new(M::new()),
        }
    }
}
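
/// Writing side of the buffer: obtain the writable region with
/// [`slice`](Writer::slice), fill a prefix of it, then commit with
/// [`produce`](Writer::produce).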
pub struct Writer<T, M>
where
    M: Metadata,
{
    /// Space reported by the last call to `slice`; `produce` may not exceed it.
    last_space: usize,
    inner: Arc<Inner<T, M>>,
}

impl<T, M> Writer<T, M>
where
    M: Metadata,
{
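    /// Adds a [`Reader`] that starts at the current write position, i.e. it
    /// only observes items produced after this call. Fails with
    /// [`CircularError::TooManyReaders`] once all `max_readers` slots have
    /// been claimed; dropped readers do not free their slot.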
    pub fn add_reader(&self) -> Result<Reader<T, M>, CircularError> {
        // Claim the next free slot by bumping the high-water mark. Slots are
        // not recycled when a reader is dropped, so at most `max_readers`
        // readers can ever be created for this buffer.
        let id = self
            .inner
            .active_readers
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |n| {
                if n < self.inner.readers.len() {
                    Some(n + 1)
                } else {
                    None
                }
            })
            .map_err(|_| CircularError::TooManyReaders)?;
        let slot = &self.inner.readers[id];
        // Start the reader at the current write position and reset its
        // metadata before making the slot visible to the writer.
        let w = self.inner.writer_pos.load(Ordering::Acquire);
        slot.pos.store(w, Ordering::Relaxed);
        {
            let mut meta = slot.meta.lock();
            *meta = M::new();
        }
        slot.meta_dirty.store(false, Ordering::Release);
        slot.state.store(READER_ACTIVE, Ordering::Release);
        Ok(Reader {
            id,
            last_space: 0,
            inner: self.inner.clone(),
        })
    }
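
    /// Returns how many items can be written without overrunning the slowest
    /// active reader, together with the current write offset into the buffer.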
    fn space_and_offset(&self) -> (usize, usize) {
        let cap = self.inner.buffer.capacity();
        let w = self.inner.writer_pos.load(Ordering::Acquire);
        let mut max_dist = 0usize;
        let mut any = false;
        let active = self.inner.active_readers.load(Ordering::Acquire);
        for slot in &self.inner.readers[..active] {
            if slot.state.load(Ordering::Acquire) == READER_ACTIVE {
                any = true;
                // Unconsumed items of this reader; `wrapping_sub` handles
                // overflow of the position counters.
                let r = slot.pos.load(Ordering::Acquire);
                let dist = w.wrapping_sub(r);
                if dist > max_dist {
                    max_dist = dist;
                }
            }
        }
        // Without readers the writer may use the full buffer; otherwise it
        // must not overwrite the slowest reader's unconsumed items.
        let space = if !any {
            cap
        } else {
            cap.saturating_sub(max_dist)
        };
        (space, w % cap)
    }
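
    /// Returns the writable region. Items written here become visible to the
    /// readers only after a subsequent call to [`produce`](Writer::produce).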
    pub fn slice(&mut self) -> &mut [T] {
        let (space, offset) = self.space_and_offset();
        self.last_space = space;
        // SAFETY: `space` never overlaps items an active reader may still
        // access, and the double mapping makes the range contiguous.
        unsafe { &mut self.inner.buffer.slice_with_offset_mut(offset)[0..space] }
    }
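
    /// Commits `n` items from the last [`slice`](Writer::slice) and attaches
    /// the `meta` tags for every active reader. Panics if `n` exceeds the
    /// space reported by the last `slice` call.
    ///
    /// A hedged sketch; how a tag is constructed depends on the concrete
    /// [`Metadata`] implementation:
    ///
    /// ```ignore
    /// let n = writer.slice().len();
    /// writer.produce(n, &[some_tag]); // `some_tag: M::Item` is hypothetical
    /// ```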
    pub fn produce(&mut self, n: usize, meta: &[M::Item]) {
        if n == 0 {
            return;
        }
        assert!(n <= self.last_space, "vmcircbuffer: produced too much");
        self.last_space -= n;
        let w = self.inner.writer_pos.load(Ordering::Acquire);
        if !meta.is_empty() {
            // Seqlock write: make the epoch odd while metadata is updated so
            // that readers retry instead of observing a partial publish.
            self.inner.meta_epoch.fetch_add(1, Ordering::AcqRel);
            let active = self.inner.active_readers.load(Ordering::Acquire);
            for slot in &self.inner.readers[..active] {
                if slot.state.load(Ordering::Acquire) == READER_ACTIVE {
                    let mut m = slot.meta.lock();
                    // Offset the new tags relative to this reader's position.
                    let r = slot.pos.load(Ordering::Acquire);
                    let dist = w.wrapping_sub(r);
                    m.add_from_slice(dist, meta);
                    slot.meta_dirty.store(true, Ordering::Release);
                }
            }
            self.inner
                .writer_pos
                .store(w.wrapping_add(n), Ordering::Release);
            // Even epoch again: the publish is complete.
            self.inner.meta_epoch.fetch_add(1, Ordering::Release);
            return;
        }
        self.inner
            .writer_pos
            .store(w.wrapping_add(n), Ordering::Release);
    }
}
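
// Dropping the writer marks the stream as finished; readers observe this via
// `slice_with_meta_into` returning `None` once they have drained the buffer.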
impl<T, M> Drop for Writer<T, M>
where
    M: Metadata,
{
    fn drop(&mut self) {
        self.inner.writer_done.store(true, Ordering::Release);
    }
}
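
/// Reading side of the buffer: obtain readable items with
/// [`slice`](Reader::slice) or
/// [`slice_with_meta_into`](Reader::slice_with_meta_into), then release them
/// with [`consume`](Reader::consume).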
pub struct Reader<T, M>
where
    M: Metadata,
{
    id: usize,
    /// Space reported by the last call to `slice*`; `consume` may not exceed it.
    last_space: usize,
    inner: Arc<Inner<T, M>>,
}

impl<T, M> Reader<T, M>
where
    M: Metadata,
{
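    /// Returns how many items are available for reading and the current read
    /// offset into the buffer.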
    fn space_and_offset(&self) -> (usize, usize) {
        let cap = self.inner.buffer.capacity();
        let slot = &self.inner.readers[self.id];
        let w = self.inner.writer_pos.load(Ordering::Acquire);
        let r = slot.pos.load(Ordering::Acquire);
        // Items produced but not yet consumed, clamped to the capacity.
        let avail = w.wrapping_sub(r);
        let space = if avail >= cap { cap } else { avail };
        (space, r % cap)
    }
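
    /// Like [`space_and_offset`](Self::space_and_offset), but also drains
    /// pending metadata into `out` and reports whether the writer was
    /// dropped. Reads are validated seqlock-style against `meta_epoch`: the
    /// snapshot is retried while the writer is mid-publish or if the epoch
    /// changed underneath it.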
    fn space_and_offset_and_meta_into(&self, out: &mut Vec<M::Item>) -> (usize, usize, bool) {
        let cap = self.inner.buffer.capacity();
        let slot = &self.inner.readers[self.id];
        loop {
            // Discard anything collected by a previous, torn iteration so a
            // retry cannot deliver duplicated tags.
            out.clear();
            let e1 = self.inner.meta_epoch.load(Ordering::Acquire);
            if (e1 & 1) != 0 {
                // Odd epoch: the writer is publishing metadata right now.
                std::hint::spin_loop();
                continue;
            }
            let m = slot.meta.lock();
            let w = self.inner.writer_pos.load(Ordering::Acquire);
            let done = self.inner.writer_done.load(Ordering::Acquire);
            let r = slot.pos.load(Ordering::Acquire);
            let avail = w.wrapping_sub(r);
            let space = if avail >= cap { cap } else { avail };
            if slot.meta_dirty.load(Ordering::Acquire) {
                m.get_into(out);
                if out.is_empty() {
                    slot.meta_dirty.store(false, Ordering::Release);
                }
            }
            // Only accept the snapshot if the epoch did not move while it
            // was taken.
            let e2 = self.inner.meta_epoch.load(Ordering::Acquire);
            if e1 == e2 {
                return (space, r % cap, done);
            }
        }
    }
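
    /// Returns the readable items without touching metadata. An empty slice
    /// means that no data is available right now.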
    pub fn slice(&mut self) -> &[T] {
        let (space, offset) = self.space_and_offset();
        self.last_space = space;
        // SAFETY: the range covers only items the writer has already
        // produced; the double mapping makes it contiguous.
        unsafe { &self.inner.buffer.slice_with_offset(offset)[0..space] }
    }
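
    /// Returns the readable items and drains pending metadata into `out`, or
    /// `None` once the writer was dropped and all data has been consumed.
    ///
    /// A hedged sketch of a drain loop (`process` is a placeholder). Note
    /// that it busy-waits while the buffer is empty; a real caller would
    /// block or yield instead:
    ///
    /// ```ignore
    /// let mut tags = Vec::new();
    /// while let Some(items) = reader.slice_with_meta_into(&mut tags) {
    ///     let n = items.len();
    ///     process(items, &tags); // hypothetical
    ///     reader.consume(n);
    /// }
    /// ```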
    pub fn slice_with_meta_into(&mut self, out: &mut Vec<M::Item>) -> Option<&[T]> {
        let (space, offset, done) = self.space_and_offset_and_meta_into(out);
        self.last_space = space;
        // `None` signals end of stream: the writer is gone and everything it
        // produced has been consumed.
        if space == 0 && done {
            out.clear();
            return None;
        }
        unsafe { Some(&self.inner.buffer.slice_with_offset(offset)[0..space]) }
    }
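
    /// Releases `n` items back to the writer and advances the metadata
    /// window accordingly. Panics if `n` exceeds the space reported by the
    /// last `slice` call.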
    pub fn consume(&mut self, n: usize) {
        if n == 0 {
            return;
        }
        assert!(n <= self.last_space, "vmcircbuffer: consumed too much");
        self.last_space -= n;
        let slot = &self.inner.readers[self.id];
        {
            // Advance the metadata window and the read position together
            // under the lock so the writer's tag-offset computation sees a
            // consistent pair. Only this reader ever mutates `pos`.
            let mut m = slot.meta.lock();
            m.consume(n);
            let r = slot.pos.load(Ordering::Acquire);
            slot.pos.store(r.wrapping_add(n), Ordering::Release);
        }
    }
}
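
// Dropping a reader retires its slot so that the writer no longer waits on
// it. The slot is not recycled for future readers.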
impl<T, M> Drop for Reader<T, M>
where
    M: Metadata,
{
    fn drop(&mut self) {
        let slot = &self.inner.readers[self.id];
        // Clear the metadata and deactivate the slot so the writer stops
        // accounting for this reader.
        {
            let mut m = slot.meta.lock();
            *m = M::new();
        }
        slot.meta_dirty.store(false, Ordering::Release);
        slot.state.store(READER_INACTIVE, Ordering::Release);
    }
}