1use tokio::sync::oneshot;
2
3use crate::mqttbytes::QoS;
4use crate::mqttbytes::v4::{
5 PubAck, PubComp, SubAck, SubscribeReasonCode as V4SubscribeReasonCode, UnsubAck,
6};
7
8#[derive(Clone, Copy, Debug, PartialEq, Eq)]
9pub enum NoticeFailureReason {
10 SessionReset,
12}
13
14impl NoticeFailureReason {
15 pub(crate) const fn publish_error(self) -> PublishNoticeError {
16 match self {
17 Self::SessionReset => PublishNoticeError::SessionReset,
18 }
19 }
20
21 pub(crate) const fn subscribe_error(self) -> SubscribeNoticeError {
22 match self {
23 Self::SessionReset => SubscribeNoticeError::SessionReset,
24 }
25 }
26
27 pub(crate) const fn unsubscribe_error(self) -> UnsubscribeNoticeError {
28 match self {
29 Self::SessionReset => UnsubscribeNoticeError::SessionReset,
30 }
31 }
32}
33
34#[derive(Clone, Debug, PartialEq, Eq)]
35pub enum PublishResult {
36 Qos0Flushed,
37 Qos1(PubAck),
38 Qos2Completed(PubComp),
39}
40
41impl PublishResult {
42 #[must_use]
43 pub const fn qos(&self) -> QoS {
44 match self {
45 Self::Qos0Flushed => QoS::AtMostOnce,
46 Self::Qos1(_) => QoS::AtLeastOnce,
47 Self::Qos2Completed(_) => QoS::ExactlyOnce,
48 }
49 }
50}
51
52#[derive(Clone, Debug, thiserror::Error, PartialEq, Eq)]
53pub enum PublishNoticeError {
54 #[error("event loop dropped notice sender")]
55 Recv,
56 #[error("message dropped due to session reset")]
57 SessionReset,
58 #[error("qos0 publish was not flushed to the network")]
59 Qos0NotFlushed,
60}
61
62impl From<oneshot::error::RecvError> for PublishNoticeError {
63 fn from(_: oneshot::error::RecvError) -> Self {
64 Self::Recv
65 }
66}
67
68type PublishNoticeResult = Result<PublishResult, PublishNoticeError>;
69type SubscribeNoticeResult = Result<SubAck, SubscribeNoticeError>;
70type UnsubscribeNoticeResult = Result<UnsubAck, UnsubscribeNoticeError>;
71
72#[derive(Debug)]
73struct NoticeRx<T, E>(oneshot::Receiver<Result<T, E>>);
74
75impl<T, E> NoticeRx<T, E>
76where
77 E: From<oneshot::error::RecvError>,
78{
79 fn wait_blocking(self) -> Result<T, E> {
80 self.0.blocking_recv()?
81 }
82
83 async fn wait_async(self) -> Result<T, E> {
84 self.0.await?
85 }
86}
87
88#[derive(Debug)]
89struct NoticeTx<T, E>(oneshot::Sender<Result<T, E>>);
90
91impl<T, E> NoticeTx<T, E> {
92 fn success(self, result: T) {
93 _ = self.0.send(Ok(result));
94 }
95
96 fn error(self, err: E) {
97 _ = self.0.send(Err(err));
98 }
99}
100
101fn notice_channel<T, E>() -> (NoticeTx<T, E>, NoticeRx<T, E>) {
102 let (tx, rx) = oneshot::channel();
103 (NoticeTx(tx), NoticeRx(rx))
104}
105
106#[derive(Debug)]
108pub struct PublishNotice(NoticeRx<PublishResult, PublishNoticeError>);
109
110impl PublishNotice {
111 pub fn wait(self) -> PublishNoticeResult {
122 self.0.wait_blocking()
123 }
124
125 pub async fn wait_async(self) -> PublishNoticeResult {
132 self.0.wait_async().await
133 }
134
135 pub fn wait_completion(self) -> Result<(), PublishNoticeError> {
145 self.wait().map(drop)
146 }
147
148 pub async fn wait_completion_async(self) -> Result<(), PublishNoticeError> {
155 self.wait_async().await.map(drop)
156 }
157}
158
159#[derive(Clone, Debug, thiserror::Error, PartialEq, Eq)]
160pub enum SubscribeNoticeError {
161 #[error("event loop dropped notice sender")]
162 Recv,
163 #[error("message dropped due to session reset")]
164 SessionReset,
165 #[error("v4 suback returned failing reason codes: {0:?}")]
166 V4SubAckFailure(Vec<V4SubscribeReasonCode>),
167}
168
169impl From<oneshot::error::RecvError> for SubscribeNoticeError {
170 fn from(_: oneshot::error::RecvError) -> Self {
171 Self::Recv
172 }
173}
174
175#[derive(Debug)]
177pub struct SubscribeNotice(NoticeRx<SubAck, SubscribeNoticeError>);
178
179impl SubscribeNotice {
180 pub fn wait(self) -> SubscribeNoticeResult {
191 self.0.wait_blocking()
192 }
193
194 pub async fn wait_async(self) -> SubscribeNoticeResult {
201 self.0.wait_async().await
202 }
203
204 pub fn wait_completion(self) -> Result<(), SubscribeNoticeError> {
216 validate_v4_suback_completion(&self.wait()?)
217 }
218
219 pub async fn wait_completion_async(self) -> Result<(), SubscribeNoticeError> {
227 validate_v4_suback_completion(&self.wait_async().await?)
228 }
229}
230
231#[derive(Clone, Debug, thiserror::Error, PartialEq, Eq)]
232pub enum UnsubscribeNoticeError {
233 #[error("event loop dropped notice sender")]
234 Recv,
235 #[error("message dropped due to session reset")]
236 SessionReset,
237}
238
239impl From<oneshot::error::RecvError> for UnsubscribeNoticeError {
240 fn from(_: oneshot::error::RecvError) -> Self {
241 Self::Recv
242 }
243}
244
245#[derive(Debug)]
247pub struct UnsubscribeNotice(NoticeRx<UnsubAck, UnsubscribeNoticeError>);
248
249impl UnsubscribeNotice {
250 pub fn wait(self) -> UnsubscribeNoticeResult {
261 self.0.wait_blocking()
262 }
263
264 pub async fn wait_async(self) -> UnsubscribeNoticeResult {
271 self.0.wait_async().await
272 }
273
274 pub fn wait_completion(self) -> Result<(), UnsubscribeNoticeError> {
284 self.wait().map(drop)
285 }
286
287 pub async fn wait_completion_async(self) -> Result<(), UnsubscribeNoticeError> {
294 self.wait_async().await.map(drop)
295 }
296}
297
298#[derive(Debug)]
299pub struct PublishNoticeTx(NoticeTx<PublishResult, PublishNoticeError>);
300
301impl PublishNoticeTx {
302 pub(crate) fn new() -> (Self, PublishNotice) {
303 let (tx, rx) = notice_channel();
304 (Self(tx), PublishNotice(rx))
305 }
306
307 pub(crate) fn success(self, result: PublishResult) {
308 self.0.success(result);
309 }
310
311 pub(crate) fn error(self, err: PublishNoticeError) {
312 self.0.error(err);
313 }
314}
315
316#[derive(Debug)]
317pub struct SubscribeNoticeTx(NoticeTx<SubAck, SubscribeNoticeError>);
318
319impl SubscribeNoticeTx {
320 pub(crate) fn new() -> (Self, SubscribeNotice) {
321 let (tx, rx) = notice_channel();
322 (Self(tx), SubscribeNotice(rx))
323 }
324
325 pub(crate) fn success(self, suback: SubAck) {
326 self.0.success(suback);
327 }
328
329 pub(crate) fn error(self, err: SubscribeNoticeError) {
330 self.0.error(err);
331 }
332}
333
334#[derive(Debug)]
335pub struct UnsubscribeNoticeTx(NoticeTx<UnsubAck, UnsubscribeNoticeError>);
336
337impl UnsubscribeNoticeTx {
338 pub(crate) fn new() -> (Self, UnsubscribeNotice) {
339 let (tx, rx) = notice_channel();
340 (Self(tx), UnsubscribeNotice(rx))
341 }
342
343 pub(crate) fn success(self, unsuback: UnsubAck) {
344 self.0.success(unsuback);
345 }
346
347 pub(crate) fn error(self, err: UnsubscribeNoticeError) {
348 self.0.error(err);
349 }
350}
351
352#[derive(Debug)]
353pub enum TrackedNoticeTx {
354 Publish(PublishNoticeTx),
355 Subscribe(SubscribeNoticeTx),
356 Unsubscribe(UnsubscribeNoticeTx),
357}
358
359fn validate_v4_suback_completion(suback: &SubAck) -> Result<(), SubscribeNoticeError> {
360 let failures: Vec<_> = suback
361 .return_codes
362 .iter()
363 .copied()
364 .filter(|code| matches!(code, V4SubscribeReasonCode::Failure))
365 .collect();
366 if failures.is_empty() {
367 Ok(())
368 } else {
369 Err(SubscribeNoticeError::V4SubAckFailure(failures))
370 }
371}
372
#[cfg(test)]
mod tests {
    use super::*;

    /// A success sent before waiting is returned by the blocking `wait`.
    #[test]
    fn blocking_publish_wait_returns_result() {
        let (sender, notice) = PublishNoticeTx::new();
        sender.success(PublishResult::Qos0Flushed);

        let outcome = notice.wait();
        assert_eq!(outcome, Ok(PublishResult::Qos0Flushed));
    }

    /// An error sent before waiting is returned by `wait_async`.
    #[tokio::test]
    async fn async_publish_wait_returns_error() {
        let (sender, notice) = PublishNoticeTx::new();
        sender.error(PublishNoticeError::SessionReset);

        let failure = notice.wait_async().await.unwrap_err();
        assert_eq!(failure, PublishNoticeError::SessionReset);
    }

    /// The exact `SubAck` handed to the sender comes back from `wait`.
    #[test]
    fn blocking_subscribe_wait_returns_suback() {
        let (sender, notice) = SubscribeNoticeTx::new();
        let expected = SubAck::new(1, vec![V4SubscribeReasonCode::Success(QoS::AtLeastOnce)]);
        sender.success(expected.clone());

        let outcome = notice.wait();
        assert_eq!(outcome, Ok(expected));
    }

    /// A SUBACK carrying a `Failure` code makes `wait_completion` fail.
    #[test]
    fn subscribe_completion_fails_on_failure_return_code() {
        let (sender, notice) = SubscribeNoticeTx::new();
        sender.success(SubAck::new(1, vec![V4SubscribeReasonCode::Failure]));

        let expected_codes = vec![V4SubscribeReasonCode::Failure];
        assert_eq!(
            notice.wait_completion(),
            Err(SubscribeNoticeError::V4SubAckFailure(expected_codes))
        );
    }

    /// An error sent before waiting is returned by `wait_async`.
    #[tokio::test]
    async fn async_unsubscribe_wait_returns_error() {
        let (sender, notice) = UnsubscribeNoticeTx::new();
        sender.error(UnsubscribeNoticeError::SessionReset);

        let failure = notice.wait_async().await.unwrap_err();
        assert_eq!(failure, UnsubscribeNoticeError::SessionReset);
    }
}