1use crate::{
2 codec::{
3 AckRx, AuthReason, AuthRx, ConnackRx, ConnectReason, DisconnectReason, DisconnectRx,
4 PubackReason, PubcompReason, PubrecReason,
5 },
6 core::{collections::UserProperties, error::CodecError},
7};
8use futures::channel::{mpsc::TrySendError, oneshot::Canceled};
9use std::{
10 error::Error,
11 fmt::{self, Display},
12 io, str,
13 time::{Duration, SystemTimeError},
14};
15
16#[derive(Debug, Clone)]
19pub struct SocketClosed;
20
21impl fmt::Display for SocketClosed {
22 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
23 write!(f, "socket closed")
24 }
25}
26
27impl Error for SocketClosed {}
28
29impl From<io::Error> for SocketClosed {
30 fn from(_: io::Error) -> Self {
31 Self
32 }
33}
34
35#[derive(Debug, Clone)]
39pub struct HandleClosed;
40
41impl fmt::Display for HandleClosed {
42 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
43 write!(f, "context handle closed")
44 }
45}
46
47impl Error for HandleClosed {}
48
49#[derive(Debug, Clone)]
53pub struct ContextExited;
54
55impl fmt::Display for ContextExited {
56 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
57 write!(f, "context exited")
58 }
59}
60
61impl Error for ContextExited {}
62
63impl<T> From<TrySendError<T>> for ContextExited {
64 fn from(_: TrySendError<T>) -> Self {
65 Self
66 }
67}
68
69impl From<Canceled> for ContextExited {
70 fn from(_: Canceled) -> Self {
71 Self
72 }
73}
74
75#[derive(Clone)]
79pub struct Disconnected {
80 packet: DisconnectRx,
81}
82
83impl Disconnected {
84 pub fn reason(&self) -> DisconnectReason {
87 self.packet.reason
88 }
89
90 pub fn session_expiry_interval(&self) -> Duration {
93 Duration::from_secs(u64::from(u32::from(self.packet.session_expiry_interval)))
94 }
95
96 pub fn reason_string(&self) -> Option<&str> {
99 self.packet
100 .reason_string
101 .as_ref()
102 .map(|val| &val.0)
103 .map(|val| val.0.as_ref())
104 .map(str::from_utf8)
105 .and_then(Result::ok)
106 }
107
108 pub fn server_reference(&self) -> Option<&str> {
111 self.packet
112 .server_reference
113 .as_ref()
114 .map(|val| &val.0)
115 .map(|val| val.0.as_ref())
116 .map(str::from_utf8)
117 .and_then(Result::ok)
118 }
119
120 pub fn user_properties(&self) -> &UserProperties {
123 &self.packet.user_property
124 }
125}
126
127impl fmt::Debug for Disconnected {
128 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
129 f.debug_struct("Disconnected")
130 .field("reason", &self.reason())
131 .field("session_expiry_interval", &self.session_expiry_interval())
132 .field("reason_string", &self.reason_string())
133 .field("server_reference", &self.server_reference())
134 .field("user_properties", &self.user_properties())
135 .finish()
136 }
137}
138
139impl fmt::Display for Disconnected {
140 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
141 write!(
142 f,
143 "disconnected with reason: {} [{:?}]",
144 self.reason() as u8,
145 self.reason()
146 )
147 }
148}
149
150impl Error for Disconnected {}
151
152#[derive(Debug, Clone)]
156pub struct InternalError {
157 msg: &'static str,
158}
159
160impl fmt::Display for InternalError {
161 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
162 write!(
163 f,
164 "{{ \"type\": \"InternalError\", \"message\": \"{}\" }}",
165 self.msg
166 )
167 }
168}
169
170impl Error for InternalError {}
171
172impl From<&'static str> for InternalError {
173 fn from(s: &'static str) -> Self {
174 Self { msg: s }
175 }
176}
177
178impl From<SystemTimeError> for InternalError {
179 fn from(_: SystemTimeError) -> Self {
180 Self {
181 msg: "system time error",
182 }
183 }
184}
185
186#[derive(Debug, Clone, Copy)]
190pub struct QuotaExceeded;
191
192impl fmt::Display for QuotaExceeded {
193 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
194 write!(
195 f,
196 "{{ \"type\": \"QuotaExceeded\", \"message\": \"quota exceeded\" }}"
197 )
198 }
199}
200
201#[derive(Debug, Clone, Copy)]
206pub struct MaximumPacketSizeExceeded;
207
208impl fmt::Display for MaximumPacketSizeExceeded {
209 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
210 write!(
211 f,
212 "{{ \"type\": \"MaximumPacketSizeExceeded\", \"message\": \"packet too large\" }}"
213 )
214 }
215}
216
217#[derive(Clone)]
221pub struct ConnectError {
222 packet: ConnackRx,
223}
224
225impl ConnectError {
226 pub fn reason(&self) -> ConnectReason {
229 self.packet.reason
230 }
231
232 pub fn reason_string(&self) -> Option<&str> {
235 self.packet
236 .reason_string
237 .as_ref()
238 .map(|val| &val.0)
239 .map(|val| val.0.as_ref())
240 .map(str::from_utf8)
241 .and_then(Result::ok)
242 }
243
244 pub fn server_reference(&self) -> Option<&str> {
247 self.packet
248 .server_reference
249 .as_ref()
250 .map(|val| &val.0)
251 .map(|val| val.0.as_ref())
252 .map(str::from_utf8)
253 .and_then(Result::ok)
254 }
255
256 pub fn user_properties(&self) -> &UserProperties {
259 &self.packet.user_property
260 }
261}
262
263impl fmt::Debug for ConnectError {
264 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
265 f.debug_struct("ConnectError")
266 .field("reason", &self.reason())
267 .field("reason_string", &self.reason_string())
268 .field("server_reference", &self.server_reference())
269 .field("user_properties", &self.user_properties())
270 .finish()
271 }
272}
273
274impl fmt::Display for ConnectError {
275 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
276 write!(
277 f,
278 "{{ \"type\": \"ConnectError\", \"message\": \"connect error: {} [{:?}]\" }}",
279 self.reason() as u8,
280 self.reason()
281 )
282 }
283}
284
285impl Error for ConnectError {}
286
287impl From<ConnackRx> for ConnectError {
288 fn from(packet: ConnackRx) -> Self {
289 debug_assert!(packet.reason as u8 >= 0x80);
290 Self { packet }
291 }
292}
293
294impl From<ConnectError> for MqttError {
295 fn from(err: ConnectError) -> Self {
296 MqttError::ConnectError(err)
297 }
298}
299
300#[derive(Clone)]
304pub struct AuthError {
305 packet: AuthRx,
306}
307
308impl AuthError {
309 pub fn reason(&self) -> AuthReason {
312 self.packet.reason
313 }
314
315 pub fn reason_string(&self) -> Option<&str> {
318 self.packet
319 .reason_string
320 .as_ref()
321 .map(|val| &val.0)
322 .map(|val| val.0.as_ref())
323 .map(str::from_utf8)
324 .and_then(Result::ok)
325 }
326
327 pub fn user_properties(&self) -> &UserProperties {
330 &self.packet.user_property
331 }
332}
333
334impl fmt::Debug for AuthError {
335 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
336 f.debug_struct("AuthError")
337 .field("reason", &self.reason())
338 .field("reason_string", &self.reason_string())
339 .field("user_properties", &self.user_properties())
340 .finish()
341 }
342}
343
344impl fmt::Display for AuthError {
345 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
346 write!(
347 f,
348 "{{ \"type\": \"AuthError\", \"message\": \"authorization error: {} [{:?}]\" }}",
349 self.reason() as u8,
350 self.reason()
351 )
352 }
353}
354
355impl Error for AuthError {}
356
357impl From<AuthRx> for AuthError {
358 fn from(packet: AuthRx) -> Self {
359 debug_assert!(packet.reason as u8 >= 0x80);
360 Self { packet }
361 }
362}
363
364impl From<AuthError> for MqttError {
365 fn from(err: AuthError) -> Self {
366 MqttError::AuthError(err)
367 }
368}
369
370#[derive(Clone)]
373pub struct AckError<ReasonT>
374where
375 ReasonT: Default,
376{
377 pub(crate) packet: AckRx<ReasonT>,
378}
379
380impl<ReasonT> AckError<ReasonT>
381where
382 ReasonT: Default + Copy,
383{
384 pub fn reason(&self) -> ReasonT {
387 self.packet.reason
388 }
389
390 pub fn reason_string(&self) -> Option<&str> {
393 self.packet
394 .reason_string
395 .as_ref()
396 .map(|val| &val.0)
397 .map(|val| val.0.as_ref())
398 .map(str::from_utf8)
399 .and_then(Result::ok)
400 }
401
402 pub fn user_properties(&self) -> &UserProperties {
405 &self.packet.user_property
406 }
407}
408
409impl<ReasonT> fmt::Debug for AckError<ReasonT>
410where
411 ReasonT: Copy + Default + fmt::Debug,
412{
413 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
414 f.debug_struct("AckError")
415 .field("reason", &self.reason())
416 .field("reason_string", &self.reason_string())
417 .field("user_properties", &self.user_properties())
418 .finish()
419 }
420}
421
422impl<ReasonT> From<AckRx<ReasonT>> for AckError<ReasonT>
423where
424 ReasonT: Default + fmt::Debug,
425{
426 fn from(packet: AckRx<ReasonT>) -> Self {
427 Self { packet }
428 }
429}
430
431pub type PubackError = AckError<PubackReason>;
435
436impl From<PubackError> for MqttError {
437 fn from(err: PubackError) -> Self {
438 MqttError::PubackError(err)
439 }
440}
441
442impl Error for PubackError {}
443
444impl Display for PubackError {
445 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
446 write!(
447 f,
448 "{{ \"type\": \"PubackError\", \"message\": \"PubackError error: {} [{:?}]\" }}",
449 self.packet.reason as u8, self.packet.reason
450 )
451 }
452}
453
454pub type PubrecError = AckError<PubrecReason>;
458
459impl From<PubrecError> for MqttError {
460 fn from(err: PubrecError) -> Self {
461 MqttError::PubrecError(err)
462 }
463}
464
465impl Error for PubrecError {}
466
467impl Display for PubrecError {
468 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
469 write!(
470 f,
471 "{{ \"type\": \"PubrecError\", \"message\": \"PubrecError error: {} [{:?}]\" }}",
472 self.packet.reason as u8, self.packet.reason
473 )
474 }
475}
476
477pub type PubcompError = AckError<PubcompReason>;
481
482impl From<PubcompError> for MqttError {
483 fn from(err: PubcompError) -> Self {
484 MqttError::PubcompError(err)
485 }
486}
487
488impl Error for PubcompError {}
489
490impl Display for PubcompError {
491 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
492 write!(
493 f,
494 "{{ \"type\": \"PubcompError\", \"message\": \"PubcompError error: {} [{:?}]\" }}",
495 self.packet.reason as u8, self.packet.reason
496 )
497 }
498}
499
500#[derive(Debug, Clone)]
503pub enum MqttError {
504 InternalError(InternalError),
507
508 ConnectError(ConnectError),
511
512 AuthError(AuthError),
515
516 PubackError(PubackError),
519
520 PubrecError(PubrecError),
523
524 PubcompError(PubcompError),
527
528 SocketClosed(SocketClosed),
531
532 HandleClosed(HandleClosed),
535
536 ContextExited(ContextExited),
539
540 Disconnected(Disconnected),
543
544 CodecError(CodecError),
547
548 QuotaExceeded(QuotaExceeded),
551
552 MaximumPacketSizeExceeded(MaximumPacketSizeExceeded),
555}
556
557impl fmt::Display for MqttError {
558 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
559 match self {
560 Self::InternalError(err) => write!(f, "{}", err),
561 Self::ConnectError(err) => write!(f, "{}", err),
562 Self::AuthError(err) => write!(f, "{}", err),
563 Self::PubackError(err) => write!(f, "{}", err),
564 Self::PubrecError(err) => write!(f, "{}", err),
565 Self::PubcompError(err) => write!(f, "{}", err),
566 Self::CodecError(err) => write!(f, "{}", err),
567 Self::SocketClosed(err) => {
568 write!(f, "{{ \"type\": \"MqttError\", \"message\": \"{}\" }}", err)
569 }
570 Self::HandleClosed(err) => {
571 write!(f, "{{ \"type\": \"MqttError\", \"message\": \"{}\" }}", err)
572 }
573 Self::ContextExited(err) => {
574 write!(f, "{{ \"type\": \"MqttError\", \"message\": \"{}\" }}", err)
575 }
576 Self::Disconnected(err) => {
577 write!(f, "{{ \"type\": \"MqttError\", \"message\": \"{}\" }}", err)
578 }
579 Self::QuotaExceeded(err) => write!(f, "{}", err),
580 Self::MaximumPacketSizeExceeded(err) => write!(f, "{}", err),
581 }
582 }
583}
584
585impl Error for MqttError {}
586
587impl From<InternalError> for MqttError {
588 fn from(err: InternalError) -> Self {
589 Self::InternalError(err)
590 }
591}
592
593impl From<SystemTimeError> for MqttError {
594 fn from(err: SystemTimeError) -> Self {
595 Self::InternalError(err.into())
596 }
597}
598
599impl From<SocketClosed> for MqttError {
600 fn from(err: SocketClosed) -> Self {
601 Self::SocketClosed(err)
602 }
603}
604
605impl From<io::Error> for MqttError {
606 fn from(err: io::Error) -> Self {
607 Self::SocketClosed(err.into())
608 }
609}
610
611impl From<HandleClosed> for MqttError {
612 fn from(err: HandleClosed) -> Self {
613 Self::HandleClosed(err)
614 }
615}
616
617impl From<Canceled> for MqttError {
618 fn from(err: Canceled) -> Self {
619 Self::ContextExited(err.into())
620 }
621}
622
623impl From<ContextExited> for MqttError {
624 fn from(err: ContextExited) -> Self {
625 Self::ContextExited(err)
626 }
627}
628
629impl<T> From<TrySendError<T>> for MqttError {
630 fn from(err: TrySendError<T>) -> Self {
631 Self::ContextExited(err.into())
632 }
633}
634
635impl From<CodecError> for MqttError {
636 fn from(err: CodecError) -> Self {
637 Self::CodecError(err)
638 }
639}
640
641impl From<Disconnected> for MqttError {
642 fn from(err: Disconnected) -> Self {
643 Self::Disconnected(err)
644 }
645}
646
647impl From<DisconnectRx> for MqttError {
648 fn from(packet: DisconnectRx) -> Self {
649 Self::Disconnected(Disconnected { packet })
650 }
651}
652
653impl From<QuotaExceeded> for MqttError {
654 fn from(err: QuotaExceeded) -> Self {
655 Self::QuotaExceeded(err)
656 }
657}
658
659impl From<MaximumPacketSizeExceeded> for MqttError {
660 fn from(err: MaximumPacketSizeExceeded) -> Self {
661 Self::MaximumPacketSizeExceeded(err)
662 }
663}