use futures::channel::mpsc::{channel, Receiver, Sender};
use futures::StreamExt;
use std::slice;
use crate::generic;
use crate::generic::CircularError;
use crate::generic::NoMetadata;
use crate::generic::Notifier;
/// [`Notifier`] implementation that signals the other side of the buffer
/// through a `futures` mpsc channel carrying unit messages.
struct AsyncNotifier {
/// Sending half of the channel a wake-up message is pushed into.
chan: Sender<()>,
/// `true` after `arm` was called and until the next `notify`; a wake-up is
/// only sent while this flag is set.
armed: bool,
}
impl Notifier for AsyncNotifier {
    /// Marks the notifier as armed, so the next call to `notify` attempts to
    /// send a wake-up message.
    fn arm(&mut self) {
        self.armed = true;
    }

    /// Sends at most one wake-up per `arm` call.
    ///
    /// When the notifier is not armed this is a no-op. Otherwise the flag is
    /// cleared and a unit message is pushed with a non-blocking `try_send`.
    fn notify(&mut self) {
        if !self.armed {
            return;
        }
        self.armed = false;
        // Best effort: a failed send is deliberately ignored rather than
        // propagated, exactly one attempt is made per arming.
        let _ = self.chan.try_send(());
    }
}
/// Entry point for building the async flavour of the circular buffer.
///
/// The type carries no data; its associated functions return a [`Writer`].
pub struct Circular;
impl Circular {
    /// Creates a buffer by calling [`Circular::with_capacity`] with `0`.
    ///
    /// # Errors
    ///
    /// Returns whatever [`CircularError`] `with_capacity` reports.
    // `new` intentionally returns the `Writer` handle instead of `Self`.
    #[allow(clippy::new_ret_no_self)]
    pub fn new<T>() -> Result<Writer<T>, CircularError> {
        Self::with_capacity::<T>(0)
    }

    /// Creates a buffer and returns its [`Writer`].
    ///
    /// `min_items` is forwarded unchanged to
    /// `generic::Circular::with_capacity` (presumably a lower bound on the
    /// number of items the buffer holds — confirm against the generic module).
    ///
    /// # Errors
    ///
    /// Propagates the [`CircularError`] returned by the generic constructor.
    pub fn with_capacity<T>(min_items: usize) -> Result<Writer<T>, CircularError> {
        // Build the generic writer first so that no channel is created when
        // the allocation fails.
        let inner = generic::Circular::with_capacity(min_items)?;

        // Single-slot channel on which readers wake the writer; the sender is
        // kept so it can be cloned for every reader added later.
        let (writer_sender, chan) = channel(1);

        Ok(Writer {
            writer_sender,
            chan,
            writer: inner,
        })
    }
}
/// Async writing handle of the circular buffer.
pub struct Writer<T> {
/// Sender side of `chan`; a clone is handed to the notifier created for
/// every reader in `add_reader`.
writer_sender: Sender<()>,
/// Receiver awaited by `slice` while the generic writer reports an empty
/// slice.
chan: Receiver<()>,
/// Underlying generic writer that owns the actual buffer logic.
writer: generic::Writer<T, AsyncNotifier, NoMetadata>,
}
impl<T> Writer<T> {
    /// Registers a new [`Reader`] on this buffer.
    ///
    /// Two disarmed notifiers are created: one sending into a fresh
    /// single-slot channel whose receiver is handed to the reader, and one
    /// sending into this writer's own channel via a clone of its sender.
    pub fn add_reader(&self) -> Reader<T> {
        let wake_writer = self.writer_sender.clone();
        let (wake_reader, chan) = channel(1);

        // Argument order matters: reader-side notifier first, writer-side second.
        let reader = self.writer.add_reader(
            AsyncNotifier {
                chan: wake_reader,
                armed: false,
            },
            AsyncNotifier {
                chan: wake_writer,
                armed: false,
            },
        );

        Reader { chan, reader }
    }

    /// Waits until the generic writer hands out a non-empty slice and returns it.
    ///
    /// Each round calls `generic::Writer::slice(true)`; while the result is
    /// empty, the task awaits the next message on the writer's channel and
    /// tries again. (The `true` flag presumably asks the generic writer to arm
    /// its notifier — confirm against the generic module.)
    pub async fn slice(&mut self) -> &mut [T] {
        let (ptr, len) = loop {
            let space = self.writer.slice(true);
            if !space.is_empty() {
                break (space.as_mut_ptr(), space.len());
            }
            // Nothing to write into yet: sleep until a wake-up arrives.
            let _ = self.chan.next().await;
        };

        // SAFETY: `ptr` and `len` describe the slice `self.writer.slice(true)`
        // returned in the final loop iteration, and nothing touches
        // `self.writer` between that call and this point. The raw-pointer
        // round-trip only detaches the borrow from the loop body so it can be
        // returned; the resulting slice is still tied to the `&mut self`
        // borrow by the function signature.
        // NOTE(review): relies on the generic writer keeping that memory valid
        // for as long as `self` is borrowed — confirm.
        unsafe { slice::from_raw_parts_mut(ptr, len) }
    }

    /// Non-waiting variant of [`Writer::slice`]: returns whatever
    /// `generic::Writer::slice(false)` yields, which may be empty.
    pub fn try_slice(&mut self) -> &mut [T] {
        self.writer.slice(false)
    }

    /// Reports `n` items as produced by forwarding to the generic writer
    /// together with an empty `Vec` as its second argument.
    pub fn produce(&mut self, n: usize) {
        let nothing_attached = Vec::new();
        self.writer.produce(n, nothing_attached);
    }
}
/// Async reading handle of the circular buffer, created by `Writer::add_reader`.
pub struct Reader<T> {
/// Receiver awaited by `slice` while the generic reader reports an empty
/// slice.
chan: Receiver<()>,
/// Underlying generic reader that owns the actual buffer logic.
reader: generic::Reader<T, AsyncNotifier, NoMetadata>,
}
impl<T> Reader<T> {
    /// Waits until the generic reader hands out a non-empty slice.
    ///
    /// Each round calls `generic::Reader::slice(true)`:
    /// * `None` is passed straight through to the caller (presumably the
    ///   writer side is gone — confirm against the generic module);
    /// * an empty slice makes the task await the next message on the reader's
    ///   channel and retry;
    /// * a non-empty slice is returned as `Some(..)`.
    pub async fn slice(&mut self) -> Option<&[T]> {
        let found = loop {
            match self.reader.slice(true) {
                None => break None,
                Some(([], _)) => {
                    // Nothing to read yet: sleep until a wake-up arrives.
                    let _ = self.chan.next().await;
                }
                Some((items, _)) => break Some((items.as_ptr(), items.len())),
            }
        };

        found.map(|(ptr, len)| {
            // SAFETY: `ptr` and `len` describe the slice
            // `self.reader.slice(true)` returned in the final loop iteration,
            // and nothing touches `self.reader` between that call and this
            // point. The raw-pointer round-trip only detaches the borrow from
            // the loop body so it can be returned; the resulting slice is
            // still tied to the `&mut self` borrow by the function signature.
            // NOTE(review): relies on the generic reader keeping that memory
            // valid for as long as `self` is borrowed — confirm.
            unsafe { slice::from_raw_parts(ptr, len) }
        })
    }

    /// Non-waiting variant of [`Reader::slice`]: returns the slice part of
    /// whatever `generic::Reader::slice(false)` yields, which may be empty,
    /// or `None` when the generic reader returns `None`.
    pub fn try_slice(&mut self) -> Option<&[T]> {
        let (items, _) = self.reader.slice(false)?;
        Some(items)
    }

    /// Reports `n` items as consumed by forwarding to the generic reader.
    pub fn consume(&mut self, n: usize) {
        self.reader.consume(n);
    }
}