#![deny(missing_docs)]
pub use message_bus_macros::make_message_bus;
use once_cell::sync::Lazy;
use tokio::sync::broadcast::{
channel,
error::{RecvError, TryRecvError},
Receiver, Sender,
};
pub struct Topic<T: Clone>(Lazy<Sender<T>>);
impl<T> Topic<T>
where
T: Clone,
{
pub const fn new<const CAP: usize>() -> Self {
Self(Lazy::new(|| channel(CAP).0))
}
pub fn subscribe(&self) -> Subscriber<T> {
Subscriber {
recv: self.0.subscribe(),
missed_messages: 0,
}
}
pub fn publish(&self, payload: T) {
self.0.send(payload).ok();
}
}
pub struct Subscriber<T: Clone> {
recv: Receiver<T>,
missed_messages: u64,
}
impl<T> Subscriber<T>
where
T: Clone,
{
pub fn try_recv(&mut self) -> Option<T> {
loop {
match self.recv.try_recv() {
Ok(v) => return Some(v),
Err(TryRecvError::Empty) => return None,
Err(TryRecvError::Lagged(n)) => self.missed_messages += n,
Err(TryRecvError::Closed) => unreachable!(), }
}
}
pub async fn recv(&mut self) -> T {
loop {
match self.recv.recv().await {
Ok(msg) => return msg,
Err(RecvError::Lagged(n)) => self.missed_messages += n,
Err(RecvError::Closed) => unreachable!(), }
}
}
pub fn is_empty(&self) -> bool {
self.recv.is_empty()
}
pub fn messages_lost(&mut self) -> u64 {
let n = self.missed_messages;
self.missed_messages = 0;
n
}
}