#[cfg(feature = "sync")]
pub mod sync_impl {
use std::time::Duration;
use crossbeam::channel::Receiver;
use crate::messages::Notice;
/// Blocking handle over a crossbeam channel of [`Notice`] values.
///
/// All receive methods collapse channel errors into `None`, so callers that
/// need to tell "nothing available" apart from "channel disconnected" cannot
/// do so through this type.
#[must_use = "NoticeStream must be polled (.next() / .iter()) to receive notices; dropping it releases the dispatcher slot"]
pub struct NoticeStream {
    /// Receiving half of the channel the notices arrive on.
    receiver: Receiver<Notice>,
}

impl NoticeStream {
    /// Wraps an existing channel receiver.
    pub(crate) fn new(receiver: Receiver<Notice>) -> Self {
        NoticeStream { receiver }
    }

    /// Blocks on `recv` until a notice arrives.
    ///
    /// Returns `None` when `recv` fails, which for a crossbeam channel means
    /// it is disconnected and has no buffered notices left.
    pub fn next(&self) -> Option<Notice> {
        match self.receiver.recv() {
            Ok(notice) => Some(notice),
            Err(_) => None,
        }
    }

    /// Returns a queued notice without blocking.
    ///
    /// `None` covers every `try_recv` error: either nothing is queued right
    /// now or the channel is disconnected.
    pub fn try_next(&self) -> Option<Notice> {
        match self.receiver.try_recv() {
            Ok(notice) => Some(notice),
            Err(_) => None,
        }
    }

    /// Waits up to `timeout` for a notice.
    ///
    /// `None` covers every `recv_timeout` error: either the timeout elapsed
    /// or the channel is disconnected.
    pub fn next_timeout(&self, timeout: Duration) -> Option<Notice> {
        let outcome = self.receiver.recv_timeout(timeout);
        outcome.ok()
    }

    /// Returns an iterator that borrows this stream and yields notices by
    /// repeatedly calling [`NoticeStream::next`].
    pub fn iter(&self) -> NoticeStreamIter<'_> {
        let stream = self;
        NoticeStreamIter { stream }
    }
}
#[must_use = "iterators are lazy and do nothing unless consumed"]
pub struct NoticeStreamIter<'a> {
stream: &'a NoticeStream,
}
impl Iterator for NoticeStreamIter<'_> {
type Item = Notice;
fn next(&mut self) -> Option<Self::Item> {
self.stream.next()
}
}
}
#[cfg(feature = "async")]
pub mod async_impl {
use futures::stream::{unfold, Stream};
use log::debug;
use tokio::sync::broadcast::{self, error::RecvError};
use crate::messages::Notice;
/// Async handle over a tokio broadcast channel of [`Notice`] values.
#[must_use = "NoticeStream must be polled (.next().await / .stream()) to receive notices; dropping it releases the dispatcher slot"]
pub struct NoticeStream {
    /// Receiving half of the broadcast channel the notices arrive on.
    receiver: broadcast::Receiver<Notice>,
}

impl NoticeStream {
    /// Wraps an existing broadcast receiver.
    pub(crate) fn new(receiver: broadcast::Receiver<Notice>) -> Self {
        NoticeStream { receiver }
    }

    /// Awaits the next notice.
    ///
    /// Returns `None` once the receiver reports `RecvError::Closed`. A
    /// `RecvError::Lagged` result is not surfaced to the caller: the skipped
    /// count is logged at debug level and receiving is retried.
    pub async fn next(&mut self) -> Option<Notice> {
        loop {
            // Only the lagged case falls through the match; the other two
            // outcomes leave the loop with the final result.
            let skipped = match self.receiver.recv().await {
                Ok(notice) => break Some(notice),
                Err(RecvError::Closed) => break None,
                Err(RecvError::Lagged(count)) => count,
            };
            debug!("NoticeStream lagged, skipped {skipped} notices");
        }
    }

    /// Adapts this handle into a [`Stream`] that mutably borrows it.
    ///
    /// Each item comes from [`NoticeStream::next`]; the stream ends when that
    /// method returns `None`. The unfold state machine is boxed and pinned so
    /// the returned stream is `Unpin`.
    pub fn stream(&mut self) -> impl Stream<Item = Notice> + Unpin + '_ {
        let notices = unfold(self, |this| async move {
            match this.next().await {
                // Hand the borrowed handle back as the state for the next step.
                Some(notice) => Some((notice, this)),
                None => None,
            }
        });
        Box::pin(notices)
    }
}
}
// Unit tests live out-of-line in `notice_stream_tests.rs` (located via the
// `#[path]` attribute) and are compiled only for test builds.
#[cfg(test)]
#[path = "notice_stream_tests.rs"]
mod tests;