Skip to main content

rumqttc/
notice.rs

1use tokio::sync::oneshot;
2
3use crate::mqttbytes::QoS;
4use crate::mqttbytes::v5::{
5    PubAck, PubAckReason, PubComp, PubCompReason, PubRec, PubRecReason, SubAck,
6    SubscribeReasonCode as V5SubscribeReasonCode, UnsubAck, UnsubAckReason as V5UnsubAckReason,
7};
8use crate::{AuthFailureReason, AuthOutcome};
9
/// Request-agnostic reason for failing a tracked notice.
///
/// The value can be translated into the error type of each specific request
/// kind (publish, subscribe, unsubscribe, auth).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NoticeFailureReason {
    /// Message dropped due to session reset.
    SessionReset,
}
15
16impl NoticeFailureReason {
17    pub(crate) const fn publish_error(self) -> PublishNoticeError {
18        match self {
19            Self::SessionReset => PublishNoticeError::SessionReset,
20        }
21    }
22
23    pub(crate) const fn subscribe_error(self) -> SubscribeNoticeError {
24        match self {
25            Self::SessionReset => SubscribeNoticeError::SessionReset,
26        }
27    }
28
29    pub(crate) const fn unsubscribe_error(self) -> UnsubscribeNoticeError {
30        match self {
31            Self::SessionReset => UnsubscribeNoticeError::SessionReset,
32        }
33    }
34
35    pub(crate) const fn auth_error(self) -> AuthNoticeError {
36        match self {
37            Self::SessionReset => AuthNoticeError::SessionReset,
38        }
39    }
40}
41
/// Protocol-level result delivered to a tracked publish.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum PublishResult {
    /// Result for a QoS 0 publish (reported as `QoS::AtMostOnce`); carries no
    /// acknowledgement packet.
    Qos0Flushed,
    /// Result for a QoS 1 publish, carrying the received `PubAck`.
    Qos1(PubAck),
    /// Result for a QoS 2 publish, carrying the received `PubComp`.
    Qos2Completed(PubComp),
    /// Result for a QoS 2 publish carrying the `PubRec` that rejected it;
    /// completion checks always report this as `PublishNoticeError::V5PubRec`.
    Qos2PubRecRejected(PubRec),
}
49
50impl PublishResult {
51    #[must_use]
52    pub const fn qos(&self) -> QoS {
53        match self {
54            Self::Qos0Flushed => QoS::AtMostOnce,
55            Self::Qos1(_) => QoS::AtLeastOnce,
56            Self::Qos2Completed(_) | Self::Qos2PubRecRejected(_) => QoS::ExactlyOnce,
57        }
58    }
59}
60
/// Errors reported through a tracked publish notice.
#[derive(Clone, Debug, thiserror::Error, PartialEq, Eq)]
pub enum PublishNoticeError {
    /// The notice sender was dropped before any result was delivered.
    #[error("event loop dropped notice sender")]
    Recv,
    /// The message was dropped because the session was reset.
    #[error("message dropped due to session reset")]
    SessionReset,
    /// A publish using the carried topic alias cannot be replayed after a
    /// reconnect.
    #[error("publish with topic alias {0} cannot be replayed after reconnect")]
    TopicAliasReplayUnavailable(u16),
    /// A QoS 0 publish was not flushed to the network.
    #[error("qos0 publish was not flushed to the network")]
    Qos0NotFlushed,
    /// The `PubAck` carried a non-success reason.
    #[error("v5 puback returned non-success reason: {0:?}")]
    V5PubAck(PubAckReason),
    /// The `PubRec` carried a non-success reason.
    #[error("v5 pubrec returned non-success reason: {0:?}")]
    V5PubRec(PubRecReason),
    /// The `PubComp` carried a non-success reason.
    #[error("v5 pubcomp returned non-success reason: {0:?}")]
    V5PubComp(PubCompReason),
}
78
79impl From<oneshot::error::RecvError> for PublishNoticeError {
80    fn from(_: oneshot::error::RecvError) -> Self {
81        Self::Recv
82    }
83}
84
// Result aliases, one per kind of tracked request: the success payload paired
// with that request's notice error type.
type PublishNoticeResult = Result<PublishResult, PublishNoticeError>;
type SubscribeNoticeResult = Result<SubAck, SubscribeNoticeError>;
type UnsubscribeNoticeResult = Result<UnsubAck, UnsubscribeNoticeError>;
type AuthNoticeResult = Result<AuthOutcome, AuthNoticeError>;
89
90#[derive(Debug)]
91struct NoticeRx<T, E>(oneshot::Receiver<Result<T, E>>);
92
93impl<T, E> NoticeRx<T, E>
94where
95    E: From<oneshot::error::RecvError>,
96{
97    fn wait_blocking(self) -> Result<T, E> {
98        self.0.blocking_recv()?
99    }
100
101    async fn wait_async(self) -> Result<T, E> {
102        self.0.await?
103    }
104}
105
106#[derive(Debug)]
107struct NoticeTx<T, E>(oneshot::Sender<Result<T, E>>);
108
109impl<T, E> NoticeTx<T, E> {
110    fn success(self, result: T) {
111        _ = self.0.send(Ok(result));
112    }
113
114    fn error(self, err: E) {
115        _ = self.0.send(Err(err));
116    }
117}
118
119fn notice_channel<T, E>() -> (NoticeTx<T, E>, NoticeRx<T, E>) {
120    let (tx, rx) = oneshot::channel();
121    (NoticeTx(tx), NoticeRx(rx))
122}
123
124/// Wait handle returned by tracked publish APIs.
125#[derive(Debug)]
126pub struct PublishNotice(NoticeRx<PublishResult, PublishNoticeError>);
127
128impl PublishNotice {
129    /// Wait for the publish protocol result by blocking the current thread.
130    ///
131    /// # Errors
132    ///
133    /// Returns an error if the event loop drops the notice sender or if the
134    /// publish fails before a protocol result is available.
135    ///
136    /// # Panics
137    ///
138    /// Panics if called in an async context.
139    pub fn wait(self) -> PublishNoticeResult {
140        self.0.wait_blocking()
141    }
142
143    /// Wait for the publish protocol result asynchronously.
144    ///
145    /// # Errors
146    ///
147    /// Returns an error if the event loop drops the notice sender or if the
148    /// publish fails before a protocol result is available.
149    pub async fn wait_async(self) -> PublishNoticeResult {
150        self.0.wait_async().await
151    }
152
153    /// Wait for publish completion and map broker rejection reasons to
154    /// completion errors.
155    ///
156    /// # Errors
157    ///
158    /// Returns an error if the publish fails before a protocol result, or if the
159    /// terminal protocol result reports a failing reason.
160    ///
161    /// # Panics
162    ///
163    /// Panics if called in an async context.
164    pub fn wait_completion(self) -> Result<(), PublishNoticeError> {
165        validate_v5_publish_completion(&self.wait()?)
166    }
167
168    /// Wait asynchronously for publish completion and map broker rejection
169    /// reasons to completion errors.
170    ///
171    /// # Errors
172    ///
173    /// Returns an error if the publish fails before a protocol result, or if the
174    /// terminal protocol result reports a failing reason.
175    pub async fn wait_completion_async(self) -> Result<(), PublishNoticeError> {
176        validate_v5_publish_completion(&self.wait_async().await?)
177    }
178}
179
/// Errors reported through a tracked subscribe notice.
#[derive(Clone, Debug, thiserror::Error, PartialEq, Eq)]
pub enum SubscribeNoticeError {
    /// The notice sender was dropped before any result was delivered.
    #[error("event loop dropped notice sender")]
    Recv,
    /// The message was dropped because the session was reset.
    #[error("message dropped due to session reset")]
    SessionReset,
    /// The `SubAck` contained failing reason codes; only the failing codes
    /// are carried.
    #[error("v5 suback returned failing reason codes: {0:?}")]
    V5SubAckFailure(Vec<V5SubscribeReasonCode>),
}
189
190impl From<oneshot::error::RecvError> for SubscribeNoticeError {
191    fn from(_: oneshot::error::RecvError) -> Self {
192        Self::Recv
193    }
194}
195
196/// Wait handle returned by tracked subscribe APIs.
197#[derive(Debug)]
198pub struct SubscribeNotice(NoticeRx<SubAck, SubscribeNoticeError>);
199
200impl SubscribeNotice {
201    /// Wait for `SubAck` by blocking the current thread.
202    ///
203    /// # Errors
204    ///
205    /// Returns an error if the event loop drops the notice sender or if the
206    /// subscribe fails before a `SubAck` is available.
207    ///
208    /// # Panics
209    ///
210    /// Panics if called in an async context.
211    pub fn wait(self) -> SubscribeNoticeResult {
212        self.0.wait_blocking()
213    }
214
215    /// Wait for `SubAck` asynchronously.
216    ///
217    /// # Errors
218    ///
219    /// Returns an error if the event loop drops the notice sender or if the
220    /// subscribe fails before a `SubAck` is available.
221    pub async fn wait_async(self) -> SubscribeNoticeResult {
222        self.0.wait_async().await
223    }
224
225    /// Wait for subscribe completion and treat failing `SubAck` return codes as
226    /// completion errors.
227    ///
228    /// # Errors
229    ///
230    /// Returns an error if the subscribe fails before `SubAck`, or if `SubAck`
231    /// contains failing return codes.
232    ///
233    /// # Panics
234    ///
235    /// Panics if called in an async context.
236    pub fn wait_completion(self) -> Result<(), SubscribeNoticeError> {
237        validate_v5_suback_completion(&self.wait()?)
238    }
239
240    /// Wait asynchronously for subscribe completion and treat failing `SubAck`
241    /// return codes as completion errors.
242    ///
243    /// # Errors
244    ///
245    /// Returns an error if the subscribe fails before `SubAck`, or if `SubAck`
246    /// contains failing return codes.
247    pub async fn wait_completion_async(self) -> Result<(), SubscribeNoticeError> {
248        validate_v5_suback_completion(&self.wait_async().await?)
249    }
250}
251
/// Errors reported through a tracked unsubscribe notice.
#[derive(Clone, Debug, thiserror::Error, PartialEq, Eq)]
pub enum UnsubscribeNoticeError {
    /// The notice sender was dropped before any result was delivered.
    #[error("event loop dropped notice sender")]
    Recv,
    /// The message was dropped because the session was reset.
    #[error("message dropped due to session reset")]
    SessionReset,
    /// The `UnsubAck` contained failing reasons; only the failing reasons
    /// are carried.
    #[error("v5 unsuback returned failing reason codes: {0:?}")]
    V5UnsubAckFailure(Vec<V5UnsubAckReason>),
}
261
262impl From<oneshot::error::RecvError> for UnsubscribeNoticeError {
263    fn from(_: oneshot::error::RecvError) -> Self {
264        Self::Recv
265    }
266}
267
268/// Wait handle returned by tracked unsubscribe APIs.
269#[derive(Debug)]
270pub struct UnsubscribeNotice(NoticeRx<UnsubAck, UnsubscribeNoticeError>);
271
272impl UnsubscribeNotice {
273    /// Wait for `UnsubAck` by blocking the current thread.
274    ///
275    /// # Errors
276    ///
277    /// Returns an error if the event loop drops the notice sender or if the
278    /// unsubscribe fails before an `UnsubAck` is available.
279    ///
280    /// # Panics
281    ///
282    /// Panics if called in an async context.
283    pub fn wait(self) -> UnsubscribeNoticeResult {
284        self.0.wait_blocking()
285    }
286
287    /// Wait for `UnsubAck` asynchronously.
288    ///
289    /// # Errors
290    ///
291    /// Returns an error if the event loop drops the notice sender or if the
292    /// unsubscribe fails before an `UnsubAck` is available.
293    pub async fn wait_async(self) -> UnsubscribeNoticeResult {
294        self.0.wait_async().await
295    }
296
297    /// Wait for unsubscribe completion and treat failing `UnsubAck` reasons as
298    /// completion errors.
299    ///
300    /// # Errors
301    ///
302    /// Returns an error if the unsubscribe fails before `UnsubAck`, or if
303    /// `UnsubAck` contains failing reasons.
304    ///
305    /// # Panics
306    ///
307    /// Panics if called in an async context.
308    pub fn wait_completion(self) -> Result<(), UnsubscribeNoticeError> {
309        validate_v5_unsuback_completion(&self.wait()?)
310    }
311
312    /// Wait asynchronously for unsubscribe completion and treat failing
313    /// `UnsubAck` reasons as completion errors.
314    ///
315    /// # Errors
316    ///
317    /// Returns an error if the unsubscribe fails before `UnsubAck`, or if
318    /// `UnsubAck` contains failing reasons.
319    pub async fn wait_completion_async(self) -> Result<(), UnsubscribeNoticeError> {
320        validate_v5_unsuback_completion(&self.wait_async().await?)
321    }
322}
323
/// Errors reported through a tracked authentication notice.
#[derive(Clone, Debug, thiserror::Error, PartialEq, Eq)]
pub enum AuthNoticeError {
    /// The notice sender was dropped before any result was delivered.
    #[error("event loop dropped notice sender")]
    Recv,
    /// The authentication exchange was dropped because the session was reset.
    #[error("authentication exchange was dropped due to session reset")]
    SessionReset,
    /// The authentication exchange failed due to a protocol error.
    #[error("authentication exchange failed due to a protocol error")]
    ProtocolError,
    /// Authentication failed; the carried string is appended to the message.
    #[error("authentication failed: {0}")]
    AuthenticationFailed(String),
    /// The connection closed before authentication completed.
    #[error("connection closed before authentication completed")]
    ConnectionClosed,
    /// A re-authentication is already active.
    #[error("re-authentication is already active")]
    OverlappingReauth,
    /// Re-authentication requires a CONNECT Authentication Method.
    #[error("re-authentication requires a CONNECT Authentication Method")]
    MissingAuthenticationMethod,
}
341
342impl From<oneshot::error::RecvError> for AuthNoticeError {
343    fn from(_: oneshot::error::RecvError) -> Self {
344        Self::Recv
345    }
346}
347
348impl AuthFailureReason {
349    pub(crate) fn from_notice_error(error: AuthNoticeError) -> Self {
350        match error {
351            AuthNoticeError::Recv => Self::NoticeDropped,
352            AuthNoticeError::SessionReset => Self::SessionReset,
353            AuthNoticeError::ProtocolError => Self::ProtocolError,
354            AuthNoticeError::AuthenticationFailed(message) => Self::AuthenticationFailed(message),
355            AuthNoticeError::ConnectionClosed => Self::ConnectionClosed,
356            AuthNoticeError::OverlappingReauth => Self::OverlappingReauth,
357            AuthNoticeError::MissingAuthenticationMethod => Self::MissingAuthenticationMethod,
358        }
359    }
360}
361
362/// Wait handle returned by tracked re-authentication APIs.
363#[derive(Debug)]
364pub struct AuthNotice(NoticeRx<AuthOutcome, AuthNoticeError>);
365
366impl AuthNotice {
367    /// Wait for the authentication result by blocking the current thread.
368    ///
369    /// # Errors
370    ///
371    /// Returns an error if the event loop drops the notice sender or if the
372    /// authentication exchange fails before a result is available.
373    ///
374    /// # Panics
375    ///
376    /// Panics if called in an async context.
377    pub fn wait(self) -> AuthNoticeResult {
378        self.0.wait_blocking()
379    }
380
381    /// Wait for the authentication result asynchronously.
382    ///
383    /// # Errors
384    ///
385    /// Returns an error if the event loop drops the notice sender or if the
386    /// authentication exchange fails before a result is available.
387    pub async fn wait_async(self) -> AuthNoticeResult {
388        self.0.wait_async().await
389    }
390}
391
392#[derive(Debug)]
393pub struct PublishNoticeTx(NoticeTx<PublishResult, PublishNoticeError>);
394
395impl PublishNoticeTx {
396    pub(crate) fn new() -> (Self, PublishNotice) {
397        let (tx, rx) = notice_channel();
398        (Self(tx), PublishNotice(rx))
399    }
400
401    pub(crate) fn success(self, result: PublishResult) {
402        self.0.success(result);
403    }
404
405    pub(crate) fn error(self, err: PublishNoticeError) {
406        self.0.error(err);
407    }
408}
409
410#[derive(Debug)]
411pub struct SubscribeNoticeTx(NoticeTx<SubAck, SubscribeNoticeError>);
412
413impl SubscribeNoticeTx {
414    pub(crate) fn new() -> (Self, SubscribeNotice) {
415        let (tx, rx) = notice_channel();
416        (Self(tx), SubscribeNotice(rx))
417    }
418
419    pub(crate) fn success(self, suback: SubAck) {
420        self.0.success(suback);
421    }
422
423    pub(crate) fn error(self, err: SubscribeNoticeError) {
424        self.0.error(err);
425    }
426}
427
428#[derive(Debug)]
429pub struct UnsubscribeNoticeTx(NoticeTx<UnsubAck, UnsubscribeNoticeError>);
430
431impl UnsubscribeNoticeTx {
432    pub(crate) fn new() -> (Self, UnsubscribeNotice) {
433        let (tx, rx) = notice_channel();
434        (Self(tx), UnsubscribeNotice(rx))
435    }
436
437    pub(crate) fn success(self, unsuback: UnsubAck) {
438        self.0.success(unsuback);
439    }
440
441    pub(crate) fn error(self, err: UnsubscribeNoticeError) {
442        self.0.error(err);
443    }
444}
445
446#[derive(Debug)]
447pub struct AuthNoticeTx(NoticeTx<AuthOutcome, AuthNoticeError>);
448
449impl AuthNoticeTx {
450    pub(crate) fn new() -> (Self, AuthNotice) {
451        let (tx, rx) = notice_channel();
452        (Self(tx), AuthNotice(rx))
453    }
454
455    pub(crate) fn success(self, outcome: AuthOutcome) {
456        self.0.success(outcome);
457    }
458
459    pub(crate) fn error(self, err: AuthNoticeError) {
460        self.0.error(err);
461    }
462}
463
/// A notice sender for any one of the tracked request kinds, so senders of
/// different kinds can be handled through a single type.
#[derive(Debug)]
pub enum TrackedNoticeTx {
    /// Sender for a tracked publish.
    Publish(PublishNoticeTx),
    /// Sender for a tracked subscribe.
    Subscribe(SubscribeNoticeTx),
    /// Sender for a tracked unsubscribe.
    Unsubscribe(UnsubscribeNoticeTx),
    /// Sender for a tracked authentication exchange.
    Auth(AuthNoticeTx),
}
471
472fn validate_v5_publish_completion(result: &PublishResult) -> Result<(), PublishNoticeError> {
473    match result {
474        PublishResult::Qos0Flushed => Ok(()),
475        PublishResult::Qos1(puback)
476            if puback.reason == PubAckReason::Success
477                || puback.reason == PubAckReason::NoMatchingSubscribers =>
478        {
479            Ok(())
480        }
481        PublishResult::Qos1(puback) => Err(PublishNoticeError::V5PubAck(puback.reason)),
482        PublishResult::Qos2Completed(pubcomp) if pubcomp.reason == PubCompReason::Success => Ok(()),
483        PublishResult::Qos2Completed(pubcomp) => Err(PublishNoticeError::V5PubComp(pubcomp.reason)),
484        PublishResult::Qos2PubRecRejected(pubrec) => {
485            Err(PublishNoticeError::V5PubRec(pubrec.reason))
486        }
487    }
488}
489
490fn validate_v5_suback_completion(suback: &SubAck) -> Result<(), SubscribeNoticeError> {
491    let failures: Vec<_> = suback
492        .return_codes
493        .iter()
494        .filter(|reason| !matches!(reason, V5SubscribeReasonCode::Success(_)))
495        .copied()
496        .collect();
497    if failures.is_empty() {
498        Ok(())
499    } else {
500        Err(SubscribeNoticeError::V5SubAckFailure(failures))
501    }
502}
503
504fn validate_v5_unsuback_completion(unsuback: &UnsubAck) -> Result<(), UnsubscribeNoticeError> {
505    let failures: Vec<_> = unsuback
506        .reasons
507        .iter()
508        .filter(|reason| {
509            **reason != V5UnsubAckReason::Success
510                && **reason != V5UnsubAckReason::NoSubscriptionExisted
511        })
512        .copied()
513        .collect();
514    if failures.is_empty() {
515        Ok(())
516    } else {
517        Err(UnsubscribeNoticeError::V5UnsubAckFailure(failures))
518    }
519}
520
#[cfg(test)]
mod tests {
    use super::*;

    /// A result sent before waiting is returned by the blocking wait.
    #[test]
    fn blocking_publish_wait_returns_result() {
        let (sender, handle) = PublishNoticeTx::new();
        sender.success(PublishResult::Qos0Flushed);

        let outcome = handle.wait();
        assert_eq!(outcome, Ok(PublishResult::Qos0Flushed));
    }

    /// An error sent through the sender surfaces from the async wait.
    #[tokio::test]
    async fn async_publish_wait_returns_error() {
        let (sender, handle) = PublishNoticeTx::new();
        sender.error(PublishNoticeError::SessionReset);

        let failure = handle.wait_async().await.unwrap_err();
        assert_eq!(failure, PublishNoticeError::SessionReset);
    }

    /// A `PubAck` with a failing reason is turned into a completion error.
    #[test]
    fn publish_completion_fails_on_rejected_puback() {
        let rejection = PubAckReason::ImplementationSpecificError;
        let mut puback = PubAck::new(1, None);
        puback.reason = rejection;

        let (sender, handle) = PublishNoticeTx::new();
        sender.success(PublishResult::Qos1(puback));

        let expected = Err(PublishNoticeError::V5PubAck(rejection));
        assert_eq!(handle.wait_completion(), expected);
    }

    /// The blocking subscribe wait hands back the `SubAck` unchanged.
    #[test]
    fn blocking_subscribe_wait_returns_suback() {
        let suback = SubAck {
            pkid: 1,
            return_codes: vec![V5SubscribeReasonCode::Success(QoS::AtLeastOnce)],
            properties: None,
        };

        let (sender, handle) = SubscribeNoticeTx::new();
        sender.success(suback.clone());

        assert_eq!(handle.wait(), Ok(suback));
    }

    /// A failing return code in the `SubAck` becomes a completion error.
    #[test]
    fn subscribe_completion_fails_on_failure_return_code() {
        let failing_codes = vec![V5SubscribeReasonCode::Unspecified];

        let (sender, handle) = SubscribeNoticeTx::new();
        sender.success(SubAck {
            pkid: 1,
            return_codes: failing_codes.clone(),
            properties: None,
        });

        let expected = Err(SubscribeNoticeError::V5SubAckFailure(failing_codes));
        assert_eq!(handle.wait_completion(), expected);
    }

    /// An error sent through the sender surfaces from the async wait.
    #[tokio::test]
    async fn async_unsubscribe_wait_returns_error() {
        let (sender, handle) = UnsubscribeNoticeTx::new();
        sender.error(UnsubscribeNoticeError::SessionReset);

        let failure = handle.wait_async().await.unwrap_err();
        assert_eq!(failure, UnsubscribeNoticeError::SessionReset);
    }
}