mqtt5_protocol/
error_classification.rs1use crate::error::MqttError;
2use crate::protocol::v5::reason_codes::ReasonCode;
3
/// Categories of failure that `MqttError::classify` reports as recoverable.
///
/// Each category has an associated factor, exposed by
/// [`RecoverableError::base_delay_multiplier`].
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum RecoverableError {
    /// Reported for timeouts and for connection errors whose message contains one of the
    /// transient network markers.
    NetworkError,
    /// Reported when the server is unavailable or busy.
    ServerUnavailable,
    /// Reported when a quota was exceeded.
    QuotaExceeded,
    /// Reported when no packet identifier is available.
    PacketIdExhausted,
    /// Reported when a flow-control limit was exceeded.
    FlowControlLimited,
    /// Reported for a `SessionTakenOver` refusal and for an expired session.
    SessionTakenOver,
    /// Reported when the server is shutting down.
    ServerShuttingDown,
    /// Reported for the `Mqoq*` flow reason codes that are treated as recoverable.
    MqoqFlowRecoverable,
}
15
16impl RecoverableError {
17 #[must_use]
18 pub fn base_delay_multiplier(&self) -> u32 {
19 match self {
20 Self::QuotaExceeded => 10,
21 Self::MqoqFlowRecoverable => 3,
22 Self::FlowControlLimited => 2,
23 _ => 1,
24 }
25 }
26
27 #[must_use]
28 pub fn default_set() -> [Self; 6] {
29 [
30 Self::NetworkError,
31 Self::ServerUnavailable,
32 Self::QuotaExceeded,
33 Self::PacketIdExhausted,
34 Self::FlowControlLimited,
35 Self::MqoqFlowRecoverable,
36 ]
37 }
38}
39
40impl MqttError {
41 #[must_use]
42 pub fn classify(&self) -> Option<RecoverableError> {
43 match self {
44 Self::ConnectionError(msg) => classify_connection_error(msg),
45 Self::ConnectionRefused(reason) => classify_connection_refused(*reason),
46 Self::PacketIdExhausted => Some(RecoverableError::PacketIdExhausted),
47 Self::FlowControlExceeded => Some(RecoverableError::FlowControlLimited),
48 Self::Timeout => Some(RecoverableError::NetworkError),
49 Self::ServerUnavailable | Self::ServerBusy => Some(RecoverableError::ServerUnavailable),
50 Self::QuotaExceeded => Some(RecoverableError::QuotaExceeded),
51 Self::ServerShuttingDown => Some(RecoverableError::ServerShuttingDown),
52 Self::SessionExpired => Some(RecoverableError::SessionTakenOver),
53 _ => None,
54 }
55 }
56
57 #[must_use]
58 pub fn is_aws_iot_connection_limit(&self) -> bool {
59 match self {
60 Self::ConnectionError(msg) => is_aws_iot_limit_error(msg),
61 _ => false,
62 }
63 }
64}
65
66fn classify_connection_error(msg: &str) -> Option<RecoverableError> {
67 if is_aws_iot_limit_error(msg) {
68 return None;
69 }
70
71 if msg.contains("temporarily unavailable")
72 || msg.contains("Connection refused")
73 || msg.contains("Network is unreachable")
74 || msg.contains("connection reset")
75 || msg.contains("broken pipe")
76 || msg.contains("timed out")
77 {
78 return Some(RecoverableError::NetworkError);
79 }
80
81 None
82}
83
/// Reports whether `msg` contains any of the substrings this module treats as a sign of an
/// AWS IoT connection limit.
///
/// Matching is case-sensitive.
fn is_aws_iot_limit_error(msg: &str) -> bool {
    // "Connection reset by peer" is subsumed by "reset by peer", and "TCP RST" by "RST"; all
    // six markers are kept as listed.
    // NOTE(review): "RST" also matches any other upper-case text containing those letters
    // (e.g. "FIRST") -- confirm this breadth is intended.
    const LIMIT_MARKERS: [&str; 6] = [
        "Connection reset by peer",
        "RST",
        "TCP RST",
        "reset by peer",
        "connection limit",
        "client limit",
    ];

    LIMIT_MARKERS.iter().any(|&marker| msg.contains(marker))
}
92
93fn classify_connection_refused(reason: ReasonCode) -> Option<RecoverableError> {
94 match reason {
95 ReasonCode::ServerUnavailable | ReasonCode::ServerBusy => {
96 Some(RecoverableError::ServerUnavailable)
97 }
98 ReasonCode::QuotaExceeded => Some(RecoverableError::QuotaExceeded),
99 ReasonCode::SessionTakenOver => Some(RecoverableError::SessionTakenOver),
100 ReasonCode::ServerShuttingDown => Some(RecoverableError::ServerShuttingDown),
101 ReasonCode::MqoqIncompletePacket
102 | ReasonCode::MqoqFlowOpenIdle
103 | ReasonCode::MqoqFlowCancelled
104 | ReasonCode::MqoqFlowPacketCancelled
105 | ReasonCode::MqoqFlowRefused => Some(RecoverableError::MqoqFlowRecoverable),
106 _ => None,
107 }
108}
109
#[cfg(test)]
mod tests {
    use super::*;

    /// Classifies a `ConnectionError` built from `msg`.
    fn classify_message(msg: &str) -> Option<RecoverableError> {
        MqttError::ConnectionError(msg.to_string()).classify()
    }

    /// Classifies a `ConnectionRefused` error carrying `reason`.
    fn classify_refusal(reason: ReasonCode) -> Option<RecoverableError> {
        MqttError::ConnectionRefused(reason).classify()
    }

    #[test]
    fn test_connection_error_classification() {
        let transient = [
            "Connection refused",
            "Network is unreachable",
            "temporarily unavailable",
        ];
        for msg in transient.iter() {
            assert_eq!(
                classify_message(msg),
                Some(RecoverableError::NetworkError),
                "message: {}",
                msg
            );
        }
    }

    #[test]
    fn test_aws_iot_limit_not_recoverable() {
        let limit_messages = [
            "Connection reset by peer",
            "TCP RST received",
            "client limit exceeded",
        ];
        for msg in limit_messages.iter() {
            let error = MqttError::ConnectionError((*msg).to_string());
            assert_eq!(error.classify(), None, "message: {}", msg);
            assert!(error.is_aws_iot_connection_limit(), "message: {}", msg);
        }
    }

    #[test]
    fn test_connection_refused_classification() {
        let cases = [
            (
                ReasonCode::ServerUnavailable,
                Some(RecoverableError::ServerUnavailable),
            ),
            (
                ReasonCode::QuotaExceeded,
                Some(RecoverableError::QuotaExceeded),
            ),
            (
                ReasonCode::SessionTakenOver,
                Some(RecoverableError::SessionTakenOver),
            ),
            (ReasonCode::BadUsernameOrPassword, None),
        ];
        for (index, &(reason, expected)) in cases.iter().enumerate() {
            assert_eq!(classify_refusal(reason), expected, "case #{}", index);
        }
    }

    #[test]
    fn test_mqoq_classification() {
        let recoverable = [
            ReasonCode::MqoqIncompletePacket,
            ReasonCode::MqoqFlowOpenIdle,
            ReasonCode::MqoqFlowCancelled,
        ];
        for (index, &reason) in recoverable.iter().enumerate() {
            assert_eq!(
                classify_refusal(reason),
                Some(RecoverableError::MqoqFlowRecoverable),
                "case #{}",
                index
            );
        }

        assert_eq!(classify_refusal(ReasonCode::MqoqNoFlowState), None);
    }

    #[test]
    fn test_direct_error_classification() {
        let cases = [
            (
                MqttError::PacketIdExhausted,
                RecoverableError::PacketIdExhausted,
            ),
            (
                MqttError::FlowControlExceeded,
                RecoverableError::FlowControlLimited,
            ),
            (MqttError::Timeout, RecoverableError::NetworkError),
            (
                MqttError::ServerUnavailable,
                RecoverableError::ServerUnavailable,
            ),
            (MqttError::ServerBusy, RecoverableError::ServerUnavailable),
            (MqttError::QuotaExceeded, RecoverableError::QuotaExceeded),
            (
                MqttError::ServerShuttingDown,
                RecoverableError::ServerShuttingDown,
            ),
        ];
        for (error, expected) in cases.iter() {
            assert_eq!(error.classify(), Some(*expected));
        }
    }

    #[test]
    fn test_non_recoverable_errors() {
        let errors = [
            MqttError::NotConnected,
            MqttError::AlreadyConnected,
            MqttError::AuthenticationFailed,
            MqttError::NotAuthorized,
            MqttError::BadUsernameOrPassword,
            MqttError::ProtocolError("test".to_string()),
        ];
        for (index, error) in errors.iter().enumerate() {
            assert_eq!(error.classify(), None, "error #{}", index);
        }
    }

    #[test]
    fn test_base_delay_multiplier() {
        let cases = [
            (RecoverableError::NetworkError, 1),
            (RecoverableError::ServerUnavailable, 1),
            (RecoverableError::QuotaExceeded, 10),
            (RecoverableError::FlowControlLimited, 2),
            (RecoverableError::MqoqFlowRecoverable, 3),
        ];
        for (kind, expected) in cases.iter() {
            assert_eq!(kind.base_delay_multiplier(), *expected, "kind: {:?}", kind);
        }
    }

    #[test]
    fn test_default_set() {
        let defaults = RecoverableError::default_set();
        assert_eq!(defaults.len(), 6);

        let included = [
            RecoverableError::NetworkError,
            RecoverableError::ServerUnavailable,
            RecoverableError::QuotaExceeded,
            RecoverableError::PacketIdExhausted,
            RecoverableError::FlowControlLimited,
            RecoverableError::MqoqFlowRecoverable,
        ];
        for kind in included.iter() {
            assert!(defaults.contains(kind), "{:?} should be included", kind);
        }

        let excluded = [
            RecoverableError::SessionTakenOver,
            RecoverableError::ServerShuttingDown,
        ];
        for kind in excluded.iter() {
            assert!(!defaults.contains(kind), "{:?} should be excluded", kind);
        }
    }
}