rumqttd/link/
meters.rs

1use crate::router::{Event, Meter};
2use crate::ConnectionId;
3use flume::{Receiver, RecvError, RecvTimeoutError, SendError, Sender, TryRecvError, TrySendError};
4
/// Errors surfaced by link operations.
///
/// Every variant wraps an underlying channel or timer error and is created
/// automatically through the `#[from]` conversion, so `?` can be used directly
/// on the wrapped error types.
#[derive(Debug, thiserror::Error)]
pub enum LinkError {
    /// A `flume` `TrySendError` carrying a `(ConnectionId, Event)` message.
    #[error("Channel try send error")]
    TrySend(#[from] TrySendError<(ConnectionId, Event)>),
    /// A `flume` `SendError` carrying a `(ConnectionId, Event)` message.
    #[error("Channel send error")]
    Send(#[from] SendError<(ConnectionId, Event)>),
    /// A `flume` `RecvError` from a receive on a channel.
    #[error("Channel recv error")]
    Recv(#[from] RecvError),
    /// A `flume` `RecvTimeoutError` from a receive with a timeout.
    #[error("Channel timeout recv error")]
    RecvTimeout(#[from] RecvTimeoutError),
    /// A `tokio::time::error::Elapsed` timer error; its `Display` output is
    /// embedded in the message.
    #[error("Timeout = {0}")]
    Elapsed(#[from] tokio::time::error::Elapsed),
    /// A `flume` `TryRecvError` from a non-waiting receive.
    #[error("Channel try_recv error")]
    TryRecv(#[from] TryRecvError),
}
20
/// Receiving end of a channel that carries batches of [`Meter`] values.
///
/// The constructors in this file create the channel and pass its sending half
/// to the router inside an `Event::NewMeter` message.
pub struct MetersLink {
    /// Channel receiver yielding one `Vec<Meter>` per message.
    router_rx: Receiver<Vec<Meter>>,
}
24
25impl MetersLink {
26    pub fn new(router_tx: Sender<(ConnectionId, Event)>) -> Result<MetersLink, LinkError> {
27        let (tx, rx) = flume::bounded(100);
28
29        router_tx.send((0, Event::NewMeter(tx)))?;
30        let link = MetersLink { router_rx: rx };
31        Ok(link)
32    }
33
34    pub async fn init(router_tx: Sender<(ConnectionId, Event)>) -> Result<MetersLink, LinkError> {
35        let (tx, rx) = flume::bounded(100);
36
37        router_tx.send_async((0, Event::NewMeter(tx))).await?;
38        let link = MetersLink { router_rx: rx };
39        Ok(link)
40    }
41
42    pub fn recv(&self) -> Result<Vec<Meter>, LinkError> {
43        let o = self.router_rx.try_recv()?;
44        Ok(o)
45    }
46
47    pub async fn next(&self) -> Result<Vec<Meter>, LinkError> {
48        let o = self.router_rx.recv_async().await?;
49        Ok(o)
50    }
51}