Skip to main content

rumqttc/
notice.rs

1use tokio::sync::oneshot;
2
3use crate::mqttbytes::QoS;
4use crate::mqttbytes::v4::{
5    PubAck, PubComp, SubAck, SubscribeReasonCode as V4SubscribeReasonCode, UnsubAck,
6};
7
8#[derive(Clone, Copy, Debug, PartialEq, Eq)]
9pub enum NoticeFailureReason {
10    /// Message dropped due to session reset.
11    SessionReset,
12}
13
14impl NoticeFailureReason {
15    pub(crate) const fn publish_error(self) -> PublishNoticeError {
16        match self {
17            Self::SessionReset => PublishNoticeError::SessionReset,
18        }
19    }
20
21    pub(crate) const fn subscribe_error(self) -> SubscribeNoticeError {
22        match self {
23            Self::SessionReset => SubscribeNoticeError::SessionReset,
24        }
25    }
26
27    pub(crate) const fn unsubscribe_error(self) -> UnsubscribeNoticeError {
28        match self {
29            Self::SessionReset => UnsubscribeNoticeError::SessionReset,
30        }
31    }
32}
33
34#[derive(Clone, Debug, PartialEq, Eq)]
35pub enum PublishResult {
36    Qos0Flushed,
37    Qos1(PubAck),
38    Qos2Completed(PubComp),
39}
40
41impl PublishResult {
42    #[must_use]
43    pub const fn qos(&self) -> QoS {
44        match self {
45            Self::Qos0Flushed => QoS::AtMostOnce,
46            Self::Qos1(_) => QoS::AtLeastOnce,
47            Self::Qos2Completed(_) => QoS::ExactlyOnce,
48        }
49    }
50}
51
52#[derive(Clone, Debug, thiserror::Error, PartialEq, Eq)]
53pub enum PublishNoticeError {
54    #[error("event loop dropped notice sender")]
55    Recv,
56    #[error("message dropped due to session reset")]
57    SessionReset,
58    #[error("qos0 publish was not flushed to the network")]
59    Qos0NotFlushed,
60}
61
62impl From<oneshot::error::RecvError> for PublishNoticeError {
63    fn from(_: oneshot::error::RecvError) -> Self {
64        Self::Recv
65    }
66}
67
68type PublishNoticeResult = Result<PublishResult, PublishNoticeError>;
69type SubscribeNoticeResult = Result<SubAck, SubscribeNoticeError>;
70type UnsubscribeNoticeResult = Result<UnsubAck, UnsubscribeNoticeError>;
71
72#[derive(Debug)]
73struct NoticeRx<T, E>(oneshot::Receiver<Result<T, E>>);
74
75impl<T, E> NoticeRx<T, E>
76where
77    E: From<oneshot::error::RecvError>,
78{
79    fn wait_blocking(self) -> Result<T, E> {
80        self.0.blocking_recv()?
81    }
82
83    async fn wait_async(self) -> Result<T, E> {
84        self.0.await?
85    }
86}
87
88#[derive(Debug)]
89struct NoticeTx<T, E>(oneshot::Sender<Result<T, E>>);
90
91impl<T, E> NoticeTx<T, E> {
92    fn success(self, result: T) {
93        _ = self.0.send(Ok(result));
94    }
95
96    fn error(self, err: E) {
97        _ = self.0.send(Err(err));
98    }
99}
100
101fn notice_channel<T, E>() -> (NoticeTx<T, E>, NoticeRx<T, E>) {
102    let (tx, rx) = oneshot::channel();
103    (NoticeTx(tx), NoticeRx(rx))
104}
105
106/// Wait handle returned by tracked publish APIs.
107#[derive(Debug)]
108pub struct PublishNotice(NoticeRx<PublishResult, PublishNoticeError>);
109
110impl PublishNotice {
111    /// Wait for the publish protocol result by blocking the current thread.
112    ///
113    /// # Errors
114    ///
115    /// Returns an error if the event loop drops the notice sender or if the
116    /// publish fails before a protocol result is available.
117    ///
118    /// # Panics
119    ///
120    /// Panics if called in an async context.
121    pub fn wait(self) -> PublishNoticeResult {
122        self.0.wait_blocking()
123    }
124
125    /// Wait for the publish protocol result asynchronously.
126    ///
127    /// # Errors
128    ///
129    /// Returns an error if the event loop drops the notice sender or if the
130    /// publish fails before a protocol result is available.
131    pub async fn wait_async(self) -> PublishNoticeResult {
132        self.0.wait_async().await
133    }
134
135    /// Wait for publish completion while discarding the detailed protocol result.
136    ///
137    /// # Errors
138    ///
139    /// Returns an error if the publish fails before completion.
140    ///
141    /// # Panics
142    ///
143    /// Panics if called in an async context.
144    pub fn wait_completion(self) -> Result<(), PublishNoticeError> {
145        self.wait().map(drop)
146    }
147
148    /// Wait asynchronously for publish completion while discarding the detailed
149    /// protocol result.
150    ///
151    /// # Errors
152    ///
153    /// Returns an error if the publish fails before completion.
154    pub async fn wait_completion_async(self) -> Result<(), PublishNoticeError> {
155        self.wait_async().await.map(drop)
156    }
157}
158
159#[derive(Clone, Debug, thiserror::Error, PartialEq, Eq)]
160pub enum SubscribeNoticeError {
161    #[error("event loop dropped notice sender")]
162    Recv,
163    #[error("message dropped due to session reset")]
164    SessionReset,
165    #[error("v4 suback returned failing reason codes: {0:?}")]
166    V4SubAckFailure(Vec<V4SubscribeReasonCode>),
167}
168
169impl From<oneshot::error::RecvError> for SubscribeNoticeError {
170    fn from(_: oneshot::error::RecvError) -> Self {
171        Self::Recv
172    }
173}
174
175/// Wait handle returned by tracked subscribe APIs.
176#[derive(Debug)]
177pub struct SubscribeNotice(NoticeRx<SubAck, SubscribeNoticeError>);
178
179impl SubscribeNotice {
180    /// Wait for `SubAck` by blocking the current thread.
181    ///
182    /// # Errors
183    ///
184    /// Returns an error if the event loop drops the notice sender or if the
185    /// subscribe fails before a `SubAck` is available.
186    ///
187    /// # Panics
188    ///
189    /// Panics if called in an async context.
190    pub fn wait(self) -> SubscribeNoticeResult {
191        self.0.wait_blocking()
192    }
193
194    /// Wait for `SubAck` asynchronously.
195    ///
196    /// # Errors
197    ///
198    /// Returns an error if the event loop drops the notice sender or if the
199    /// subscribe fails before a `SubAck` is available.
200    pub async fn wait_async(self) -> SubscribeNoticeResult {
201        self.0.wait_async().await
202    }
203
204    /// Wait for subscribe completion and treat failing `SubAck` return codes as
205    /// completion errors.
206    ///
207    /// # Errors
208    ///
209    /// Returns an error if the subscribe fails before `SubAck`, or if `SubAck`
210    /// contains failing return codes.
211    ///
212    /// # Panics
213    ///
214    /// Panics if called in an async context.
215    pub fn wait_completion(self) -> Result<(), SubscribeNoticeError> {
216        validate_v4_suback_completion(&self.wait()?)
217    }
218
219    /// Wait asynchronously for subscribe completion and treat failing `SubAck`
220    /// return codes as completion errors.
221    ///
222    /// # Errors
223    ///
224    /// Returns an error if the subscribe fails before `SubAck`, or if `SubAck`
225    /// contains failing return codes.
226    pub async fn wait_completion_async(self) -> Result<(), SubscribeNoticeError> {
227        validate_v4_suback_completion(&self.wait_async().await?)
228    }
229}
230
231#[derive(Clone, Debug, thiserror::Error, PartialEq, Eq)]
232pub enum UnsubscribeNoticeError {
233    #[error("event loop dropped notice sender")]
234    Recv,
235    #[error("message dropped due to session reset")]
236    SessionReset,
237}
238
239impl From<oneshot::error::RecvError> for UnsubscribeNoticeError {
240    fn from(_: oneshot::error::RecvError) -> Self {
241        Self::Recv
242    }
243}
244
245/// Wait handle returned by tracked unsubscribe APIs.
246#[derive(Debug)]
247pub struct UnsubscribeNotice(NoticeRx<UnsubAck, UnsubscribeNoticeError>);
248
249impl UnsubscribeNotice {
250    /// Wait for `UnsubAck` by blocking the current thread.
251    ///
252    /// # Errors
253    ///
254    /// Returns an error if the event loop drops the notice sender or if the
255    /// unsubscribe fails before an `UnsubAck` is available.
256    ///
257    /// # Panics
258    ///
259    /// Panics if called in an async context.
260    pub fn wait(self) -> UnsubscribeNoticeResult {
261        self.0.wait_blocking()
262    }
263
264    /// Wait for `UnsubAck` asynchronously.
265    ///
266    /// # Errors
267    ///
268    /// Returns an error if the event loop drops the notice sender or if the
269    /// unsubscribe fails before an `UnsubAck` is available.
270    pub async fn wait_async(self) -> UnsubscribeNoticeResult {
271        self.0.wait_async().await
272    }
273
274    /// Wait for unsubscribe completion while discarding the `UnsubAck`.
275    ///
276    /// # Errors
277    ///
278    /// Returns an error if the unsubscribe fails before `UnsubAck`.
279    ///
280    /// # Panics
281    ///
282    /// Panics if called in an async context.
283    pub fn wait_completion(self) -> Result<(), UnsubscribeNoticeError> {
284        self.wait().map(drop)
285    }
286
287    /// Wait asynchronously for unsubscribe completion while discarding the
288    /// `UnsubAck`.
289    ///
290    /// # Errors
291    ///
292    /// Returns an error if the unsubscribe fails before `UnsubAck`.
293    pub async fn wait_completion_async(self) -> Result<(), UnsubscribeNoticeError> {
294        self.wait_async().await.map(drop)
295    }
296}
297
298#[derive(Debug)]
299pub struct PublishNoticeTx(NoticeTx<PublishResult, PublishNoticeError>);
300
301impl PublishNoticeTx {
302    pub(crate) fn new() -> (Self, PublishNotice) {
303        let (tx, rx) = notice_channel();
304        (Self(tx), PublishNotice(rx))
305    }
306
307    pub(crate) fn success(self, result: PublishResult) {
308        self.0.success(result);
309    }
310
311    pub(crate) fn error(self, err: PublishNoticeError) {
312        self.0.error(err);
313    }
314}
315
316#[derive(Debug)]
317pub struct SubscribeNoticeTx(NoticeTx<SubAck, SubscribeNoticeError>);
318
319impl SubscribeNoticeTx {
320    pub(crate) fn new() -> (Self, SubscribeNotice) {
321        let (tx, rx) = notice_channel();
322        (Self(tx), SubscribeNotice(rx))
323    }
324
325    pub(crate) fn success(self, suback: SubAck) {
326        self.0.success(suback);
327    }
328
329    pub(crate) fn error(self, err: SubscribeNoticeError) {
330        self.0.error(err);
331    }
332}
333
334#[derive(Debug)]
335pub struct UnsubscribeNoticeTx(NoticeTx<UnsubAck, UnsubscribeNoticeError>);
336
337impl UnsubscribeNoticeTx {
338    pub(crate) fn new() -> (Self, UnsubscribeNotice) {
339        let (tx, rx) = notice_channel();
340        (Self(tx), UnsubscribeNotice(rx))
341    }
342
343    pub(crate) fn success(self, unsuback: UnsubAck) {
344        self.0.success(unsuback);
345    }
346
347    pub(crate) fn error(self, err: UnsubscribeNoticeError) {
348        self.0.error(err);
349    }
350}
351
352#[derive(Debug)]
353pub enum TrackedNoticeTx {
354    Publish(PublishNoticeTx),
355    Subscribe(SubscribeNoticeTx),
356    Unsubscribe(UnsubscribeNoticeTx),
357}
358
359fn validate_v4_suback_completion(suback: &SubAck) -> Result<(), SubscribeNoticeError> {
360    let failures: Vec<_> = suback
361        .return_codes
362        .iter()
363        .copied()
364        .filter(|code| matches!(code, V4SubscribeReasonCode::Failure))
365        .collect();
366    if failures.is_empty() {
367        Ok(())
368    } else {
369        Err(SubscribeNoticeError::V4SubAckFailure(failures))
370    }
371}
372
#[cfg(test)]
mod tests {
    use super::*;

    /// A result sent before waiting is handed back by the blocking wait.
    #[test]
    fn blocking_publish_wait_returns_result() {
        let (sender, handle) = PublishNoticeTx::new();
        sender.success(PublishResult::Qos0Flushed);

        let outcome = handle.wait();
        assert_eq!(outcome, Ok(PublishResult::Qos0Flushed));
    }

    /// An error sent through the publish sender surfaces from the async wait.
    #[tokio::test]
    async fn async_publish_wait_returns_error() {
        let (sender, handle) = PublishNoticeTx::new();
        sender.error(PublishNoticeError::SessionReset);

        let outcome = handle.wait_async().await;
        assert_eq!(outcome, Err(PublishNoticeError::SessionReset));
    }

    /// The blocking subscribe wait returns the exact `SubAck` that was sent.
    #[test]
    fn blocking_subscribe_wait_returns_suback() {
        let (sender, handle) = SubscribeNoticeTx::new();
        let expected = SubAck::new(1, vec![V4SubscribeReasonCode::Success(QoS::AtLeastOnce)]);
        sender.success(expected.clone());

        let outcome = handle.wait();
        assert_eq!(outcome, Ok(expected));
    }

    /// A `Failure` return code turns subscribe completion into an error.
    #[test]
    fn subscribe_completion_fails_on_failure_return_code() {
        let (sender, handle) = SubscribeNoticeTx::new();
        let rejected = vec![V4SubscribeReasonCode::Failure];
        sender.success(SubAck::new(1, rejected.clone()));

        let outcome = handle.wait_completion();
        assert_eq!(
            outcome,
            Err(SubscribeNoticeError::V4SubAckFailure(rejected))
        );
    }

    /// An error sent through the unsubscribe sender surfaces from the async
    /// wait.
    #[tokio::test]
    async fn async_unsubscribe_wait_returns_error() {
        let (sender, handle) = UnsubscribeNoticeTx::new();
        sender.error(UnsubscribeNoticeError::SessionReset);

        let outcome = handle.wait_async().await;
        assert_eq!(outcome.unwrap_err(), UnsubscribeNoticeError::SessionReset);
    }
}