mqtt5_protocol/
connection.rs

1use crate::error::MqttError;
2use crate::prelude::*;
3use crate::time::Duration;
4
/// Lifecycle state of a client connection.
///
/// The default value is [`ConnectionState::Disconnected`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ConnectionState {
    /// No connection is established or in progress.
    #[default]
    Disconnected,
    /// A connection attempt is in progress.
    Connecting,
    /// The connection is established.
    Connected,
    /// A reconnection is in progress; `attempt` is the attempt number carried by the state.
    Reconnecting { attempt: u32 },
}

impl ConnectionState {
    /// Returns `true` only for [`ConnectionState::Connected`].
    #[must_use]
    pub fn is_connected(&self) -> bool {
        *self == Self::Connected
    }

    /// Returns `true` only for [`ConnectionState::Disconnected`].
    #[must_use]
    pub fn is_disconnected(&self) -> bool {
        *self == Self::Disconnected
    }

    /// Returns `true` for [`ConnectionState::Reconnecting`], whatever the attempt number.
    #[must_use]
    pub fn is_reconnecting(&self) -> bool {
        self.reconnect_attempt().is_some()
    }

    /// Returns the attempt number while reconnecting, or `None` in every other state.
    #[must_use]
    pub fn reconnect_attempt(&self) -> Option<u32> {
        if let Self::Reconnecting { attempt } = *self {
            Some(attempt)
        } else {
            None
        }
    }
}
40
/// Why a connection ended, as carried by [`ConnectionEvent::Disconnected`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DisconnectReason {
    /// The disconnect was initiated by the client.
    ClientInitiated,
    /// The server closed the connection.
    ServerClosed,
    /// A network-level failure; the string carries a caller-supplied description.
    NetworkError(String),
    /// A protocol-level failure; the string carries a caller-supplied description.
    ProtocolError(String),
    /// The keep-alive interval elapsed.
    KeepAliveTimeout,
    /// Authentication failed.
    AuthFailure,
}
50
51#[derive(Debug, Clone)]
52pub enum ConnectionEvent {
53    Connecting,
54    Connected { session_present: bool },
55    Disconnected { reason: DisconnectReason },
56    Reconnecting { attempt: u32 },
57    ReconnectFailed { error: MqttError },
58}
59
/// Details recorded about the current connection.
///
/// The default value has `session_present == false` and both optional fields `None`.
#[derive(Debug, Clone, Default)]
pub struct ConnectionInfo {
    /// Session-present flag reported when the connection was established.
    pub session_present: bool,
    /// Client identifier assigned for this connection, if any.
    pub assigned_client_id: Option<String>,
    /// Keep-alive value supplied by the server, if any.
    // NOTE(review): presumably seconds, as in MQTT 5 "Server Keep Alive" — confirm.
    pub server_keep_alive: Option<u16>,
}
66
/// Settings that control automatic reconnection and its backoff schedule.
#[derive(Debug, Clone)]
pub struct ReconnectConfig {
    /// Master switch; when `false`, [`ReconnectConfig::should_retry`] always returns `false`.
    pub enabled: bool,
    /// Delay returned for attempt `0` and the base of the backoff progression.
    pub initial_delay: Duration,
    /// Upper bound applied to delays computed for attempts `>= 1`.
    pub max_delay: Duration,
    /// Backoff multiplier in tenths (`20` multiplies the delay by 2.0 per attempt).
    pub backoff_factor_tenths: u32,
    /// Exclusive upper bound on the attempt number; `None` means unlimited.
    pub max_attempts: Option<u32>,
}

impl Default for ReconnectConfig {
    /// Enabled, 1 s initial delay, 60 s maximum delay, factor 2.0, unlimited attempts.
    fn default() -> Self {
        Self {
            enabled: true,
            initial_delay: Duration::from_secs(1),
            max_delay: Duration::from_secs(60),
            backoff_factor_tenths: 20,
            max_attempts: None,
        }
    }
}

impl ReconnectConfig {
    /// Returns the default configuration with reconnection switched off.
    #[must_use]
    pub fn disabled() -> Self {
        Self {
            enabled: false,
            ..Default::default()
        }
    }

    /// Returns the backoff multiplier as a floating-point factor (`tenths / 10`).
    #[must_use]
    pub fn backoff_factor(&self) -> f64 {
        f64::from(self.backoff_factor_tenths) / 10.0
    }

    /// Stores `factor` as tenths.
    ///
    /// The value is truncated toward zero after scaling by ten. Because the
    /// float-to-integer `as` cast saturates, negative and NaN inputs store `0`
    /// and very large inputs store `u32::MAX`.
    pub fn set_backoff_factor(&mut self, factor: f64) {
        #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
        {
            self.backoff_factor_tenths = (factor * 10.0) as u32;
        }
    }

    /// Converts a duration to whole milliseconds, clamping at `u64::MAX`.
    ///
    /// A bare `as u64` would keep only the low 64 bits, which can turn a very
    /// large duration into a tiny (even zero) millisecond count.
    // The value is clamped to the `u64` range first, so the cast is lossless.
    #[allow(clippy::cast_possible_truncation)]
    fn saturating_millis(duration: Duration) -> u64 {
        duration.as_millis().min(u128::from(u64::MAX)) as u64
    }

    /// Computes the delay to wait for the given attempt number.
    ///
    /// Attempt `0` returns `initial_delay` unchanged (it is not capped by
    /// `max_delay`). For later attempts the initial delay, in milliseconds, is
    /// multiplied by the backoff factor once per attempt using integer
    /// arithmetic in tenths of a millisecond; as soon as the value reaches
    /// `max_delay`, `max_delay` is returned.
    ///
    /// The loop stops early once a step no longer changes the value, so the
    /// cost stays small even for huge attempt numbers with a factor of 1.0 or
    /// a zero delay.
    #[must_use]
    pub fn calculate_delay(&self, attempt: u32) -> Duration {
        if attempt == 0 {
            return self.initial_delay;
        }

        let initial_ms = Self::saturating_millis(self.initial_delay);
        let max_ms = Self::saturating_millis(self.max_delay);
        let factor_tenths = u64::from(self.backoff_factor_tenths);

        // Work in tenths of a millisecond to keep one extra digit of precision.
        let mut delay_tenths = initial_ms.saturating_mul(10);

        for _ in 0..attempt {
            let next_tenths = delay_tenths.saturating_mul(factor_tenths) / 10;
            if next_tenths / 10 >= max_ms {
                return self.max_delay;
            }
            if next_tenths == delay_tenths {
                // Fixed point: every remaining step would yield the same value.
                break;
            }
            delay_tenths = next_tenths;
        }

        Duration::from_millis((delay_tenths / 10).min(max_ms))
    }

    /// Returns whether the given attempt number may be tried.
    ///
    /// Always `false` when reconnection is disabled; otherwise `true` while
    /// `attempt` is below `max_attempts`, or unconditionally when no limit is set.
    #[must_use]
    pub fn should_retry(&self, attempt: u32) -> bool {
        self.enabled && self.max_attempts.map_or(true, |max| attempt < max)
    }
}
148
149#[derive(Debug, Clone)]
150pub struct ConnectionStateMachine {
151    state: ConnectionState,
152    info: ConnectionInfo,
153    reconnect_config: ReconnectConfig,
154}
155
156impl Default for ConnectionStateMachine {
157    fn default() -> Self {
158        Self {
159            state: ConnectionState::Disconnected,
160            info: ConnectionInfo::default(),
161            reconnect_config: ReconnectConfig::default(),
162        }
163    }
164}
165
166impl ConnectionStateMachine {
167    #[must_use]
168    pub fn new(reconnect_config: ReconnectConfig) -> Self {
169        Self {
170            state: ConnectionState::Disconnected,
171            info: ConnectionInfo::default(),
172            reconnect_config,
173        }
174    }
175
176    #[must_use]
177    pub fn state(&self) -> ConnectionState {
178        self.state
179    }
180
181    #[must_use]
182    pub fn info(&self) -> &ConnectionInfo {
183        &self.info
184    }
185
186    #[must_use]
187    pub fn reconnect_config(&self) -> &ReconnectConfig {
188        &self.reconnect_config
189    }
190
191    pub fn set_reconnect_config(&mut self, config: ReconnectConfig) {
192        self.reconnect_config = config;
193    }
194
195    pub fn transition(&mut self, event: &ConnectionEvent) -> ConnectionState {
196        match event {
197            ConnectionEvent::Connecting => {
198                self.state = ConnectionState::Connecting;
199            }
200            ConnectionEvent::Connected { session_present } => {
201                self.state = ConnectionState::Connected;
202                self.info.session_present = *session_present;
203            }
204            ConnectionEvent::Disconnected { .. } | ConnectionEvent::ReconnectFailed { .. } => {
205                self.state = ConnectionState::Disconnected;
206                self.info = ConnectionInfo::default();
207            }
208            ConnectionEvent::Reconnecting { attempt } => {
209                self.state = ConnectionState::Reconnecting { attempt: *attempt };
210            }
211        }
212        self.state
213    }
214
215    pub fn set_connection_info(&mut self, info: ConnectionInfo) {
216        self.info = info;
217    }
218
219    #[must_use]
220    pub fn is_connected(&self) -> bool {
221        self.state.is_connected()
222    }
223
224    #[must_use]
225    pub fn should_reconnect(&self) -> bool {
226        match self.state {
227            ConnectionState::Disconnected => self.reconnect_config.enabled,
228            ConnectionState::Reconnecting { attempt } => {
229                self.reconnect_config.should_retry(attempt + 1)
230            }
231            _ => false,
232        }
233    }
234
235    #[must_use]
236    pub fn next_reconnect_delay(&self) -> Option<Duration> {
237        match self.state {
238            ConnectionState::Disconnected => {
239                if self.reconnect_config.enabled {
240                    Some(self.reconnect_config.calculate_delay(0))
241                } else {
242                    None
243                }
244            }
245            ConnectionState::Reconnecting { attempt } => {
246                if self.reconnect_config.should_retry(attempt + 1) {
247                    Some(self.reconnect_config.calculate_delay(attempt))
248                } else {
249                    None
250                }
251            }
252            _ => None,
253        }
254    }
255}
256
#[cfg(test)]
mod tests {
    use super::*;

    /// The default state is the disconnected one.
    #[test]
    fn test_connection_state_default() {
        assert!(ConnectionState::default().is_disconnected());
    }

    /// Walks connect -> connected -> disconnected and checks state and info.
    #[test]
    fn test_state_machine_transitions() {
        let mut machine = ConnectionStateMachine::default();
        assert!(machine.state().is_disconnected());

        machine.transition(&ConnectionEvent::Connecting);
        assert_eq!(machine.state(), ConnectionState::Connecting);

        let connected = ConnectionEvent::Connected {
            session_present: true,
        };
        machine.transition(&connected);
        assert!(machine.is_connected());
        assert!(machine.info().session_present);

        let dropped = ConnectionEvent::Disconnected {
            reason: DisconnectReason::NetworkError("timeout".into()),
        };
        machine.transition(&dropped);
        assert!(machine.state().is_disconnected());
        assert!(!machine.info().session_present);
    }

    /// Delays double per attempt and are capped at `max_delay`.
    #[test]
    fn test_reconnect_delay_calculation() {
        let config = ReconnectConfig {
            enabled: true,
            initial_delay: Duration::from_secs(1),
            max_delay: Duration::from_secs(30),
            backoff_factor_tenths: 20,
            max_attempts: Some(5),
        };

        let expected_secs: [u64; 6] = [1, 2, 4, 8, 16, 30];
        for (attempt, &secs) in (0u32..).zip(expected_secs.iter()) {
            assert_eq!(config.calculate_delay(attempt), Duration::from_secs(secs));
        }
    }

    /// Attempts below `max_attempts` are allowed; the rest are refused.
    #[test]
    fn test_should_retry() {
        let config = ReconnectConfig {
            enabled: true,
            max_attempts: Some(3),
            ..Default::default()
        };

        let cases: [(u32, bool); 5] = [(0, true), (1, true), (2, true), (3, false), (4, false)];
        for &(attempt, allowed) in cases.iter() {
            assert_eq!(config.should_retry(attempt), allowed);
        }
    }

    /// A disabled configuration never retries.
    #[test]
    fn test_disabled_reconnect() {
        assert!(!ReconnectConfig::disabled().should_retry(0));
    }

    /// Reconnection is offered until the attempt budget is used up.
    #[test]
    fn test_reconnect_flow() {
        let config = ReconnectConfig {
            enabled: true,
            initial_delay: Duration::from_millis(100),
            max_delay: Duration::from_secs(10),
            backoff_factor_tenths: 20,
            max_attempts: Some(3),
        };
        let mut machine = ConnectionStateMachine::new(config);

        machine.transition(&ConnectionEvent::Connecting);
        machine.transition(&ConnectionEvent::Connected {
            session_present: false,
        });
        assert!(machine.is_connected());

        machine.transition(&ConnectionEvent::Disconnected {
            reason: DisconnectReason::NetworkError("connection lost".into()),
        });
        assert!(machine.should_reconnect());

        machine.transition(&ConnectionEvent::Reconnecting { attempt: 0 });
        assert!(machine.state().is_reconnecting());
        assert_eq!(machine.state().reconnect_attempt(), Some(0));
        assert!(machine.should_reconnect());

        machine.transition(&ConnectionEvent::Reconnecting { attempt: 1 });
        assert!(machine.should_reconnect());

        machine.transition(&ConnectionEvent::Reconnecting { attempt: 2 });
        assert!(!machine.should_reconnect());
    }
}