1use tokio::sync::oneshot;
2
3use crate::mqttbytes::QoS;
4use crate::mqttbytes::v5::{
5 PubAck, PubAckReason, PubComp, PubCompReason, PubRec, PubRecReason, SubAck,
6 SubscribeReasonCode as V5SubscribeReasonCode, UnsubAck, UnsubAckReason as V5UnsubAckReason,
7};
8use crate::{AuthFailureReason, AuthOutcome};
9
10#[derive(Clone, Copy, Debug, PartialEq, Eq)]
11pub enum NoticeFailureReason {
12 SessionReset,
14}
15
16impl NoticeFailureReason {
17 pub(crate) const fn publish_error(self) -> PublishNoticeError {
18 match self {
19 Self::SessionReset => PublishNoticeError::SessionReset,
20 }
21 }
22
23 pub(crate) const fn subscribe_error(self) -> SubscribeNoticeError {
24 match self {
25 Self::SessionReset => SubscribeNoticeError::SessionReset,
26 }
27 }
28
29 pub(crate) const fn unsubscribe_error(self) -> UnsubscribeNoticeError {
30 match self {
31 Self::SessionReset => UnsubscribeNoticeError::SessionReset,
32 }
33 }
34
35 pub(crate) const fn auth_error(self) -> AuthNoticeError {
36 match self {
37 Self::SessionReset => AuthNoticeError::SessionReset,
38 }
39 }
40}
41
42#[derive(Clone, Debug, PartialEq, Eq)]
43pub enum PublishResult {
44 Qos0Flushed,
45 Qos1(PubAck),
46 Qos2Completed(PubComp),
47 Qos2PubRecRejected(PubRec),
48}
49
50impl PublishResult {
51 #[must_use]
52 pub const fn qos(&self) -> QoS {
53 match self {
54 Self::Qos0Flushed => QoS::AtMostOnce,
55 Self::Qos1(_) => QoS::AtLeastOnce,
56 Self::Qos2Completed(_) | Self::Qos2PubRecRejected(_) => QoS::ExactlyOnce,
57 }
58 }
59}
60
61#[derive(Clone, Debug, thiserror::Error, PartialEq, Eq)]
62pub enum PublishNoticeError {
63 #[error("event loop dropped notice sender")]
64 Recv,
65 #[error("message dropped due to session reset")]
66 SessionReset,
67 #[error("publish with topic alias {0} cannot be replayed after reconnect")]
68 TopicAliasReplayUnavailable(u16),
69 #[error("qos0 publish was not flushed to the network")]
70 Qos0NotFlushed,
71 #[error("v5 puback returned non-success reason: {0:?}")]
72 V5PubAck(PubAckReason),
73 #[error("v5 pubrec returned non-success reason: {0:?}")]
74 V5PubRec(PubRecReason),
75 #[error("v5 pubcomp returned non-success reason: {0:?}")]
76 V5PubComp(PubCompReason),
77}
78
79impl From<oneshot::error::RecvError> for PublishNoticeError {
80 fn from(_: oneshot::error::RecvError) -> Self {
81 Self::Recv
82 }
83}
84
/// Result produced by waiting on a `PublishNotice`.
type PublishNoticeResult = Result<PublishResult, PublishNoticeError>;
/// Result produced by waiting on a `SubscribeNotice`.
type SubscribeNoticeResult = Result<SubAck, SubscribeNoticeError>;
/// Result produced by waiting on an `UnsubscribeNotice`.
type UnsubscribeNoticeResult = Result<UnsubAck, UnsubscribeNoticeError>;
/// Result produced by waiting on an `AuthNotice`.
type AuthNoticeResult = Result<AuthOutcome, AuthNoticeError>;
89
90#[derive(Debug)]
91struct NoticeRx<T, E>(oneshot::Receiver<Result<T, E>>);
92
93impl<T, E> NoticeRx<T, E>
94where
95 E: From<oneshot::error::RecvError>,
96{
97 fn wait_blocking(self) -> Result<T, E> {
98 self.0.blocking_recv()?
99 }
100
101 async fn wait_async(self) -> Result<T, E> {
102 self.0.await?
103 }
104}
105
106#[derive(Debug)]
107struct NoticeTx<T, E>(oneshot::Sender<Result<T, E>>);
108
109impl<T, E> NoticeTx<T, E> {
110 fn success(self, result: T) {
111 _ = self.0.send(Ok(result));
112 }
113
114 fn error(self, err: E) {
115 _ = self.0.send(Err(err));
116 }
117}
118
119fn notice_channel<T, E>() -> (NoticeTx<T, E>, NoticeRx<T, E>) {
120 let (tx, rx) = oneshot::channel();
121 (NoticeTx(tx), NoticeRx(rx))
122}
123
124#[derive(Debug)]
126pub struct PublishNotice(NoticeRx<PublishResult, PublishNoticeError>);
127
128impl PublishNotice {
129 pub fn wait(self) -> PublishNoticeResult {
140 self.0.wait_blocking()
141 }
142
143 pub async fn wait_async(self) -> PublishNoticeResult {
150 self.0.wait_async().await
151 }
152
153 pub fn wait_completion(self) -> Result<(), PublishNoticeError> {
165 validate_v5_publish_completion(&self.wait()?)
166 }
167
168 pub async fn wait_completion_async(self) -> Result<(), PublishNoticeError> {
176 validate_v5_publish_completion(&self.wait_async().await?)
177 }
178}
179
180#[derive(Clone, Debug, thiserror::Error, PartialEq, Eq)]
181pub enum SubscribeNoticeError {
182 #[error("event loop dropped notice sender")]
183 Recv,
184 #[error("message dropped due to session reset")]
185 SessionReset,
186 #[error("v5 suback returned failing reason codes: {0:?}")]
187 V5SubAckFailure(Vec<V5SubscribeReasonCode>),
188}
189
190impl From<oneshot::error::RecvError> for SubscribeNoticeError {
191 fn from(_: oneshot::error::RecvError) -> Self {
192 Self::Recv
193 }
194}
195
196#[derive(Debug)]
198pub struct SubscribeNotice(NoticeRx<SubAck, SubscribeNoticeError>);
199
200impl SubscribeNotice {
201 pub fn wait(self) -> SubscribeNoticeResult {
212 self.0.wait_blocking()
213 }
214
215 pub async fn wait_async(self) -> SubscribeNoticeResult {
222 self.0.wait_async().await
223 }
224
225 pub fn wait_completion(self) -> Result<(), SubscribeNoticeError> {
237 validate_v5_suback_completion(&self.wait()?)
238 }
239
240 pub async fn wait_completion_async(self) -> Result<(), SubscribeNoticeError> {
248 validate_v5_suback_completion(&self.wait_async().await?)
249 }
250}
251
252#[derive(Clone, Debug, thiserror::Error, PartialEq, Eq)]
253pub enum UnsubscribeNoticeError {
254 #[error("event loop dropped notice sender")]
255 Recv,
256 #[error("message dropped due to session reset")]
257 SessionReset,
258 #[error("v5 unsuback returned failing reason codes: {0:?}")]
259 V5UnsubAckFailure(Vec<V5UnsubAckReason>),
260}
261
262impl From<oneshot::error::RecvError> for UnsubscribeNoticeError {
263 fn from(_: oneshot::error::RecvError) -> Self {
264 Self::Recv
265 }
266}
267
268#[derive(Debug)]
270pub struct UnsubscribeNotice(NoticeRx<UnsubAck, UnsubscribeNoticeError>);
271
272impl UnsubscribeNotice {
273 pub fn wait(self) -> UnsubscribeNoticeResult {
284 self.0.wait_blocking()
285 }
286
287 pub async fn wait_async(self) -> UnsubscribeNoticeResult {
294 self.0.wait_async().await
295 }
296
297 pub fn wait_completion(self) -> Result<(), UnsubscribeNoticeError> {
309 validate_v5_unsuback_completion(&self.wait()?)
310 }
311
312 pub async fn wait_completion_async(self) -> Result<(), UnsubscribeNoticeError> {
320 validate_v5_unsuback_completion(&self.wait_async().await?)
321 }
322}
323
324#[derive(Clone, Debug, thiserror::Error, PartialEq, Eq)]
325pub enum AuthNoticeError {
326 #[error("event loop dropped notice sender")]
327 Recv,
328 #[error("authentication exchange was dropped due to session reset")]
329 SessionReset,
330 #[error("authentication exchange failed due to a protocol error")]
331 ProtocolError,
332 #[error("authentication failed: {0}")]
333 AuthenticationFailed(String),
334 #[error("connection closed before authentication completed")]
335 ConnectionClosed,
336 #[error("re-authentication is already active")]
337 OverlappingReauth,
338 #[error("re-authentication requires a CONNECT Authentication Method")]
339 MissingAuthenticationMethod,
340}
341
342impl From<oneshot::error::RecvError> for AuthNoticeError {
343 fn from(_: oneshot::error::RecvError) -> Self {
344 Self::Recv
345 }
346}
347
348impl AuthFailureReason {
349 pub(crate) fn from_notice_error(error: AuthNoticeError) -> Self {
350 match error {
351 AuthNoticeError::Recv => Self::NoticeDropped,
352 AuthNoticeError::SessionReset => Self::SessionReset,
353 AuthNoticeError::ProtocolError => Self::ProtocolError,
354 AuthNoticeError::AuthenticationFailed(message) => Self::AuthenticationFailed(message),
355 AuthNoticeError::ConnectionClosed => Self::ConnectionClosed,
356 AuthNoticeError::OverlappingReauth => Self::OverlappingReauth,
357 AuthNoticeError::MissingAuthenticationMethod => Self::MissingAuthenticationMethod,
358 }
359 }
360}
361
362#[derive(Debug)]
364pub struct AuthNotice(NoticeRx<AuthOutcome, AuthNoticeError>);
365
366impl AuthNotice {
367 pub fn wait(self) -> AuthNoticeResult {
378 self.0.wait_blocking()
379 }
380
381 pub async fn wait_async(self) -> AuthNoticeResult {
388 self.0.wait_async().await
389 }
390}
391
392#[derive(Debug)]
393pub struct PublishNoticeTx(NoticeTx<PublishResult, PublishNoticeError>);
394
395impl PublishNoticeTx {
396 pub(crate) fn new() -> (Self, PublishNotice) {
397 let (tx, rx) = notice_channel();
398 (Self(tx), PublishNotice(rx))
399 }
400
401 pub(crate) fn success(self, result: PublishResult) {
402 self.0.success(result);
403 }
404
405 pub(crate) fn error(self, err: PublishNoticeError) {
406 self.0.error(err);
407 }
408}
409
410#[derive(Debug)]
411pub struct SubscribeNoticeTx(NoticeTx<SubAck, SubscribeNoticeError>);
412
413impl SubscribeNoticeTx {
414 pub(crate) fn new() -> (Self, SubscribeNotice) {
415 let (tx, rx) = notice_channel();
416 (Self(tx), SubscribeNotice(rx))
417 }
418
419 pub(crate) fn success(self, suback: SubAck) {
420 self.0.success(suback);
421 }
422
423 pub(crate) fn error(self, err: SubscribeNoticeError) {
424 self.0.error(err);
425 }
426}
427
428#[derive(Debug)]
429pub struct UnsubscribeNoticeTx(NoticeTx<UnsubAck, UnsubscribeNoticeError>);
430
431impl UnsubscribeNoticeTx {
432 pub(crate) fn new() -> (Self, UnsubscribeNotice) {
433 let (tx, rx) = notice_channel();
434 (Self(tx), UnsubscribeNotice(rx))
435 }
436
437 pub(crate) fn success(self, unsuback: UnsubAck) {
438 self.0.success(unsuback);
439 }
440
441 pub(crate) fn error(self, err: UnsubscribeNoticeError) {
442 self.0.error(err);
443 }
444}
445
446#[derive(Debug)]
447pub struct AuthNoticeTx(NoticeTx<AuthOutcome, AuthNoticeError>);
448
449impl AuthNoticeTx {
450 pub(crate) fn new() -> (Self, AuthNotice) {
451 let (tx, rx) = notice_channel();
452 (Self(tx), AuthNotice(rx))
453 }
454
455 pub(crate) fn success(self, outcome: AuthOutcome) {
456 self.0.success(outcome);
457 }
458
459 pub(crate) fn error(self, err: AuthNoticeError) {
460 self.0.error(err);
461 }
462}
463
/// A notice sender of any request kind, so that differently typed senders can
/// be handled through a single type.
// NOTE(review): how this enum is stored or consumed is not visible in this
// file — confirm against the call sites.
#[derive(Debug)]
pub enum TrackedNoticeTx {
    /// Sender for a publish notice.
    Publish(PublishNoticeTx),
    /// Sender for a subscribe notice.
    Subscribe(SubscribeNoticeTx),
    /// Sender for an unsubscribe notice.
    Unsubscribe(UnsubscribeNoticeTx),
    /// Sender for an authentication notice.
    Auth(AuthNoticeTx),
}
471
472fn validate_v5_publish_completion(result: &PublishResult) -> Result<(), PublishNoticeError> {
473 match result {
474 PublishResult::Qos0Flushed => Ok(()),
475 PublishResult::Qos1(puback)
476 if puback.reason == PubAckReason::Success
477 || puback.reason == PubAckReason::NoMatchingSubscribers =>
478 {
479 Ok(())
480 }
481 PublishResult::Qos1(puback) => Err(PublishNoticeError::V5PubAck(puback.reason)),
482 PublishResult::Qos2Completed(pubcomp) if pubcomp.reason == PubCompReason::Success => Ok(()),
483 PublishResult::Qos2Completed(pubcomp) => Err(PublishNoticeError::V5PubComp(pubcomp.reason)),
484 PublishResult::Qos2PubRecRejected(pubrec) => {
485 Err(PublishNoticeError::V5PubRec(pubrec.reason))
486 }
487 }
488}
489
490fn validate_v5_suback_completion(suback: &SubAck) -> Result<(), SubscribeNoticeError> {
491 let failures: Vec<_> = suback
492 .return_codes
493 .iter()
494 .filter(|reason| !matches!(reason, V5SubscribeReasonCode::Success(_)))
495 .copied()
496 .collect();
497 if failures.is_empty() {
498 Ok(())
499 } else {
500 Err(SubscribeNoticeError::V5SubAckFailure(failures))
501 }
502}
503
504fn validate_v5_unsuback_completion(unsuback: &UnsubAck) -> Result<(), UnsubscribeNoticeError> {
505 let failures: Vec<_> = unsuback
506 .reasons
507 .iter()
508 .filter(|reason| {
509 **reason != V5UnsubAckReason::Success
510 && **reason != V5UnsubAckReason::NoSubscriptionExisted
511 })
512 .copied()
513 .collect();
514 if failures.is_empty() {
515 Ok(())
516 } else {
517 Err(UnsubscribeNoticeError::V5UnsubAckFailure(failures))
518 }
519}
520
#[cfg(test)]
mod tests {
    use super::*;

    /// A notice resolved before `wait` is called hands back the stored result.
    #[test]
    fn blocking_publish_wait_returns_result() {
        let (sender, pending) = PublishNoticeTx::new();
        sender.success(PublishResult::Qos0Flushed);

        let outcome = pending.wait();
        assert_eq!(outcome, Ok(PublishResult::Qos0Flushed));
    }

    /// An error pushed through the sender surfaces from `wait_async`.
    #[tokio::test]
    async fn async_publish_wait_returns_error() {
        let (sender, pending) = PublishNoticeTx::new();
        sender.error(PublishNoticeError::SessionReset);

        let failure = pending.wait_async().await.unwrap_err();
        assert_eq!(failure, PublishNoticeError::SessionReset);
    }

    /// `wait_completion` turns a rejecting PubAck reason into `V5PubAck`.
    #[test]
    fn publish_completion_fails_on_rejected_puback() {
        let (sender, pending) = PublishNoticeTx::new();
        let mut rejected_ack = PubAck::new(1, None);
        rejected_ack.reason = PubAckReason::ImplementationSpecificError;
        sender.success(PublishResult::Qos1(rejected_ack));

        let expected = Err(PublishNoticeError::V5PubAck(
            PubAckReason::ImplementationSpecificError,
        ));
        assert_eq!(pending.wait_completion(), expected);
    }

    /// A plain `wait` returns the SubAck untouched.
    #[test]
    fn blocking_subscribe_wait_returns_suback() {
        let (sender, pending) = SubscribeNoticeTx::new();
        let granted = SubAck {
            pkid: 1,
            return_codes: vec![V5SubscribeReasonCode::Success(QoS::AtLeastOnce)],
            properties: None,
        };
        sender.success(granted.clone());

        assert_eq!(pending.wait(), Ok(granted));
    }

    /// `wait_completion` reports non-success SubAck reason codes.
    #[test]
    fn subscribe_completion_fails_on_failure_return_code() {
        let (sender, pending) = SubscribeNoticeTx::new();
        let refused = SubAck {
            pkid: 1,
            return_codes: vec![V5SubscribeReasonCode::Unspecified],
            properties: None,
        };
        sender.success(refused);

        let expected = Err(SubscribeNoticeError::V5SubAckFailure(vec![
            V5SubscribeReasonCode::Unspecified,
        ]));
        assert_eq!(pending.wait_completion(), expected);
    }

    /// An error pushed through the sender surfaces from `wait_async`.
    #[tokio::test]
    async fn async_unsubscribe_wait_returns_error() {
        let (sender, pending) = UnsubscribeNoticeTx::new();
        sender.error(UnsubscribeNoticeError::SessionReset);

        let failure = pending.wait_async().await.unwrap_err();
        assert_eq!(failure, UnsubscribeNoticeError::SessionReset);
    }
}