use super::queue::PicoQ;
use futures_core::Stream;
use futures_util::stream::poll_fn;
use std::{
sync::{Arc, Mutex},
task::Poll,
};
use tokio::sync::Notify;
pub struct Subscriber<T>
where
T: Sync + Send + 'static,
{
queue: Mutex<PicoQ<T>>,
not_empty: Notify,
not_full: Notify,
}
impl<T> Subscriber<T>
where
T: Sync + Send + 'static,
{
pub fn new(cap: usize) -> Self {
Self {
queue: Mutex::new(PicoQ::new(cap)),
not_empty: Notify::new(),
not_full: Notify::new(),
}
}
pub async fn recv(&self) -> Arc<T> {
loop {
{
match self.queue.lock() {
Ok(mut q) if !q.is_empty() => {
if let Some(msg) = q.pop() {
self.not_full.notify_one();
return msg;
}
}
_ => {}
}
}
self.not_empty.notified().await;
}
}
pub fn stream(self: Arc<Self>) -> impl Stream<Item = Arc<T>> {
poll_fn(move |cx| {
match self.queue.lock() {
Ok(mut q) if !q.is_empty() => {
if let Some(msg) = q.pop() {
self.not_full.notify_one();
return Poll::Ready(Some(msg));
}
}
_ => {}
}
let notified = self.not_empty.notified();
tokio::pin!(notified);
match notified.poll(cx) {
Poll::Ready(()) | Poll::Pending => Poll::Pending,
}
})
}
pub async fn push(&self, msg: Arc<T>) {
loop {
{
match self.queue.lock() {
Ok(mut q) if !q.is_full() => {
q.push(msg);
self.not_empty.notify_one();
return;
}
_ => {}
}
}
self.not_full.notified().await;
}
}
}