1use tokio::sync::oneshot;
2
3use crate::mqttbytes::v5::{
4 PubAckReason, PubCompReason, PubRecReason, SubscribeReasonCode as V5SubscribeReasonCode,
5 UnsubAckReason as V5UnsubAckReason,
6};
7
/// Cause attached to a notice that is failed rather than completed.
///
/// Each variant has a same-named counterpart in [`PublishNoticeError`] and
/// [`RequestNoticeError`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NoticeFailureReason {
    /// The session was reset; the matching errors read
    /// "message dropped due to session reset".
    SessionReset,
}
13
14impl NoticeFailureReason {
15 pub(crate) const fn publish_error(self) -> PublishNoticeError {
16 match self {
17 Self::SessionReset => PublishNoticeError::SessionReset,
18 }
19 }
20
21 pub(crate) const fn request_error(self) -> RequestNoticeError {
22 match self {
23 Self::SessionReset => RequestNoticeError::SessionReset,
24 }
25 }
26}
27
28#[derive(Clone, Debug, thiserror::Error, PartialEq, Eq)]
29pub enum PublishNoticeError {
30 #[error("event loop dropped notice sender")]
31 Recv,
32 #[error("message dropped due to session reset")]
33 SessionReset,
34 #[error("qos0 publish was not flushed to the network")]
35 Qos0NotFlushed,
36 #[error("v5 puback returned non-success reason: {0:?}")]
37 V5PubAck(PubAckReason),
38 #[error("v5 pubrec returned non-success reason: {0:?}")]
39 V5PubRec(PubRecReason),
40 #[error("v5 pubcomp returned non-success reason: {0:?}")]
41 V5PubComp(PubCompReason),
42}
43
44impl From<oneshot::error::RecvError> for PublishNoticeError {
45 fn from(_: oneshot::error::RecvError) -> Self {
46 Self::Recv
47 }
48}
49
50type PublishNoticeResult = Result<(), PublishNoticeError>;
51type RequestNoticeResult = Result<(), RequestNoticeError>;
52
53#[derive(Debug)]
55pub struct PublishNotice(pub(crate) oneshot::Receiver<PublishNoticeResult>);
56
57impl PublishNotice {
58 pub fn wait(self) -> PublishNoticeResult {
69 self.0.blocking_recv()?
70 }
71
72 pub async fn wait_async(self) -> PublishNoticeResult {
79 self.0.await?
80 }
81}
82
83#[derive(Clone, Debug, thiserror::Error, PartialEq, Eq)]
84pub enum RequestNoticeError {
85 #[error("event loop dropped notice sender")]
86 Recv,
87 #[error("message dropped due to session reset")]
88 SessionReset,
89 #[error("v5 suback returned failing reason codes: {0:?}")]
90 V5SubAckFailure(Vec<V5SubscribeReasonCode>),
91 #[error("v5 unsuback returned failing reason codes: {0:?}")]
92 V5UnsubAckFailure(Vec<V5UnsubAckReason>),
93}
94
95impl From<oneshot::error::RecvError> for RequestNoticeError {
96 fn from(_: oneshot::error::RecvError) -> Self {
97 Self::Recv
98 }
99}
100
101#[derive(Debug)]
103pub struct RequestNotice(pub(crate) oneshot::Receiver<RequestNoticeResult>);
104
105impl RequestNotice {
106 pub fn wait(self) -> RequestNoticeResult {
117 self.0.blocking_recv()?
118 }
119
120 pub async fn wait_async(self) -> RequestNoticeResult {
127 self.0.await?
128 }
129}
130
131#[derive(Debug)]
132pub struct PublishNoticeTx(pub(crate) oneshot::Sender<PublishNoticeResult>);
133
134impl PublishNoticeTx {
135 pub(crate) fn new() -> (Self, PublishNotice) {
136 let (tx, rx) = oneshot::channel();
137 (Self(tx), PublishNotice(rx))
138 }
139
140 pub(crate) fn success(self) {
141 _ = self.0.send(Ok(()));
142 }
143
144 pub(crate) fn error(self, err: PublishNoticeError) {
145 _ = self.0.send(Err(err));
146 }
147}
148
149#[derive(Debug)]
150pub struct RequestNoticeTx(pub(crate) oneshot::Sender<RequestNoticeResult>);
151
152impl RequestNoticeTx {
153 pub(crate) fn new() -> (Self, RequestNotice) {
154 let (tx, rx) = oneshot::channel();
155 (Self(tx), RequestNotice(rx))
156 }
157
158 pub(crate) fn success(self) {
159 _ = self.0.send(Ok(()));
160 }
161
162 pub(crate) fn error(self, err: RequestNoticeError) {
163 _ = self.0.send(Err(err));
164 }
165}
166
167#[derive(Debug)]
168pub enum TrackedNoticeTx {
169 Publish(PublishNoticeTx),
170 Request(RequestNoticeTx),
171}
172
#[cfg(test)]
mod tests {
    use super::*;

    /// A blocking publish wait yields `Ok` once the sender reports success.
    #[test]
    fn blocking_wait_returns_success() {
        let (tx, notice) = PublishNoticeTx::new();
        tx.success();
        assert!(notice.wait().is_ok());
    }

    /// An async publish wait surfaces the exact error the sender reported.
    #[tokio::test]
    async fn async_wait_returns_error() {
        let (tx, notice) = PublishNoticeTx::new();
        tx.error(PublishNoticeError::SessionReset);
        let err = notice.wait_async().await.unwrap_err();
        assert_eq!(err, PublishNoticeError::SessionReset);
    }

    /// A blocking request wait yields `Ok` once the sender reports success.
    #[test]
    fn blocking_request_wait_returns_success() {
        let (tx, notice) = RequestNoticeTx::new();
        tx.success();
        assert!(notice.wait().is_ok());
    }

    /// An async request wait surfaces the exact error the sender reported.
    #[tokio::test]
    async fn async_request_wait_returns_error() {
        let (tx, notice) = RequestNoticeTx::new();
        tx.error(RequestNoticeError::SessionReset);
        let err = notice.wait_async().await.unwrap_err();
        assert_eq!(err, RequestNoticeError::SessionReset);
    }

    /// Dropping the publish sender without sending maps the closed channel to `Recv`.
    #[test]
    fn blocking_wait_reports_dropped_sender() {
        let (tx, notice) = PublishNoticeTx::new();
        drop(tx);
        assert_eq!(notice.wait(), Err(PublishNoticeError::Recv));
    }

    /// The async publish path maps a closed channel to `Recv` as well.
    #[tokio::test]
    async fn async_wait_reports_dropped_sender() {
        let (tx, notice) = PublishNoticeTx::new();
        drop(tx);
        assert_eq!(notice.wait_async().await, Err(PublishNoticeError::Recv));
    }

    /// Dropping the request sender without sending maps the closed channel to `Recv`.
    #[test]
    fn blocking_request_wait_reports_dropped_sender() {
        let (tx, notice) = RequestNoticeTx::new();
        drop(tx);
        assert_eq!(notice.wait(), Err(RequestNoticeError::Recv));
    }

    /// The async request path maps a closed channel to `Recv` as well.
    #[tokio::test]
    async fn async_request_wait_reports_dropped_sender() {
        let (tx, notice) = RequestNoticeTx::new();
        drop(tx);
        assert_eq!(notice.wait_async().await, Err(RequestNoticeError::Recv));
    }

    /// Sending after the wait handle is gone must be a silent no-op, not a panic.
    #[test]
    fn sending_without_receiver_is_ignored() {
        let (tx, notice) = PublishNoticeTx::new();
        drop(notice);
        tx.success();

        let (tx, notice) = RequestNoticeTx::new();
        drop(notice);
        tx.error(RequestNoticeError::SessionReset);
    }

    /// Each failure reason converts to the same-named variant of both error types.
    #[test]
    fn failure_reason_maps_to_matching_errors() {
        let reason = NoticeFailureReason::SessionReset;
        assert_eq!(reason.publish_error(), PublishNoticeError::SessionReset);
        assert_eq!(reason.request_error(), RequestNoticeError::SessionReset);
    }
}