use core::slice;
use std::sync::mpsc::{channel, Receiver, Sender};
use crate::generic;
use crate::generic::CircularError;
use crate::generic::NoMetadata;
use crate::generic::Notifier;
/// Channel-backed notifier state used by the blocking reader and writer.
///
/// Holds the sending half of an `mpsc` channel together with a flag that
/// records whether a notification has been requested.
struct BlockingNotifier {
    /// `true` while a wake-up has been requested and not yet delivered.
    armed: bool,
    /// Sending half of the channel on which the wake-up message is posted.
    chan: Sender<()>,
}
impl Notifier for BlockingNotifier {
    /// Requests that the next call to `notify` posts a wake-up message.
    fn arm(&mut self) {
        self.armed = true;
    }

    /// Posts one unit message on the channel if the notifier is armed, then
    /// disarms it; does nothing when the notifier is not armed.
    fn notify(&mut self) {
        if !self.armed {
            return;
        }
        // The send result is deliberately discarded: an error only means the
        // receiving half has been dropped, so nobody is left to wake.
        let _ = self.chan.send(());
        self.armed = false;
    }
}
/// Constructor namespace for the blocking circular buffer.
///
/// This type carries no data; its associated functions build the [`Writer`]
/// half, from which readers are created via [`Writer::add_reader`].
pub struct Circular;

impl Circular {
    /// Creates a buffer by calling [`Circular::with_capacity`] with `0`.
    ///
    /// # Errors
    ///
    /// Returns whatever error [`Circular::with_capacity`] reports.
    // `new` intentionally returns the `Writer` handle rather than `Self`.
    #[allow(clippy::new_ret_no_self)]
    pub fn new<T>() -> Result<Writer<T>, CircularError> {
        Circular::with_capacity::<T>(0)
    }

    /// Creates a buffer and returns its blocking [`Writer`].
    ///
    /// `min_items` is forwarded unchanged to `generic::Circular::with_capacity`
    /// (presumably a lower bound on the item capacity; confirm against the
    /// generic implementation).
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `generic::Circular::with_capacity`.
    pub fn with_capacity<T>(min_items: usize) -> Result<Writer<T>, CircularError> {
        let inner = generic::Circular::with_capacity(min_items)?;
        // Channel on which the writer is woken; the sending half is kept so
        // it can be cloned for each reader later on.
        let (wake_tx, wake_rx) = channel();
        let blocking_writer = Writer {
            writer: inner,
            writer_sender: wake_tx,
            chan: wake_rx,
        };
        Ok(blocking_writer)
    }
}
/// Producer half of the blocking circular buffer.
///
/// Wraps a `generic::Writer` configured with [`BlockingNotifier`], so that
/// [`Writer::slice`] can park the calling thread on an `mpsc` channel until
/// the generic writer hands out a non-empty slice.
pub struct Writer<T> {
    /// Sending half of the writer's wake-up channel; cloned into the notifier
    /// handed to each reader in [`Writer::add_reader`].
    writer_sender: Sender<()>,
    /// Receiving half of the writer's wake-up channel; [`Writer::slice`]
    /// blocks on it.
    chan: Receiver<()>,
    /// The wrapped generic writer.
    writer: generic::Writer<T, BlockingNotifier, NoMetadata>,
}

impl<T> Writer<T> {
    /// Registers a new [`Reader`] on this buffer.
    ///
    /// A fresh channel is created for the reader. The generic writer receives
    /// two notifiers, both initially disarmed: one sending on the reader's
    /// new channel and one sending on a clone of this writer's channel.
    pub fn add_reader(&self) -> Reader<T> {
        let (reader_tx, reader_rx) = channel();
        let reader_notifier = BlockingNotifier {
            armed: false,
            chan: reader_tx,
        };
        let writer_notifier = BlockingNotifier {
            armed: false,
            chan: self.writer_sender.clone(),
        };
        Reader {
            reader: self.writer.add_reader(reader_notifier, writer_notifier),
            chan: reader_rx,
        }
    }

    /// Returns a non-empty writable slice, blocking until one is available.
    ///
    /// Each attempt calls the generic writer's `slice(true)`; while that
    /// yields an empty slice, the thread waits for a message on the writer's
    /// channel and then retries.
    pub fn slice(&mut self) -> &mut [T] {
        let (ptr, len) = loop {
            let buf = self.writer.slice(true);
            if !buf.is_empty() {
                break (buf.as_mut_ptr(), buf.len());
            }
            // A receive error is ignored and the loop simply retries.
            let _ = self.chan.recv();
        };
        // SAFETY: `ptr` and `len` describe the slice returned by the last
        // `self.writer.slice(true)` call, and `self` is not touched between
        // that call and this point. The rebuilt slice is tied to the
        // `&mut self` borrow of this call by the elided return lifetime.
        // The detour through a raw pointer exists only because the borrow
        // cannot be returned directly out of the retry loop.
        unsafe { slice::from_raw_parts_mut(ptr, len) }
    }

    /// Non-blocking variant of [`Writer::slice`]: forwards to the generic
    /// writer's `slice(false)` and returns its result, which may be empty.
    #[inline]
    pub fn try_slice(&mut self) -> &mut [T] {
        self.writer.slice(false)
    }

    /// Forwards `n` to the generic writer's `produce`, together with an empty
    /// metadata slice.
    #[inline]
    pub fn produce(&mut self, n: usize) {
        self.writer.produce(n, &[]);
    }
}
/// Consumer half of the blocking circular buffer.
///
/// Wraps a `generic::Reader` configured with [`BlockingNotifier`], so that
/// [`Reader::slice`] can park the calling thread on an `mpsc` channel until
/// the generic reader yields items or reports `None`.
pub struct Reader<T> {
    /// Receiving half of this reader's wake-up channel; [`Reader::slice`]
    /// blocks on it.
    chan: Receiver<()>,
    /// The wrapped generic reader.
    reader: generic::Reader<T, BlockingNotifier, NoMetadata>,
}

impl<T> Reader<T> {
    /// Returns a non-empty readable slice, blocking until one is available.
    ///
    /// Each attempt calls the generic reader's `slice(true)`. `None` from the
    /// generic reader is returned immediately (presumably signalling that no
    /// more data will arrive; confirm against `generic::Reader::slice`).
    /// While the generic reader yields an empty slice, the thread waits for a
    /// message on the reader's channel and then retries.
    pub fn slice(&mut self) -> Option<&[T]> {
        let (ptr, len) = loop {
            let items = self.reader.slice(true)?;
            if !items.is_empty() {
                break (items.as_ptr(), items.len());
            }
            // A receive error is ignored and the loop simply retries.
            let _ = self.chan.recv();
        };
        // SAFETY: `ptr` and `len` describe the slice returned by the last
        // `self.reader.slice(true)` call, and `self` is not touched between
        // that call and this point. The rebuilt slice is tied to the
        // `&mut self` borrow of this call by the elided return lifetime.
        // The detour through a raw pointer exists only because the borrow
        // cannot be returned directly out of the retry loop.
        Some(unsafe { slice::from_raw_parts(ptr, len) })
    }

    /// Non-blocking variant of [`Reader::slice`]: forwards to the generic
    /// reader's `slice(false)` and returns its result unchanged.
    #[inline]
    pub fn try_slice(&mut self) -> Option<&[T]> {
        self.reader.slice(false)
    }

    /// Forwards `n` to the generic reader's `consume`.
    #[inline]
    pub fn consume(&mut self, n: usize) {
        self.reader.consume(n);
    }
}