Skip to main content

mqtt5_protocol/
error.rs

1use crate::packet::suback::SubAckReasonCode;
2use crate::prelude::String;
3use crate::protocol::v5::reason_codes::ReasonCode;
4
5#[cfg(feature = "std")]
6use thiserror::Error;
7
8#[cfg(not(feature = "std"))]
9use core::fmt;
10
11#[cfg(feature = "std")]
12pub type Result<T> = std::result::Result<T, MqttError>;
13
14#[cfg(not(feature = "std"))]
15pub type Result<T> = core::result::Result<T, MqttError>;
16
17#[derive(Debug, Clone)]
18#[cfg_attr(feature = "std", derive(Error))]
19pub enum MqttError {
20    #[cfg_attr(feature = "std", error("IO error: {0}"))]
21    Io(String),
22
23    #[cfg_attr(feature = "std", error("Invalid topic name: {0}"))]
24    InvalidTopicName(String),
25
26    #[cfg_attr(feature = "std", error("Invalid topic filter: {0}"))]
27    InvalidTopicFilter(String),
28
29    #[cfg_attr(feature = "std", error("Invalid client ID: {0}"))]
30    InvalidClientId(String),
31
32    #[cfg_attr(feature = "std", error("Connection error: {0}"))]
33    ConnectionError(String),
34
35    #[cfg_attr(feature = "std", error("Connection refused: {0:?}"))]
36    ConnectionRefused(ReasonCode),
37
38    #[cfg_attr(feature = "std", error("Protocol error: {0}"))]
39    ProtocolError(String),
40
41    #[cfg_attr(feature = "std", error("Malformed packet: {0}"))]
42    MalformedPacket(String),
43
44    #[cfg_attr(
45        feature = "std",
46        error("Packet too large: size {size} exceeds maximum {max}")
47    )]
48    PacketTooLarge { size: usize, max: usize },
49
50    #[cfg_attr(feature = "std", error("Authentication failed"))]
51    AuthenticationFailed,
52
53    #[cfg_attr(feature = "std", error("Not authorized"))]
54    NotAuthorized,
55
56    #[cfg_attr(feature = "std", error("Not connected"))]
57    NotConnected,
58
59    #[cfg_attr(feature = "std", error("Already connected"))]
60    AlreadyConnected,
61
62    #[cfg_attr(feature = "std", error("Timeout"))]
63    Timeout,
64
65    #[cfg_attr(feature = "std", error("Subscription failed: {0:?}"))]
66    SubscriptionFailed(ReasonCode),
67
68    #[cfg_attr(feature = "std", error("Subscription denied: {0:?}"))]
69    SubscriptionDenied(SubAckReasonCode),
70
71    #[cfg_attr(feature = "std", error("Unsubscription failed: {0:?}"))]
72    UnsubscriptionFailed(ReasonCode),
73
74    #[cfg_attr(feature = "std", error("Publish failed: {0:?}"))]
75    PublishFailed(ReasonCode),
76
77    #[cfg_attr(feature = "std", error("Packet identifier not found: {0}"))]
78    PacketIdNotFound(u16),
79
80    #[cfg_attr(feature = "std", error("Packet identifier already in use: {0}"))]
81    PacketIdInUse(u16),
82
83    #[cfg_attr(feature = "std", error("Invalid QoS: {0}"))]
84    InvalidQoS(u8),
85
86    #[cfg_attr(feature = "std", error("Invalid packet type: {0}"))]
87    InvalidPacketType(u8),
88
89    #[cfg_attr(feature = "std", error("Invalid reason code: {0}"))]
90    InvalidReasonCode(u8),
91
92    #[cfg_attr(feature = "std", error("Invalid property ID: {0}"))]
93    InvalidPropertyId(u8),
94
95    #[cfg_attr(feature = "std", error("Duplicate property ID: {0}"))]
96    DuplicatePropertyId(u8),
97
98    #[cfg_attr(feature = "std", error("Session expired"))]
99    SessionExpired,
100
101    #[cfg_attr(feature = "std", error("Keep alive timeout"))]
102    KeepAliveTimeout,
103
104    #[cfg_attr(feature = "std", error("Server shutting down"))]
105    ServerShuttingDown,
106
107    #[cfg_attr(feature = "std", error("Client closed connection"))]
108    ClientClosed,
109
110    #[cfg_attr(feature = "std", error("Connection closed by peer"))]
111    ConnectionClosedByPeer,
112
113    #[cfg_attr(feature = "std", error("Maximum connect time exceeded"))]
114    MaxConnectTime,
115
116    #[cfg_attr(feature = "std", error("Topic alias invalid: {0}"))]
117    TopicAliasInvalid(u16),
118
119    #[cfg_attr(feature = "std", error("Receive maximum exceeded"))]
120    ReceiveMaximumExceeded,
121
122    #[cfg_attr(feature = "std", error("Will message rejected"))]
123    WillRejected,
124
125    #[cfg_attr(feature = "std", error("Implementation specific error: {0}"))]
126    ImplementationSpecific(String),
127
128    #[cfg_attr(feature = "std", error("Unsupported protocol version"))]
129    UnsupportedProtocolVersion,
130
131    #[cfg_attr(feature = "std", error("Invalid state: {0}"))]
132    InvalidState(String),
133
134    #[cfg_attr(feature = "std", error("Client identifier not valid"))]
135    ClientIdentifierNotValid,
136
137    #[cfg_attr(feature = "std", error("Bad username or password"))]
138    BadUsernameOrPassword,
139
140    #[cfg_attr(feature = "std", error("Server unavailable"))]
141    ServerUnavailable,
142
143    #[cfg_attr(feature = "std", error("Server busy"))]
144    ServerBusy,
145
146    #[cfg_attr(feature = "std", error("Banned"))]
147    Banned,
148
149    #[cfg_attr(feature = "std", error("Bad authentication method"))]
150    BadAuthenticationMethod,
151
152    #[cfg_attr(feature = "std", error("Quota exceeded"))]
153    QuotaExceeded,
154
155    #[cfg_attr(feature = "std", error("Payload format invalid"))]
156    PayloadFormatInvalid,
157
158    #[cfg_attr(feature = "std", error("Retain not supported"))]
159    RetainNotSupported,
160
161    #[cfg_attr(feature = "std", error("QoS not supported"))]
162    QoSNotSupported,
163
164    #[cfg_attr(feature = "std", error("Use another server"))]
165    UseAnotherServer,
166
167    #[cfg_attr(feature = "std", error("Server moved"))]
168    ServerMoved,
169
170    #[cfg_attr(feature = "std", error("Shared subscriptions not supported"))]
171    SharedSubscriptionsNotSupported,
172
173    #[cfg_attr(feature = "std", error("Connection rate exceeded"))]
174    ConnectionRateExceeded,
175
176    #[cfg_attr(feature = "std", error("Subscription identifiers not supported"))]
177    SubscriptionIdentifiersNotSupported,
178
179    #[cfg_attr(feature = "std", error("Wildcard subscriptions not supported"))]
180    WildcardSubscriptionsNotSupported,
181
182    #[cfg_attr(feature = "std", error("Message too large for queue"))]
183    MessageTooLarge,
184
185    #[cfg_attr(feature = "std", error("Flow control exceeded"))]
186    FlowControlExceeded,
187
188    #[cfg_attr(feature = "std", error("Packet ID exhausted"))]
189    PacketIdExhausted,
190
191    #[cfg_attr(
192        feature = "std",
193        error("String too long: {0} bytes exceeds maximum of 65535")
194    )]
195    StringTooLong(usize),
196
197    #[cfg_attr(feature = "std", error("Configuration error: {0}"))]
198    Configuration(String),
199}
200
201#[cfg(not(feature = "std"))]
202impl fmt::Display for MqttError {
203    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
204        match self {
205            Self::Io(s) => write!(f, "IO error: {s}"),
206            Self::InvalidTopicName(s) => write!(f, "Invalid topic name: {s}"),
207            Self::InvalidTopicFilter(s) => write!(f, "Invalid topic filter: {s}"),
208            Self::InvalidClientId(s) => write!(f, "Invalid client ID: {s}"),
209            Self::ConnectionError(s) => write!(f, "Connection error: {s}"),
210            Self::ConnectionRefused(r) => write!(f, "Connection refused: {r:?}"),
211            Self::ProtocolError(s) => write!(f, "Protocol error: {s}"),
212            Self::MalformedPacket(s) => write!(f, "Malformed packet: {s}"),
213            Self::PacketTooLarge { size, max } => {
214                write!(f, "Packet too large: size {size} exceeds maximum {max}")
215            }
216            Self::AuthenticationFailed => write!(f, "Authentication failed"),
217            Self::NotAuthorized => write!(f, "Not authorized"),
218            Self::NotConnected => write!(f, "Not connected"),
219            Self::AlreadyConnected => write!(f, "Already connected"),
220            Self::Timeout => write!(f, "Timeout"),
221            Self::SubscriptionFailed(r) => write!(f, "Subscription failed: {r:?}"),
222            Self::SubscriptionDenied(r) => write!(f, "Subscription denied: {r:?}"),
223            Self::UnsubscriptionFailed(r) => write!(f, "Unsubscription failed: {r:?}"),
224            Self::PublishFailed(r) => write!(f, "Publish failed: {r:?}"),
225            Self::PacketIdNotFound(id) => write!(f, "Packet identifier not found: {id}"),
226            Self::PacketIdInUse(id) => write!(f, "Packet identifier already in use: {id}"),
227            Self::InvalidQoS(q) => write!(f, "Invalid QoS: {q}"),
228            Self::InvalidPacketType(t) => write!(f, "Invalid packet type: {t}"),
229            Self::InvalidReasonCode(r) => write!(f, "Invalid reason code: {r}"),
230            Self::InvalidPropertyId(p) => write!(f, "Invalid property ID: {p}"),
231            Self::DuplicatePropertyId(p) => write!(f, "Duplicate property ID: {p}"),
232            Self::SessionExpired => write!(f, "Session expired"),
233            Self::KeepAliveTimeout => write!(f, "Keep alive timeout"),
234            Self::ServerShuttingDown => write!(f, "Server shutting down"),
235            Self::ClientClosed => write!(f, "Client closed connection"),
236            Self::ConnectionClosedByPeer => write!(f, "Connection closed by peer"),
237            Self::MaxConnectTime => write!(f, "Maximum connect time exceeded"),
238            Self::TopicAliasInvalid(a) => write!(f, "Topic alias invalid: {a}"),
239            Self::ReceiveMaximumExceeded => write!(f, "Receive maximum exceeded"),
240            Self::WillRejected => write!(f, "Will message rejected"),
241            Self::ImplementationSpecific(s) => write!(f, "Implementation specific error: {s}"),
242            Self::UnsupportedProtocolVersion => write!(f, "Unsupported protocol version"),
243            Self::InvalidState(s) => write!(f, "Invalid state: {s}"),
244            Self::ClientIdentifierNotValid => write!(f, "Client identifier not valid"),
245            Self::BadUsernameOrPassword => write!(f, "Bad username or password"),
246            Self::ServerUnavailable => write!(f, "Server unavailable"),
247            Self::ServerBusy => write!(f, "Server busy"),
248            Self::Banned => write!(f, "Banned"),
249            Self::BadAuthenticationMethod => write!(f, "Bad authentication method"),
250            Self::QuotaExceeded => write!(f, "Quota exceeded"),
251            Self::PayloadFormatInvalid => write!(f, "Payload format invalid"),
252            Self::RetainNotSupported => write!(f, "Retain not supported"),
253            Self::QoSNotSupported => write!(f, "QoS not supported"),
254            Self::UseAnotherServer => write!(f, "Use another server"),
255            Self::ServerMoved => write!(f, "Server moved"),
256            Self::SharedSubscriptionsNotSupported => {
257                write!(f, "Shared subscriptions not supported")
258            }
259            Self::ConnectionRateExceeded => write!(f, "Connection rate exceeded"),
260            Self::SubscriptionIdentifiersNotSupported => {
261                write!(f, "Subscription identifiers not supported")
262            }
263            Self::WildcardSubscriptionsNotSupported => {
264                write!(f, "Wildcard subscriptions not supported")
265            }
266            Self::MessageTooLarge => write!(f, "Message too large for queue"),
267            Self::FlowControlExceeded => write!(f, "Flow control exceeded"),
268            Self::PacketIdExhausted => write!(f, "Packet ID exhausted"),
269            Self::StringTooLong(len) => {
270                write!(f, "String too long: {len} bytes exceeds maximum of 65535")
271            }
272            Self::Configuration(s) => write!(f, "Configuration error: {s}"),
273        }
274    }
275}
276
277impl MqttError {
278    #[must_use]
279    pub fn is_normal_disconnect(&self) -> bool {
280        match self {
281            Self::ClientClosed
282            | Self::ConnectionClosedByPeer
283            | Self::UseAnotherServer
284            | Self::ServerMoved => true,
285            Self::Io(msg)
286                if msg.contains("stream has been shut down")
287                    || msg.contains("Connection reset") =>
288            {
289                true
290            }
291            _ => false,
292        }
293    }
294}
295
296#[cfg(feature = "std")]
297impl From<std::io::Error> for MqttError {
298    fn from(err: std::io::Error) -> Self {
299        MqttError::Io(err.to_string())
300    }
301}
302
303impl From<String> for MqttError {
304    fn from(msg: String) -> Self {
305        MqttError::MalformedPacket(msg)
306    }
307}
308
309impl From<&str> for MqttError {
310    fn from(msg: &str) -> Self {
311        MqttError::MalformedPacket(crate::prelude::ToString::to_string(msg))
312    }
313}
314
315#[cfg(test)]
316mod tests {
317    use super::*;
318
319    #[test]
320    fn test_error_display() {
321        let err = MqttError::InvalidTopicName("test/+/topic".to_string());
322        assert_eq!(err.to_string(), "Invalid topic name: test/+/topic");
323
324        let err = MqttError::PacketTooLarge {
325            size: 1000,
326            max: 500,
327        };
328        assert_eq!(
329            err.to_string(),
330            "Packet too large: size 1000 exceeds maximum 500"
331        );
332
333        let err = MqttError::ConnectionRefused(ReasonCode::BadUsernameOrPassword);
334        assert_eq!(err.to_string(), "Connection refused: BadUsernameOrPassword");
335    }
336
337    #[cfg(feature = "std")]
338    #[test]
339    fn test_error_from_io() {
340        use std::io;
341        let io_err = io::Error::new(io::ErrorKind::ConnectionRefused, "test");
342        let mqtt_err: MqttError = io_err.into();
343        match mqtt_err {
344            MqttError::Io(e) => assert!(e.contains("test")),
345            _ => panic!("Expected Io error"),
346        }
347    }
348
349    #[test]
350    fn test_result_type() {
351        #[allow(clippy::unnecessary_wraps)]
352        fn returns_result() -> Result<String> {
353            Ok("success".to_string())
354        }
355
356        fn returns_error() -> Result<String> {
357            Err(MqttError::NotConnected)
358        }
359
360        assert!(returns_result().is_ok());
361        assert!(returns_error().is_err());
362    }
363}