1#![deny(missing_docs)]
4
5pub use message_bus_macros::make_message_bus;
6use once_cell::sync::Lazy;
7use tokio::sync::broadcast::{
8 channel,
9 error::{RecvError, TryRecvError},
10 Receiver, Sender,
11};
12
13pub struct Topic<T: Clone>(Lazy<Sender<T>>);
15
16impl<T> Topic<T>
17where
18 T: Clone,
19{
20 pub const fn new<const CAP: usize>() -> Self {
22 Self(Lazy::new(|| channel(CAP).0))
23 }
24
25 pub fn subscribe(&self) -> Subscriber<T> {
27 Subscriber {
28 recv: self.0.subscribe(),
29 missed_messages: 0,
30 }
31 }
32
33 pub fn publish(&self, payload: T) {
35 self.0.send(payload).ok();
36 }
37}
38
39pub struct Subscriber<T: Clone> {
41 recv: Receiver<T>,
42 missed_messages: u64,
43}
44
45impl<T> Subscriber<T>
46where
47 T: Clone,
48{
49 pub fn try_recv(&mut self) -> Option<T> {
51 loop {
52 match self.recv.try_recv() {
53 Ok(v) => return Some(v),
54 Err(TryRecvError::Empty) => return None,
55 Err(TryRecvError::Lagged(n)) => self.missed_messages += n,
56 Err(TryRecvError::Closed) => unreachable!(), }
58 }
59 }
60
61 pub async fn recv(&mut self) -> T {
63 loop {
64 match self.recv.recv().await {
65 Ok(msg) => return msg,
66 Err(RecvError::Lagged(n)) => self.missed_messages += n,
67 Err(RecvError::Closed) => unreachable!(), }
69 }
70 }
71
72 pub fn is_empty(&self) -> bool {
74 self.recv.is_empty()
75 }
76
77 pub fn messages_lost(&mut self) -> u64 {
79 let n = self.missed_messages;
80 self.missed_messages = 0;
81
82 n
83 }
84}