make_message_bus/
lib.rs

//! Runtime support for the `make_message_bus` macro: broadcast-backed topics and their subscribers.
2
3#![deny(missing_docs)]
4
5pub use message_bus_macros::make_message_bus;
6use once_cell::sync::Lazy;
7use tokio::sync::broadcast::{
8    channel,
9    error::{RecvError, TryRecvError},
10    Receiver, Sender,
11};
12
13/// Topic type used in static storage in codegen.
14pub struct Topic<T: Clone>(Lazy<Sender<T>>);
15
16impl<T> Topic<T>
17where
18    T: Clone,
19{
20    /// Create a new topic.
21    pub const fn new<const CAP: usize>() -> Self {
22        Self(Lazy::new(|| channel(CAP).0))
23    }
24
25    /// Subscribe to the topic.
26    pub fn subscribe(&self) -> Subscriber<T> {
27        Subscriber {
28            recv: self.0.subscribe(),
29            missed_messages: 0,
30        }
31    }
32
33    /// Publish to a topic.
34    pub fn publish(&self, payload: T) {
35        self.0.send(payload).ok();
36    }
37}
38
39/// A subscriber to a topic on the bus.
40pub struct Subscriber<T: Clone> {
41    recv: Receiver<T>,
42    missed_messages: u64,
43}
44
45impl<T> Subscriber<T>
46where
47    T: Clone,
48{
49    /// Tries to receive a value, will return `None` if there are none.
50    pub fn try_recv(&mut self) -> Option<T> {
51        loop {
52            match self.recv.try_recv() {
53                Ok(v) => return Some(v),
54                Err(TryRecvError::Empty) => return None,
55                Err(TryRecvError::Lagged(n)) => self.missed_messages += n,
56                Err(TryRecvError::Closed) => unreachable!(), // Impossible to drop the sender
57            }
58        }
59    }
60
61    /// Receive a value from the bus.
62    pub async fn recv(&mut self) -> T {
63        loop {
64            match self.recv.recv().await {
65                Ok(msg) => return msg,
66                Err(RecvError::Lagged(n)) => self.missed_messages += n,
67                Err(RecvError::Closed) => unreachable!(), // Impossible to drop the sender
68            }
69        }
70    }
71
72    /// Checks if there is a message on the topic.
73    pub fn is_empty(&self) -> bool {
74        self.recv.is_empty()
75    }
76
77    /// Number of messages missed since last time this was called.
78    pub fn messages_lost(&mut self) -> u64 {
79        let n = self.missed_messages;
80        self.missed_messages = 0;
81
82        n
83    }
84}