Skip to main content

rumqttc/
notice.rs

1use tokio::sync::oneshot;
2
3use crate::mqttbytes::v5::{
4    PubAckReason, PubCompReason, PubRecReason, SubscribeReasonCode as V5SubscribeReasonCode,
5    UnsubAckReason as V5UnsubAckReason,
6};
7
/// Failure reason that can be converted into either a [`PublishNoticeError`]
/// or a [`RequestNoticeError`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NoticeFailureReason {
    /// The message was dropped because the session was reset.
    SessionReset,
}
13
14impl NoticeFailureReason {
15    pub(crate) const fn publish_error(self) -> PublishNoticeError {
16        match self {
17            Self::SessionReset => PublishNoticeError::SessionReset,
18        }
19    }
20
21    pub(crate) const fn request_error(self) -> RequestNoticeError {
22        match self {
23            Self::SessionReset => RequestNoticeError::SessionReset,
24        }
25    }
26}
27
28#[derive(Clone, Debug, thiserror::Error, PartialEq, Eq)]
29pub enum PublishNoticeError {
30    #[error("event loop dropped notice sender")]
31    Recv,
32    #[error("message dropped due to session reset")]
33    SessionReset,
34    #[error("qos0 publish was not flushed to the network")]
35    Qos0NotFlushed,
36    #[error("v5 puback returned non-success reason: {0:?}")]
37    V5PubAck(PubAckReason),
38    #[error("v5 pubrec returned non-success reason: {0:?}")]
39    V5PubRec(PubRecReason),
40    #[error("v5 pubcomp returned non-success reason: {0:?}")]
41    V5PubComp(PubCompReason),
42}
43
44impl From<oneshot::error::RecvError> for PublishNoticeError {
45    fn from(_: oneshot::error::RecvError) -> Self {
46        Self::Recv
47    }
48}
49
50type PublishNoticeResult = Result<(), PublishNoticeError>;
51type RequestNoticeResult = Result<(), RequestNoticeError>;
52
53/// Wait handle returned by tracked publish APIs.
54#[derive(Debug)]
55pub struct PublishNotice(pub(crate) oneshot::Receiver<PublishNoticeResult>);
56
57impl PublishNotice {
58    /// Wait for publish completion by blocking the current thread.
59    ///
60    /// # Errors
61    ///
62    /// Returns an error if the event loop drops the notice sender or if the
63    /// publish fails its tracked completion criteria.
64    ///
65    /// # Panics
66    ///
67    /// Panics if called in an async context.
68    pub fn wait(self) -> PublishNoticeResult {
69        self.0.blocking_recv()?
70    }
71
72    /// Wait for publish completion asynchronously.
73    ///
74    /// # Errors
75    ///
76    /// Returns an error if the event loop drops the notice sender or if the
77    /// publish fails its tracked completion criteria.
78    pub async fn wait_async(self) -> PublishNoticeResult {
79        self.0.await?
80    }
81}
82
83#[derive(Clone, Debug, thiserror::Error, PartialEq, Eq)]
84pub enum RequestNoticeError {
85    #[error("event loop dropped notice sender")]
86    Recv,
87    #[error("message dropped due to session reset")]
88    SessionReset,
89    #[error("v5 suback returned failing reason codes: {0:?}")]
90    V5SubAckFailure(Vec<V5SubscribeReasonCode>),
91    #[error("v5 unsuback returned failing reason codes: {0:?}")]
92    V5UnsubAckFailure(Vec<V5UnsubAckReason>),
93}
94
95impl From<oneshot::error::RecvError> for RequestNoticeError {
96    fn from(_: oneshot::error::RecvError) -> Self {
97        Self::Recv
98    }
99}
100
101/// Wait handle returned by tracked subscribe/unsubscribe APIs.
102#[derive(Debug)]
103pub struct RequestNotice(pub(crate) oneshot::Receiver<RequestNoticeResult>);
104
105impl RequestNotice {
106    /// Wait for request completion by blocking the current thread.
107    ///
108    /// # Errors
109    ///
110    /// Returns an error if the event loop drops the notice sender or if the
111    /// tracked subscribe or unsubscribe fails.
112    ///
113    /// # Panics
114    ///
115    /// Panics if called in an async context.
116    pub fn wait(self) -> RequestNoticeResult {
117        self.0.blocking_recv()?
118    }
119
120    /// Wait for request completion asynchronously.
121    ///
122    /// # Errors
123    ///
124    /// Returns an error if the event loop drops the notice sender or if the
125    /// tracked subscribe or unsubscribe fails.
126    pub async fn wait_async(self) -> RequestNoticeResult {
127        self.0.await?
128    }
129}
130
131#[derive(Debug)]
132pub struct PublishNoticeTx(pub(crate) oneshot::Sender<PublishNoticeResult>);
133
134impl PublishNoticeTx {
135    pub(crate) fn new() -> (Self, PublishNotice) {
136        let (tx, rx) = oneshot::channel();
137        (Self(tx), PublishNotice(rx))
138    }
139
140    pub(crate) fn success(self) {
141        _ = self.0.send(Ok(()));
142    }
143
144    pub(crate) fn error(self, err: PublishNoticeError) {
145        _ = self.0.send(Err(err));
146    }
147}
148
149#[derive(Debug)]
150pub struct RequestNoticeTx(pub(crate) oneshot::Sender<RequestNoticeResult>);
151
152impl RequestNoticeTx {
153    pub(crate) fn new() -> (Self, RequestNotice) {
154        let (tx, rx) = oneshot::channel();
155        (Self(tx), RequestNotice(rx))
156    }
157
158    pub(crate) fn success(self) {
159        _ = self.0.send(Ok(()));
160    }
161
162    pub(crate) fn error(self, err: RequestNoticeError) {
163        _ = self.0.send(Err(err));
164    }
165}
166
167#[derive(Debug)]
168pub enum TrackedNoticeTx {
169    Publish(PublishNoticeTx),
170    Request(RequestNoticeTx),
171}
172
#[cfg(test)]
mod tests {
    //! Covers the three outcomes a notice can resolve to: success, a delivered
    //! error, and a sender dropped without sending (reported as `Recv`).

    use super::*;

    #[test]
    fn blocking_wait_returns_success() {
        let (tx, notice) = PublishNoticeTx::new();
        tx.success();
        assert_eq!(notice.wait(), Ok(()));
    }

    #[tokio::test]
    async fn async_wait_returns_error() {
        let (tx, notice) = PublishNoticeTx::new();
        tx.error(PublishNoticeError::SessionReset);
        let err = notice.wait_async().await.unwrap_err();
        assert_eq!(err, PublishNoticeError::SessionReset);
    }

    #[test]
    fn blocking_wait_reports_dropped_sender() {
        let (tx, notice) = PublishNoticeTx::new();
        // Dropping the sender without sending closes the channel.
        drop(tx);
        assert_eq!(notice.wait(), Err(PublishNoticeError::Recv));
    }

    #[tokio::test]
    async fn async_wait_reports_dropped_sender() {
        let (tx, notice) = PublishNoticeTx::new();
        drop(tx);
        assert_eq!(notice.wait_async().await, Err(PublishNoticeError::Recv));
    }

    #[test]
    fn blocking_request_wait_returns_success() {
        let (tx, notice) = RequestNoticeTx::new();
        tx.success();
        assert_eq!(notice.wait(), Ok(()));
    }

    #[tokio::test]
    async fn async_request_wait_returns_error() {
        let (tx, notice) = RequestNoticeTx::new();
        tx.error(RequestNoticeError::SessionReset);
        let err = notice.wait_async().await.unwrap_err();
        assert_eq!(err, RequestNoticeError::SessionReset);
    }

    #[test]
    fn blocking_request_wait_reports_dropped_sender() {
        let (tx, notice) = RequestNoticeTx::new();
        // Dropping the sender without sending closes the channel.
        drop(tx);
        assert_eq!(notice.wait(), Err(RequestNoticeError::Recv));
    }

    #[tokio::test]
    async fn async_request_wait_reports_dropped_sender() {
        let (tx, notice) = RequestNoticeTx::new();
        drop(tx);
        assert_eq!(notice.wait_async().await, Err(RequestNoticeError::Recv));
    }
}