1use std::sync::Arc;
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::time::Duration;
12
13use tokio::sync::RwLock;
14use tokio::time::sleep;
15use tracing::{info, warn};
16
17use aranet_types::{CurrentReading, DeviceInfo, DeviceType, HistoryRecord};
18
19use crate::device::Device;
20use crate::error::{Error, Result};
21use crate::events::{DeviceEvent, DeviceId, EventSender};
22use crate::history::{HistoryInfo, HistoryOptions};
23use crate::settings::{CalibrationData, MeasurementInterval};
24use crate::traits::AranetDevice;
25
26#[derive(Debug, Clone)]
28pub struct ReconnectOptions {
29 pub max_attempts: Option<u32>,
31 pub initial_delay: Duration,
33 pub max_delay: Duration,
35 pub backoff_multiplier: f64,
37 pub use_exponential_backoff: bool,
39}
40
41impl Default for ReconnectOptions {
42 fn default() -> Self {
43 Self {
44 max_attempts: Some(5),
45 initial_delay: Duration::from_secs(1),
46 max_delay: Duration::from_secs(60),
47 backoff_multiplier: 2.0,
48 use_exponential_backoff: true,
49 }
50 }
51}
52
53impl ReconnectOptions {
54 pub fn new() -> Self {
56 Self::default()
57 }
58
59 pub fn unlimited() -> Self {
61 Self {
62 max_attempts: None,
63 ..Default::default()
64 }
65 }
66
67 pub fn fixed_delay(delay: Duration) -> Self {
69 Self {
70 initial_delay: delay,
71 use_exponential_backoff: false,
72 ..Default::default()
73 }
74 }
75
76 pub fn max_attempts(mut self, attempts: u32) -> Self {
78 self.max_attempts = Some(attempts);
79 self
80 }
81
82 pub fn initial_delay(mut self, delay: Duration) -> Self {
84 self.initial_delay = delay;
85 self
86 }
87
88 pub fn max_delay(mut self, delay: Duration) -> Self {
90 self.max_delay = delay;
91 self
92 }
93
94 pub fn backoff_multiplier(mut self, multiplier: f64) -> Self {
96 self.backoff_multiplier = multiplier;
97 self
98 }
99
100 pub fn exponential_backoff(mut self, enabled: bool) -> Self {
102 self.use_exponential_backoff = enabled;
103 self
104 }
105
106 pub fn delay_for_attempt(&self, attempt: u32) -> Duration {
108 if !self.use_exponential_backoff {
109 return self.initial_delay;
110 }
111
112 let capped_attempt = attempt.min(32);
115 let delay_ms = self.initial_delay.as_millis() as f64
116 * self.backoff_multiplier.powi(capped_attempt as i32);
117
118 let delay = if delay_ms.is_finite() && delay_ms <= u64::MAX as f64 {
120 Duration::from_millis(delay_ms as u64)
121 } else {
122 self.max_delay
123 };
124
125 delay.min(self.max_delay)
126 }
127
128 pub fn validate(&self) -> Result<()> {
135 if self.backoff_multiplier < 1.0 {
136 return Err(Error::InvalidConfig(
137 "backoff_multiplier must be >= 1.0".to_string(),
138 ));
139 }
140 if self.initial_delay.is_zero() {
141 return Err(Error::InvalidConfig(
142 "initial_delay must be > 0".to_string(),
143 ));
144 }
145 if self.max_delay < self.initial_delay {
146 return Err(Error::InvalidConfig(
147 "max_delay must be >= initial_delay".to_string(),
148 ));
149 }
150 Ok(())
151 }
152}
153
154#[derive(Debug, Clone, Copy, PartialEq, Eq)]
156pub enum ConnectionState {
157 Connected,
159 Disconnected,
161 Reconnecting,
163 Failed,
165}
166
167pub struct ReconnectingDevice {
173 identifier: String,
174 device: RwLock<Option<Arc<Device>>>,
176 options: ReconnectOptions,
177 state: RwLock<ConnectionState>,
178 event_sender: Option<EventSender>,
179 attempt_count: RwLock<u32>,
180 cancelled: Arc<AtomicBool>,
182 cached_name: std::sync::OnceLock<String>,
184 cached_device_type: std::sync::OnceLock<DeviceType>,
186}
187
188impl ReconnectingDevice {
189 pub async fn connect(identifier: &str, options: ReconnectOptions) -> Result<Self> {
191 options.validate()?;
192 let device = Arc::new(Device::connect(identifier).await?);
193
194 let cached_name = std::sync::OnceLock::new();
196 if let Some(name) = device.name() {
197 let _ = cached_name.set(name.to_string());
198 }
199
200 let cached_device_type = std::sync::OnceLock::new();
201 if let Some(device_type) = device.device_type() {
202 let _ = cached_device_type.set(device_type);
203 }
204
205 Ok(Self {
206 identifier: identifier.to_string(),
207 device: RwLock::new(Some(device)),
208 options,
209 state: RwLock::new(ConnectionState::Connected),
210 event_sender: None,
211 attempt_count: RwLock::new(0),
212 cancelled: Arc::new(AtomicBool::new(false)),
213 cached_name,
214 cached_device_type,
215 })
216 }
217
218 pub async fn connect_with_events(
220 identifier: &str,
221 options: ReconnectOptions,
222 event_sender: EventSender,
223 ) -> Result<Self> {
224 let mut this = Self::connect(identifier, options).await?;
225 this.event_sender = Some(event_sender);
226 Ok(this)
227 }
228
229 pub fn cancel_reconnect(&self) {
233 self.cancelled.store(true, Ordering::SeqCst);
234 }
235
236 pub fn is_cancelled(&self) -> bool {
238 self.cancelled.load(Ordering::SeqCst)
239 }
240
241 pub fn reset_cancellation(&self) {
248 self.cancelled.store(false, Ordering::SeqCst);
249 }
250
251 pub async fn state(&self) -> ConnectionState {
253 *self.state.read().await
254 }
255
256 pub async fn is_connected(&self) -> bool {
258 let guard = self.device.read().await;
259 if let Some(device) = guard.as_ref() {
260 device.is_connected().await
261 } else {
262 false
263 }
264 }
265
266 pub fn identifier(&self) -> &str {
268 &self.identifier
269 }
270
271 pub async fn with_device<F, Fut, T>(&self, f: F) -> Result<T>
283 where
284 F: Fn(&Device) -> Fut,
285 Fut: std::future::Future<Output = Result<T>>,
286 {
287 {
289 let guard = self.device.read().await;
290 if let Some(device) = guard.as_ref()
291 && device.is_connected().await
292 {
293 match f(device).await {
294 Ok(result) => return Ok(result),
295 Err(e) => {
296 warn!("Operation failed: {}", e);
297 }
299 }
300 }
301 }
302
303 self.reconnect().await?;
305
306 let guard = self.device.read().await;
308 if let Some(device) = guard.as_ref() {
309 f(device).await
310 } else {
311 Err(Error::NotConnected)
312 }
313 }
314
315 async fn run_with_reconnect<'a, T, F>(&'a self, f: F) -> Result<T>
325 where
326 F: for<'b> Fn(
327 &'b Device,
328 ) -> std::pin::Pin<
329 Box<dyn std::future::Future<Output = Result<T>> + Send + 'b>,
330 > + Send
331 + Sync,
332 T: Send,
333 {
334 {
336 let guard = self.device.read().await;
337 if let Some(device) = guard.as_ref()
338 && device.is_connected().await
339 {
340 match f(device).await {
341 Ok(result) => return Ok(result),
342 Err(e) => {
343 warn!("Operation failed: {}", e);
344 }
346 }
347 }
348 }
349
350 self.reconnect().await?;
352
353 let guard = self.device.read().await;
355 if let Some(device) = guard.as_ref() {
356 f(device).await
357 } else {
358 Err(Error::NotConnected)
359 }
360 }
361
362 pub async fn reconnect(&self) -> Result<()> {
371 *self.state.write().await = ConnectionState::Reconnecting;
378 *self.attempt_count.write().await = 0;
379
380 loop {
381 if self.is_cancelled() {
383 *self.state.write().await = ConnectionState::Disconnected;
384 info!("Reconnection cancelled for {}", self.identifier);
385 return Err(Error::Cancelled);
386 }
387
388 let attempt = {
389 let mut count = self.attempt_count.write().await;
390 *count += 1;
391 *count
392 };
393
394 if let Some(max) = self.options.max_attempts
396 && attempt > max
397 {
398 *self.state.write().await = ConnectionState::Failed;
399 return Err(Error::Timeout {
400 operation: format!("reconnect to '{}'", self.identifier),
401 duration: self.options.max_delay * max,
402 });
403 }
404
405 if let Some(sender) = &self.event_sender {
407 let _ = sender.send(DeviceEvent::ReconnectStarted {
408 device: DeviceId::new(&self.identifier),
409 attempt,
410 });
411 }
412
413 info!("Reconnection attempt {} for {}", attempt, self.identifier);
414
415 let delay = self.options.delay_for_attempt(attempt - 1);
417 sleep(delay).await;
418
419 if self.is_cancelled() {
421 *self.state.write().await = ConnectionState::Disconnected;
422 info!("Reconnection cancelled for {}", self.identifier);
423 return Err(Error::Cancelled);
424 }
425
426 match Device::connect(&self.identifier).await {
428 Ok(new_device) => {
429 *self.device.write().await = Some(Arc::new(new_device));
430 *self.state.write().await = ConnectionState::Connected;
431
432 if let Some(sender) = &self.event_sender {
434 let _ = sender.send(DeviceEvent::ReconnectSucceeded {
435 device: DeviceId::new(&self.identifier),
436 attempts: attempt,
437 });
438 }
439
440 info!("Reconnected successfully after {} attempts", attempt);
441 return Ok(());
442 }
443 Err(e) => {
444 warn!("Reconnection attempt {} failed: {}", attempt, e);
445 }
446 }
447 }
448 }
449
450 pub async fn disconnect(&self) -> Result<()> {
452 let mut guard = self.device.write().await;
453 if let Some(device) = guard.take() {
454 device.disconnect().await?;
455 }
456 *self.state.write().await = ConnectionState::Disconnected;
457 Ok(())
458 }
459
460 pub async fn attempt_count(&self) -> u32 {
462 *self.attempt_count.read().await
463 }
464
465 pub async fn name(&self) -> Option<String> {
467 let guard = self.device.read().await;
468 guard.as_ref().and_then(|d| d.name().map(|s| s.to_string()))
469 }
470
471 pub async fn address(&self) -> String {
473 let guard = self.device.read().await;
474 guard
475 .as_ref()
476 .map(|d| d.address().to_string())
477 .unwrap_or_else(|| self.identifier.clone())
478 }
479
480 pub async fn device_type(&self) -> Option<DeviceType> {
482 let guard = self.device.read().await;
483 guard.as_ref().and_then(|d| d.device_type())
484 }
485}
486
487impl AranetDevice for ReconnectingDevice {
489 async fn is_connected(&self) -> bool {
490 ReconnectingDevice::is_connected(self).await
491 }
492
493 async fn connect(&self) -> Result<()> {
494 if self.is_connected().await {
496 return Ok(());
497 }
498 self.reconnect().await
500 }
501
502 async fn disconnect(&self) -> Result<()> {
503 ReconnectingDevice::disconnect(self).await
504 }
505
506 fn name(&self) -> Option<&str> {
507 self.cached_name.get().map(|s| s.as_str())
508 }
509
510 fn address(&self) -> &str {
511 &self.identifier
512 }
513
514 fn device_type(&self) -> Option<DeviceType> {
515 self.cached_device_type.get().copied()
516 }
517
518 async fn read_current(&self) -> Result<CurrentReading> {
519 self.run_with_reconnect(|d| Box::pin(d.read_current()))
520 .await
521 }
522
523 async fn read_device_info(&self) -> Result<DeviceInfo> {
524 self.run_with_reconnect(|d| Box::pin(d.read_device_info()))
525 .await
526 }
527
528 async fn read_rssi(&self) -> Result<i16> {
529 self.run_with_reconnect(|d| Box::pin(d.read_rssi())).await
530 }
531
532 async fn read_battery(&self) -> Result<u8> {
533 self.run_with_reconnect(|d| Box::pin(d.read_battery()))
534 .await
535 }
536
537 async fn get_history_info(&self) -> Result<HistoryInfo> {
538 self.run_with_reconnect(|d| Box::pin(d.get_history_info()))
539 .await
540 }
541
542 async fn download_history(&self) -> Result<Vec<HistoryRecord>> {
543 self.run_with_reconnect(|d| Box::pin(d.download_history()))
544 .await
545 }
546
547 async fn download_history_with_options(
548 &self,
549 options: HistoryOptions,
550 ) -> Result<Vec<HistoryRecord>> {
551 let opts = options.clone();
552 self.run_with_reconnect(move |d| {
553 let opts = opts.clone();
554 Box::pin(async move { d.download_history_with_options(opts).await })
555 })
556 .await
557 }
558
559 async fn get_interval(&self) -> Result<MeasurementInterval> {
560 self.run_with_reconnect(|d| Box::pin(d.get_interval()))
561 .await
562 }
563
564 async fn set_interval(&self, interval: MeasurementInterval) -> Result<()> {
565 self.run_with_reconnect(move |d| Box::pin(d.set_interval(interval)))
566 .await
567 }
568
569 async fn get_calibration(&self) -> Result<CalibrationData> {
570 self.run_with_reconnect(|d| Box::pin(d.get_calibration()))
571 .await
572 }
573}
574
575#[cfg(test)]
576mod tests {
577 use super::*;
578
579 #[test]
580 fn test_reconnect_options_default() {
581 let opts = ReconnectOptions::default();
582 assert_eq!(opts.max_attempts, Some(5));
583 assert!(opts.use_exponential_backoff);
584 }
585
586 #[test]
587 fn test_reconnect_options_unlimited() {
588 let opts = ReconnectOptions::unlimited();
589 assert!(opts.max_attempts.is_none());
590 }
591
592 #[test]
593 fn test_delay_calculation() {
594 let opts = ReconnectOptions {
595 initial_delay: Duration::from_secs(1),
596 max_delay: Duration::from_secs(60),
597 backoff_multiplier: 2.0,
598 use_exponential_backoff: true,
599 ..Default::default()
600 };
601
602 assert_eq!(opts.delay_for_attempt(0), Duration::from_secs(1));
603 assert_eq!(opts.delay_for_attempt(1), Duration::from_secs(2));
604 assert_eq!(opts.delay_for_attempt(2), Duration::from_secs(4));
605 assert_eq!(opts.delay_for_attempt(3), Duration::from_secs(8));
606 }
607
608 #[test]
609 fn test_delay_capped_at_max() {
610 let opts = ReconnectOptions {
611 initial_delay: Duration::from_secs(1),
612 max_delay: Duration::from_secs(10),
613 backoff_multiplier: 2.0,
614 use_exponential_backoff: true,
615 ..Default::default()
616 };
617
618 assert_eq!(opts.delay_for_attempt(10), Duration::from_secs(10));
620 }
621
622 #[test]
623 fn test_fixed_delay() {
624 let opts = ReconnectOptions::fixed_delay(Duration::from_secs(5));
625 assert_eq!(opts.delay_for_attempt(0), Duration::from_secs(5));
626 assert_eq!(opts.delay_for_attempt(5), Duration::from_secs(5));
627 }
628}