Skip to main content

aranet_core/
manager.rs

1//! Multi-device management.
2//!
3//! This module provides a manager for handling multiple Aranet devices
4//! simultaneously, with connection pooling and concurrent operations.
5
6use std::collections::HashMap;
7use std::sync::Arc;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::time::Duration;
10
11use futures::future::join_all;
12use tokio::sync::RwLock;
13use tokio_util::sync::CancellationToken;
14use tracing::{debug, info, warn};
15
16use aranet_types::{CurrentReading, DeviceInfo, DeviceType};
17
18use crate::device::Device;
19use crate::error::{Error, Result};
20use crate::events::{DeviceEvent, DeviceId, DisconnectReason, EventDispatcher};
21use crate::passive::{PassiveMonitor, PassiveMonitorOptions, PassiveReading};
22use crate::reconnect::ReconnectOptions;
23use crate::scan::{DiscoveredDevice, ScanOptions, scan_with_options};
24
/// Relative importance of a managed device for connection management.
///
/// Variants are declared from least to most important, so the derived
/// ordering gives `Low < Normal < High < Critical`. The intent is that, once
/// the connection limit is reached, lower-priority devices may be dropped to
/// make room for higher-priority ones.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
pub enum DevicePriority {
    /// Least important; may be disconnected when at capacity.
    Low,
    /// Regular priority; this is the value produced by [`Default`].
    #[default]
    Normal,
    /// Important; keep connected, disconnecting lower priorities if needed.
    High,
    /// Most important; never disconnected automatically.
    Critical,
}
41
/// Health-check interval that adapts to how stable connections have been.
///
/// Intended for the health monitor: unstable connections are checked more
/// often (interval shrinks toward `min`), stable connections less often
/// (interval grows toward `max`).
#[derive(Clone, Debug)]
pub struct AdaptiveInterval {
    /// Interval used as the starting point and restored by a reset.
    pub base: Duration,
    /// Interval currently in effect; moves between `min` and `max`.
    current: Duration,
    /// Lower bound for the interval (most frequent checking).
    pub min: Duration,
    /// Upper bound for the interval (least frequent checking).
    pub max: Duration,
    /// Successes recorded since the last failure or adjustment.
    consecutive_successes: u32,
    /// Failures recorded since the last success or adjustment.
    consecutive_failures: u32,
    /// Consecutive successes required before the interval is lengthened.
    success_threshold: u32,
    /// Consecutive failures required before the interval is shortened.
    failure_threshold: u32,
}
65
66impl Default for AdaptiveInterval {
67    fn default() -> Self {
68        Self {
69            base: Duration::from_secs(30),
70            current: Duration::from_secs(30),
71            min: Duration::from_secs(5),
72            max: Duration::from_secs(120),
73            consecutive_successes: 0,
74            consecutive_failures: 0,
75            success_threshold: 3,
76            failure_threshold: 1,
77        }
78    }
79}
80
81impl AdaptiveInterval {
82    /// Create a new adaptive interval with custom settings.
83    pub fn new(base: Duration, min: Duration, max: Duration) -> Self {
84        Self {
85            base,
86            current: base,
87            min,
88            max,
89            ..Default::default()
90        }
91    }
92
93    /// Get the current interval.
94    pub fn current(&self) -> Duration {
95        self.current
96    }
97
98    /// Record a successful health check.
99    ///
100    /// After enough consecutive successes, the interval will increase
101    /// (less frequent checks) up to the maximum.
102    pub fn on_success(&mut self) {
103        self.consecutive_failures = 0;
104        self.consecutive_successes += 1;
105
106        if self.consecutive_successes >= self.success_threshold {
107            // Double the interval, capped at max
108            let new_interval = self.current.saturating_mul(2);
109            self.current = new_interval.min(self.max);
110            self.consecutive_successes = 0;
111            debug!(
112                "Health check stable, increasing interval to {:?}",
113                self.current
114            );
115        }
116    }
117
118    /// Record a failed health check (connection lost or reconnect needed).
119    ///
120    /// After enough consecutive failures, the interval will decrease
121    /// (more frequent checks) down to the minimum.
122    pub fn on_failure(&mut self) {
123        self.consecutive_successes = 0;
124        self.consecutive_failures += 1;
125
126        if self.consecutive_failures >= self.failure_threshold {
127            // Halve the interval, capped at min
128            let new_interval = self.current / 2;
129            self.current = new_interval.max(self.min);
130            self.consecutive_failures = 0;
131            debug!(
132                "Health check unstable, decreasing interval to {:?}",
133                self.current
134            );
135        }
136    }
137
138    /// Reset to the base interval.
139    pub fn reset(&mut self) {
140        self.current = self.base;
141        self.consecutive_successes = 0;
142        self.consecutive_failures = 0;
143    }
144}
145
146/// Information about a managed device.
147#[derive(Debug)]
148pub struct ManagedDevice {
149    /// Device identifier.
150    pub id: String,
151    /// Device name.
152    pub name: Option<String>,
153    /// Device type.
154    pub device_type: Option<DeviceType>,
155    /// The connected device (if connected).
156    /// Wrapped in Arc to allow concurrent access without holding the manager lock.
157    device: Option<Arc<Device>>,
158    /// Whether a connection attempt is currently in progress.
159    /// This prevents race conditions where multiple tasks try to connect simultaneously.
160    connecting: AtomicBool,
161    /// Whether auto-reconnect is enabled.
162    pub auto_reconnect: bool,
163    /// Last known reading.
164    pub last_reading: Option<CurrentReading>,
165    /// Device info.
166    pub info: Option<DeviceInfo>,
167    /// Reconnection options (if auto-reconnect is enabled).
168    pub reconnect_options: ReconnectOptions,
169    /// Device priority for connection management.
170    pub priority: DevicePriority,
171    /// Number of consecutive connection failures.
172    pub consecutive_failures: u32,
173    /// Last successful connection timestamp (Unix epoch millis).
174    pub last_success: Option<u64>,
175}
176
177impl ManagedDevice {
178    /// Create a new managed device entry.
179    pub fn new(id: &str) -> Self {
180        Self {
181            id: id.to_string(),
182            name: None,
183            device_type: None,
184            device: None,
185            connecting: AtomicBool::new(false),
186            auto_reconnect: true,
187            last_reading: None,
188            info: None,
189            reconnect_options: ReconnectOptions::default(),
190            priority: DevicePriority::default(),
191            consecutive_failures: 0,
192            last_success: None,
193        }
194    }
195
196    /// Create a managed device with custom reconnect options.
197    pub fn with_reconnect_options(id: &str, options: ReconnectOptions) -> Self {
198        Self {
199            reconnect_options: options,
200            ..Self::new(id)
201        }
202    }
203
204    /// Create a managed device with priority.
205    pub fn with_priority(id: &str, priority: DevicePriority) -> Self {
206        Self {
207            priority,
208            ..Self::new(id)
209        }
210    }
211
212    /// Create a managed device with reconnect options and priority.
213    pub fn with_options(id: &str, options: ReconnectOptions, priority: DevicePriority) -> Self {
214        Self {
215            reconnect_options: options,
216            priority,
217            ..Self::new(id)
218        }
219    }
220
221    /// Record a successful operation.
222    pub fn record_success(&mut self) {
223        self.consecutive_failures = 0;
224        self.last_success = Some(
225            std::time::SystemTime::now()
226                .duration_since(std::time::UNIX_EPOCH)
227                .unwrap_or_default()
228                .as_millis() as u64,
229        );
230    }
231
232    /// Record a failed operation.
233    pub fn record_failure(&mut self) {
234        self.consecutive_failures += 1;
235    }
236
237    /// Check if the device is connected (sync check, doesn't query BLE).
238    pub fn has_device(&self) -> bool {
239        self.device.is_some()
240    }
241
242    /// Check if the device is connected (async, queries BLE).
243    pub async fn is_connected(&self) -> bool {
244        if let Some(device) = &self.device {
245            device.is_connected().await
246        } else {
247            false
248        }
249    }
250
251    /// Get a reference to the underlying device.
252    pub fn device(&self) -> Option<&Arc<Device>> {
253        self.device.as_ref()
254    }
255
256    /// Get a clone of the device Arc.
257    pub fn device_arc(&self) -> Option<Arc<Device>> {
258        self.device.clone()
259    }
260}
261
262/// Configuration for the device manager.
263#[derive(Debug, Clone)]
264pub struct ManagerConfig {
265    /// Default scan options.
266    pub scan_options: ScanOptions,
267    /// Default reconnect options for new devices.
268    pub default_reconnect_options: ReconnectOptions,
269    /// Event channel capacity.
270    pub event_capacity: usize,
271    /// Health check interval for auto-reconnect (base interval).
272    pub health_check_interval: Duration,
273    /// Maximum number of concurrent BLE connections.
274    ///
275    /// Most BLE adapters support 5-7 concurrent connections.
276    /// Attempting to connect beyond this limit will return an error.
277    /// Set to 0 for no limit (not recommended).
278    pub max_concurrent_connections: usize,
279    /// Whether to use adaptive health check intervals.
280    ///
281    /// When enabled, the health check interval will automatically adjust:
282    /// - Decrease (more frequent) when connections are unstable
283    /// - Increase (less frequent) when connections are stable
284    pub use_adaptive_interval: bool,
285    /// Minimum health check interval (for adaptive mode).
286    pub min_health_check_interval: Duration,
287    /// Maximum health check interval (for adaptive mode).
288    pub max_health_check_interval: Duration,
289    /// Default priority for new devices.
290    pub default_priority: DevicePriority,
291    /// Whether to use connection validation (keepalive checks).
292    ///
293    /// When enabled, health checks will use `device.validate_connection()`
294    /// which performs an actual BLE read to verify the connection is alive.
295    /// This catches "zombie connections" but uses more power.
296    pub use_connection_validation: bool,
297}
298
299impl Default for ManagerConfig {
300    fn default() -> Self {
301        // Use platform-specific defaults if available
302        let platform_config = crate::platform::PlatformConfig::for_current_platform();
303
304        Self {
305            scan_options: ScanOptions::default(),
306            default_reconnect_options: ReconnectOptions::default(),
307            event_capacity: 100,
308            health_check_interval: Duration::from_secs(30),
309            max_concurrent_connections: platform_config.max_concurrent_connections,
310            use_adaptive_interval: true,
311            min_health_check_interval: Duration::from_secs(5),
312            max_health_check_interval: Duration::from_secs(120),
313            default_priority: DevicePriority::Normal,
314            use_connection_validation: true,
315        }
316    }
317}
318
319impl ManagerConfig {
320    /// Create a configuration with a specific connection limit.
321    pub fn with_max_connections(mut self, max: usize) -> Self {
322        self.max_concurrent_connections = max;
323        self
324    }
325
326    /// Create a configuration with no connection limit (not recommended).
327    pub fn unlimited_connections(mut self) -> Self {
328        self.max_concurrent_connections = 0;
329        self
330    }
331
332    /// Enable or disable adaptive health check intervals.
333    pub fn adaptive_interval(mut self, enabled: bool) -> Self {
334        self.use_adaptive_interval = enabled;
335        self
336    }
337
338    /// Set the health check interval (base interval for adaptive mode).
339    pub fn health_check_interval(mut self, interval: Duration) -> Self {
340        self.health_check_interval = interval;
341        self
342    }
343
344    /// Set the default device priority.
345    pub fn default_priority(mut self, priority: DevicePriority) -> Self {
346        self.default_priority = priority;
347        self
348    }
349
350    /// Enable or disable connection validation in health checks.
351    pub fn connection_validation(mut self, enabled: bool) -> Self {
352        self.use_connection_validation = enabled;
353        self
354    }
355}
356
357/// Manager for multiple Aranet devices.
358pub struct DeviceManager {
359    /// Map of device ID to managed device.
360    devices: RwLock<HashMap<String, ManagedDevice>>,
361    /// Event dispatcher.
362    events: EventDispatcher,
363    /// Manager configuration.
364    config: ManagerConfig,
365}
366
367impl DeviceManager {
368    /// Create a new device manager.
369    pub fn new() -> Self {
370        Self::with_config(ManagerConfig::default())
371    }
372
373    /// Create a manager with custom event capacity.
374    pub fn with_event_capacity(capacity: usize) -> Self {
375        Self::with_config(ManagerConfig {
376            event_capacity: capacity,
377            ..Default::default()
378        })
379    }
380
381    /// Create a manager with full configuration.
382    pub fn with_config(config: ManagerConfig) -> Self {
383        Self {
384            devices: RwLock::new(HashMap::new()),
385            events: EventDispatcher::new(config.event_capacity),
386            config,
387        }
388    }
389
390    /// Get the event dispatcher for subscribing to events.
391    pub fn events(&self) -> &EventDispatcher {
392        &self.events
393    }
394
395    /// Get the manager configuration.
396    pub fn config(&self) -> &ManagerConfig {
397        &self.config
398    }
399
400    /// Scan for available devices.
401    pub async fn scan(&self) -> Result<Vec<DiscoveredDevice>> {
402        scan_with_options(self.config.scan_options.clone()).await
403    }
404
405    /// Scan with custom options.
406    pub async fn scan_with_options(&self, options: ScanOptions) -> Result<Vec<DiscoveredDevice>> {
407        let devices = scan_with_options(options).await?;
408
409        // Emit discovery events
410        for device in &devices {
411            self.events.send(DeviceEvent::Discovered {
412                device: DeviceId {
413                    id: device.identifier.clone(),
414                    name: device.name.clone(),
415                    device_type: device.device_type,
416                },
417                rssi: device.rssi,
418            });
419        }
420
421        Ok(devices)
422    }
423
424    /// Add a device to the manager by identifier.
425    pub async fn add_device(&self, identifier: &str) -> Result<()> {
426        self.add_device_with_options(identifier, self.config.default_reconnect_options.clone())
427            .await
428    }
429
430    /// Add a device with custom reconnect options.
431    pub async fn add_device_with_options(
432        &self,
433        identifier: &str,
434        reconnect_options: ReconnectOptions,
435    ) -> Result<()> {
436        let mut devices = self.devices.write().await;
437
438        if devices.contains_key(identifier) {
439            return Ok(()); // Already exists
440        }
441
442        let managed = ManagedDevice::with_reconnect_options(identifier, reconnect_options);
443        devices.insert(identifier.to_string(), managed);
444
445        info!("Added device to manager: {}", identifier);
446        Ok(())
447    }
448
449    /// Connect to a device.
450    ///
451    /// This method performs an atomic connect-or-skip operation:
452    /// - If the device doesn't exist, it's added and connected
453    /// - If the device exists but is not connected, it's connected
454    /// - If the device is already connected, this is a no-op
455    ///
456    /// # Connection Limits
457    ///
458    /// If `max_concurrent_connections` is set in the config and would be exceeded,
459    /// this method returns an error. Use `connected_count()` to check the current
460    /// number of connections before calling this method.
461    ///
462    /// The lock is held during the device entry update to prevent race conditions,
463    /// but released during the actual BLE connection to avoid blocking other operations.
464    pub async fn connect(&self, identifier: &str) -> Result<()> {
465        // Check if we need to connect (atomically check and mark as pending)
466        let reconnect_options = {
467            let mut devices = self.devices.write().await;
468
469            // Check connection limit before doing anything else
470            if self.config.max_concurrent_connections > 0 {
471                // Check if already connected (doesn't count toward limit)
472                let already_connected = devices
473                    .get(identifier)
474                    .map(|m| m.has_device())
475                    .unwrap_or(false);
476
477                if !already_connected {
478                    let current_connections = devices.values().filter(|m| m.has_device()).count();
479                    if current_connections >= self.config.max_concurrent_connections {
480                        warn!(
481                            "Connection limit reached ({}/{}), cannot connect to {}",
482                            current_connections, self.config.max_concurrent_connections, identifier
483                        );
484                        return Err(Error::connection_failed(
485                            Some(identifier.to_string()),
486                            crate::error::ConnectionFailureReason::Other(format!(
487                                "Connection limit reached ({}/{})",
488                                current_connections, self.config.max_concurrent_connections
489                            )),
490                        ));
491                    }
492                }
493            }
494
495            // Get or create the managed device entry
496            let managed = devices.entry(identifier.to_string()).or_insert_with(|| {
497                info!("Adding device to manager: {}", identifier);
498                ManagedDevice::with_reconnect_options(
499                    identifier,
500                    self.config.default_reconnect_options.clone(),
501                )
502            });
503
504            // If already connected, nothing to do
505            if managed.device.is_some() {
506                debug!("Device {} already has a connection handle", identifier);
507                return Ok(());
508            }
509
510            // Try to atomically set the connecting flag to prevent race conditions
511            // If another task is already connecting, return early
512            if managed
513                .connecting
514                .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
515                .is_err()
516            {
517                debug!(
518                    "Another task is already connecting to device {}",
519                    identifier
520                );
521                return Ok(());
522            }
523
524            // Clone the reconnect options for use after releasing lock
525            managed.reconnect_options.clone()
526        };
527        // Lock is released here - other tasks can now access the device map
528
529        // Perform BLE connection (this may take time)
530        // Use the cloned reconnect_options if needed in the future
531        let _ = reconnect_options;
532        let connect_result = Device::connect(identifier).await;
533
534        // Handle connection result
535        let device = match connect_result {
536            Ok(d) => Arc::new(d),
537            Err(e) => {
538                // Clear the connecting flag on failure
539                let devices = self.devices.read().await;
540                if let Some(managed) = devices.get(identifier) {
541                    managed.connecting.store(false, Ordering::SeqCst);
542                }
543                return Err(e);
544            }
545        };
546
547        let info = device.read_device_info().await.ok();
548        let device_type = device.device_type();
549        let name = device.name().map(|s| s.to_string());
550
551        // Update the managed device atomically
552        {
553            let mut devices = self.devices.write().await;
554            if let Some(managed) = devices.get_mut(identifier) {
555                // Clear the connecting flag
556                managed.connecting.store(false, Ordering::SeqCst);
557
558                // Check if another task connected while we were connecting
559                // (shouldn't happen with the atomic flag, but be defensive)
560                if managed.device.is_some() {
561                    // Another task beat us to it - disconnect our connection
562                    debug!(
563                        "Another task connected {} while we were connecting, discarding our connection",
564                        identifier
565                    );
566                    drop(devices); // Release lock before async disconnect
567                    let _ = device.disconnect().await;
568                    return Ok(());
569                }
570
571                managed.device = Some(device);
572                managed.info = info.clone();
573                managed.device_type = device_type;
574                managed.name = name.clone();
575            } else {
576                // Device was removed while we were connecting - still connect but add it back
577                let mut managed = ManagedDevice::new(identifier);
578                managed.device = Some(device);
579                managed.info = info.clone();
580                managed.device_type = device_type;
581                managed.name = name.clone();
582                devices.insert(identifier.to_string(), managed);
583            }
584        }
585
586        // Emit event
587        self.events.send(DeviceEvent::Connected {
588            device: DeviceId {
589                id: identifier.to_string(),
590                name,
591                device_type,
592            },
593            info,
594        });
595
596        info!("Connected to device: {}", identifier);
597        Ok(())
598    }
599
600    /// Disconnect from a device.
601    pub async fn disconnect(&self, identifier: &str) -> Result<()> {
602        let device_arc = {
603            let mut devices = self.devices.write().await;
604            if let Some(managed) = devices.get_mut(identifier) {
605                managed.device.take()
606            } else {
607                None
608            }
609        };
610
611        // Disconnect outside the lock
612        if let Some(device) = device_arc {
613            device.disconnect().await?;
614            self.events.send(DeviceEvent::Disconnected {
615                device: DeviceId::new(identifier),
616                reason: DisconnectReason::UserRequested,
617            });
618        }
619
620        Ok(())
621    }
622
623    /// Remove a device from the manager.
624    pub async fn remove_device(&self, identifier: &str) -> Result<()> {
625        self.disconnect(identifier).await?;
626        self.devices.write().await.remove(identifier);
627        info!("Removed device from manager: {}", identifier);
628        Ok(())
629    }
630
631    /// Get a list of all managed device IDs.
632    pub async fn device_ids(&self) -> Vec<String> {
633        self.devices.read().await.keys().cloned().collect()
634    }
635
636    /// Get the number of managed devices.
637    pub async fn device_count(&self) -> usize {
638        self.devices.read().await.len()
639    }
640
641    /// Get the number of connected devices (fast, doesn't query BLE).
642    ///
643    /// This returns the number of devices that have an active device handle,
644    /// without querying the BLE stack. Use `connected_count_verified` for
645    /// an accurate count that queries each device.
646    pub async fn connected_count(&self) -> usize {
647        let devices = self.devices.read().await;
648        devices.values().filter(|m| m.has_device()).count()
649    }
650
651    /// Check if a new connection can be made without exceeding the limit.
652    ///
653    /// Returns `true` if another connection can be made, `false` if at limit.
654    /// Always returns `true` if `max_concurrent_connections` is 0 (unlimited).
655    pub async fn can_connect(&self) -> bool {
656        if self.config.max_concurrent_connections == 0 {
657            return true;
658        }
659        self.connected_count().await < self.config.max_concurrent_connections
660    }
661
662    /// Get the connection limit status.
663    ///
664    /// Returns (current_connections, max_connections). If max is 0, there is no limit.
665    pub async fn connection_status(&self) -> (usize, usize) {
666        (
667            self.connected_count().await,
668            self.config.max_concurrent_connections,
669        )
670    }
671
672    /// Get the number of available connection slots.
673    ///
674    /// Returns `None` if there is no connection limit (unlimited).
675    pub async fn available_connections(&self) -> Option<usize> {
676        if self.config.max_concurrent_connections == 0 {
677            return None;
678        }
679        let current = self.connected_count().await;
680        Some(
681            self.config
682                .max_concurrent_connections
683                .saturating_sub(current),
684        )
685    }
686
687    /// Get the number of connected devices (verified via BLE).
688    ///
689    /// This method queries each device to verify its connection status.
690    /// The lock is released before making BLE calls to avoid contention.
691    pub async fn connected_count_verified(&self) -> usize {
692        // Collect device handles while holding the lock briefly
693        let device_arcs: Vec<Arc<Device>> = {
694            let devices = self.devices.read().await;
695            devices.values().filter_map(|m| m.device_arc()).collect()
696        };
697        // Lock is released here
698
699        // Check connection status in parallel
700        let futures = device_arcs.iter().map(|d| d.is_connected());
701        let results = join_all(futures).await;
702
703        results.into_iter().filter(|&connected| connected).count()
704    }
705
706    /// Read current values from a specific device.
707    pub async fn read_current(&self, identifier: &str) -> Result<CurrentReading> {
708        // Get device Arc while holding the lock briefly
709        let device = {
710            let devices = self.devices.read().await;
711            let managed = devices
712                .get(identifier)
713                .ok_or_else(|| Error::device_not_found(identifier))?;
714            managed.device_arc().ok_or(Error::NotConnected)?
715        };
716        // Lock is released here
717
718        let reading = device.read_current().await?;
719
720        // Emit reading event
721        self.events.send(DeviceEvent::Reading {
722            device: DeviceId::new(identifier),
723            reading,
724        });
725
726        // Update cached reading
727        {
728            let mut devices = self.devices.write().await;
729            if let Some(managed) = devices.get_mut(identifier) {
730                managed.last_reading = Some(reading);
731            }
732        }
733
734        Ok(reading)
735    }
736
737    /// Read current values from all connected devices (in parallel).
738    ///
739    /// This method releases the lock before performing async BLE operations,
740    /// allowing other tasks to add/remove devices while reads are in progress.
741    /// All reads are performed in parallel for maximum performance.
742    pub async fn read_all(&self) -> HashMap<String, Result<CurrentReading>> {
743        // Collect device handles while holding the lock briefly
744        let devices_to_read: Vec<(String, Arc<Device>)> = {
745            let devices = self.devices.read().await;
746            devices
747                .iter()
748                .filter_map(|(id, managed)| managed.device_arc().map(|d| (id.clone(), d)))
749                .collect()
750        };
751        // Lock is released here
752
753        // Perform all reads in parallel
754        let read_futures = devices_to_read.iter().map(|(id, device)| {
755            let id = id.clone();
756            let device = Arc::clone(device);
757            async move {
758                let result = device.read_current().await;
759                (id, result)
760            }
761        });
762
763        let read_results: Vec<(String, Result<CurrentReading>)> = join_all(read_futures).await;
764
765        // Emit events and update cache
766        for (id, result) in &read_results {
767            if let Ok(reading) = result {
768                self.events.send(DeviceEvent::Reading {
769                    device: DeviceId::new(id),
770                    reading: *reading,
771                });
772            }
773        }
774
775        // Update cached readings
776        {
777            let mut devices = self.devices.write().await;
778            for (id, result) in &read_results {
779                if let Ok(reading) = result
780                    && let Some(managed) = devices.get_mut(id)
781                {
782                    managed.last_reading = Some(*reading);
783                }
784            }
785        }
786
787        read_results.into_iter().collect()
788    }
789
790    /// Connect to all known devices (in parallel).
791    ///
792    /// Returns a map of device IDs to connection results.
793    pub async fn connect_all(&self) -> HashMap<String, Result<()>> {
794        let ids: Vec<_> = self.devices.read().await.keys().cloned().collect();
795
796        // Note: We can't fully parallelize connect because it modifies state,
797        // but we can at least attempt connections concurrently
798        let connect_futures = ids.iter().map(|id| {
799            let id = id.clone();
800            async move {
801                let result = self.connect(&id).await;
802                (id, result)
803            }
804        });
805
806        join_all(connect_futures).await.into_iter().collect()
807    }
808
809    /// Disconnect from all devices (in parallel).
810    ///
811    /// Returns a map of device IDs to disconnection results.
812    pub async fn disconnect_all(&self) -> HashMap<String, Result<()>> {
813        // Collect all device arcs first
814        let devices_to_disconnect: Vec<(String, Arc<Device>)> = {
815            let mut devices = self.devices.write().await;
816            devices
817                .iter_mut()
818                .filter_map(|(id, managed)| managed.device.take().map(|d| (id.clone(), d)))
819                .collect()
820        };
821
822        // Disconnect all in parallel
823        let disconnect_futures = devices_to_disconnect.iter().map(|(id, device)| {
824            let id = id.clone();
825            let device = Arc::clone(device);
826            async move {
827                let result = device.disconnect().await;
828                (id, result)
829            }
830        });
831
832        let results: Vec<(String, Result<()>)> = join_all(disconnect_futures).await;
833
834        // Emit disconnection events
835        for (id, result) in &results {
836            if result.is_ok() {
837                self.events.send(DeviceEvent::Disconnected {
838                    device: DeviceId::new(id),
839                    reason: DisconnectReason::UserRequested,
840                });
841            }
842        }
843
844        results.into_iter().collect()
845    }
846
847    /// Check if a specific device is connected (fast, doesn't query BLE).
848    ///
849    /// This method attempts to check if a device has an active connection handle
850    /// without blocking. Returns `None` if the lock couldn't be acquired immediately,
851    /// or `Some(bool)` indicating whether the device has a connection handle.
852    ///
853    /// Note: This only checks if we have a device handle, not whether the actual
854    /// BLE connection is still alive. Use [`is_connected`](Self::is_connected) for
855    /// a verified check.
856    pub fn try_is_connected(&self, identifier: &str) -> Option<bool> {
857        // Try to acquire the lock without blocking
858        match self.devices.try_read() {
859            Ok(devices) => Some(
860                devices
861                    .get(identifier)
862                    .map(|m| m.has_device())
863                    .unwrap_or(false),
864            ),
865            Err(_) => None, // Lock was held, couldn't check
866        }
867    }
868
869    /// Check if a specific device is connected (verified via BLE).
870    ///
871    /// The lock is released before making the BLE call.
872    pub async fn is_connected(&self, identifier: &str) -> bool {
873        let device = {
874            let devices = self.devices.read().await;
875            devices.get(identifier).and_then(|m| m.device_arc())
876        };
877
878        if let Some(device) = device {
879            device.is_connected().await
880        } else {
881            false
882        }
883    }
884
885    /// Get device info for a specific device.
886    pub async fn get_device_info(&self, identifier: &str) -> Option<DeviceInfo> {
887        let devices = self.devices.read().await;
888        devices.get(identifier).and_then(|m| m.info.clone())
889    }
890
891    /// Get the last cached reading for a device.
892    pub async fn get_last_reading(&self, identifier: &str) -> Option<CurrentReading> {
893        let devices = self.devices.read().await;
894        devices.get(identifier).and_then(|m| m.last_reading)
895    }
896
897    /// Start a background health check task that monitors connection status.
898    ///
899    /// This spawns a task that periodically checks device connections and
900    /// attempts to reconnect devices that have auto_reconnect enabled.
901    ///
902    /// The task will run until the provided cancellation token is cancelled.
903    ///
904    /// # Adaptive Intervals
905    ///
906    /// If `use_adaptive_interval` is enabled in the config, the health check
907    /// interval will automatically adjust based on connection stability:
908    /// - When connections are stable, checks become less frequent (up to `max_health_check_interval`)
909    /// - When connections are unstable, checks become more frequent (down to `min_health_check_interval`)
910    ///
911    /// # Connection Validation
912    ///
913    /// If `use_connection_validation` is enabled, health checks will perform
914    /// an actual BLE read (`device.validate_connection()`) to catch "zombie connections"
915    /// where the BLE stack thinks it's connected but the device is out of range.
916    ///
917    /// # Example
918    ///
919    /// ```ignore
920    /// use tokio_util::sync::CancellationToken;
921    ///
922    /// let manager = Arc::new(DeviceManager::new());
923    /// let cancel = CancellationToken::new();
924    /// let handle = manager.start_health_monitor(cancel.clone());
925    ///
926    /// // Later, to stop the health monitor:
927    /// cancel.cancel();
928    /// handle.await.unwrap();
929    /// ```
930    pub fn start_health_monitor(
931        self: &Arc<Self>,
932        cancel_token: CancellationToken,
933    ) -> tokio::task::JoinHandle<()> {
934        let manager = Arc::clone(self);
935
936        tokio::spawn(async move {
937            // Initialize adaptive interval if enabled
938            let mut adaptive = if manager.config.use_adaptive_interval {
939                Some(AdaptiveInterval::new(
940                    manager.config.health_check_interval,
941                    manager.config.min_health_check_interval,
942                    manager.config.max_health_check_interval,
943                ))
944            } else {
945                None
946            };
947
948            loop {
949                // Get current interval
950                let current_interval = adaptive
951                    .as_ref()
952                    .map(|a| a.current())
953                    .unwrap_or(manager.config.health_check_interval);
954
955                tokio::select! {
956                    _ = cancel_token.cancelled() => {
957                        info!("Health monitor cancelled, shutting down");
958                        break;
959                    }
960                    _ = tokio::time::sleep(current_interval) => {
961                        let mut any_failures = false;
962                        let mut any_successes = false;
963
964                        // Get devices that need checking
965                        let devices_to_check: Vec<(String, Option<Arc<Device>>, bool, DevicePriority)> = {
966                            let devices = manager.devices.read().await;
967                            devices
968                                .iter()
969                                .map(|(id, m)| {
970                                    (
971                                        id.clone(),
972                                        m.device_arc(),
973                                        m.auto_reconnect,
974                                        m.priority,
975                                    )
976                                })
977                                .collect()
978                        };
979
980                        // Sort by priority (higher priority checked first)
981                        let mut sorted_devices = devices_to_check;
982                        sorted_devices.sort_by(|a, b| b.3.cmp(&a.3));
983
984                        for (id, device_opt, auto_reconnect, _priority) in sorted_devices {
985                            let should_reconnect = match device_opt {
986                                Some(device) => {
987                                    // Use connection validation if enabled
988                                    if manager.config.use_connection_validation {
989                                        !device.is_connection_alive().await
990                                    } else {
991                                        !device.is_connected().await
992                                    }
993                                }
994                                None => true,
995                            };
996
997                            if should_reconnect && auto_reconnect {
998                                debug!("Health monitor: attempting reconnect for {}", id);
999                                any_failures = true;
1000
1001                                match manager.connect(&id).await {
1002                                    Ok(()) => {
1003                                        any_successes = true;
1004                                        // Update success in managed device
1005                                        if let Some(m) = manager.devices.write().await.get_mut(&id) {
1006                                            m.record_success();
1007                                        }
1008                                    }
1009                                    Err(e) => {
1010                                        warn!("Health monitor: reconnect failed for {}: {}", id, e);
1011                                        // Update failure in managed device
1012                                        if let Some(m) = manager.devices.write().await.get_mut(&id) {
1013                                            m.record_failure();
1014                                        }
1015                                    }
1016                                }
1017                            } else if !should_reconnect {
1018                                any_successes = true;
1019                            }
1020                        }
1021
1022                        // Update adaptive interval
1023                        if let Some(ref mut adaptive) = adaptive {
1024                            if any_failures && !any_successes {
1025                                adaptive.on_failure();
1026                            } else if any_successes && !any_failures {
1027                                adaptive.on_success();
1028                            }
1029                            // Mixed results: don't change interval
1030                        }
1031                    }
1032                }
1033            }
1034        })
1035    }
1036
1037    /// Add a device with priority.
1038    pub async fn add_device_with_priority(
1039        &self,
1040        identifier: &str,
1041        priority: DevicePriority,
1042    ) -> Result<()> {
1043        let mut devices = self.devices.write().await;
1044
1045        if devices.contains_key(identifier) {
1046            // Update priority if device already exists
1047            if let Some(m) = devices.get_mut(identifier) {
1048                m.priority = priority;
1049            }
1050            return Ok(());
1051        }
1052
1053        let mut managed = ManagedDevice::new(identifier);
1054        managed.priority = priority;
1055        managed.reconnect_options = self.config.default_reconnect_options.clone();
1056        devices.insert(identifier.to_string(), managed);
1057
1058        info!(
1059            "Added device to manager with priority {:?}: {}",
1060            priority, identifier
1061        );
1062        Ok(())
1063    }
1064
1065    /// Get the lowest priority connected device that could be disconnected.
1066    ///
1067    /// Returns None if no devices can be disconnected (all are Critical priority or not connected).
1068    pub async fn lowest_priority_connected(&self) -> Option<String> {
1069        let devices = self.devices.read().await;
1070        devices
1071            .iter()
1072            .filter(|(_, m)| m.has_device() && m.priority != DevicePriority::Critical)
1073            .min_by_key(|(_, m)| m.priority)
1074            .map(|(id, _)| id.clone())
1075    }
1076
1077    /// Disconnect the lowest priority device to make room for a new connection.
1078    ///
1079    /// Returns Ok(true) if a device was disconnected, Ok(false) if no eligible device found.
1080    pub async fn evict_lowest_priority(&self) -> Result<bool> {
1081        if let Some(id) = self.lowest_priority_connected().await {
1082            info!("Evicting lowest priority device: {}", id);
1083            self.disconnect(&id).await?;
1084            Ok(true)
1085        } else {
1086            Ok(false)
1087        }
1088    }
1089
1090    /// Start hybrid monitoring using both passive (advertisement) and active connections.
1091    ///
1092    /// This is the most efficient way to monitor multiple devices:
1093    /// - **Passive monitoring**: Uses BLE advertisements to receive real-time readings
1094    ///   without maintaining connections. Lower power consumption, unlimited devices.
1095    /// - **Active connections**: Only established when needed (history download, settings changes).
1096    ///
1097    /// # Requirements
1098    ///
1099    /// Smart Home integration must be enabled on each device for passive monitoring.
1100    ///
1101    /// # Example
1102    ///
1103    /// ```ignore
1104    /// use tokio_util::sync::CancellationToken;
1105    ///
1106    /// let manager = Arc::new(DeviceManager::new());
1107    /// let cancel = CancellationToken::new();
1108    /// let handle = manager.start_hybrid_monitor(cancel.clone(), None);
1109    ///
1110    /// // Receive readings via manager events
1111    /// let mut rx = manager.events().subscribe();
1112    /// while let Ok(event) = rx.recv().await {
1113    ///     if let DeviceEvent::Reading { device, reading } = event {
1114    ///         println!("{}: CO2 = {} ppm", device.id, reading.co2);
1115    ///     }
1116    /// }
1117    /// ```
1118    pub fn start_hybrid_monitor(
1119        self: &Arc<Self>,
1120        cancel_token: CancellationToken,
1121        passive_options: Option<PassiveMonitorOptions>,
1122    ) -> tokio::task::JoinHandle<()> {
1123        let manager = Arc::clone(self);
1124        let options = passive_options.unwrap_or_default();
1125
1126        tokio::spawn(async move {
1127            info!("Starting hybrid monitor (passive + active)");
1128
1129            // Create passive monitor
1130            let passive_monitor = Arc::new(PassiveMonitor::new(options));
1131            let mut passive_rx = passive_monitor.subscribe();
1132
1133            // Start passive monitoring
1134            let passive_cancel = cancel_token.clone();
1135            let _passive_handle = passive_monitor.start(passive_cancel);
1136
1137            loop {
1138                tokio::select! {
1139                    _ = cancel_token.cancelled() => {
1140                        info!("Hybrid monitor cancelled");
1141                        break;
1142                    }
1143                    result = passive_rx.recv() => {
1144                        match result {
1145                            Ok(passive_reading) => {
1146                                // Convert passive reading to CurrentReading and emit event
1147                                if let Some(reading) = passive_reading_to_current(&passive_reading) {
1148                                    // Update last reading in managed device if it exists
1149                                    if let Some(m) = manager.devices.write().await.get_mut(&passive_reading.device_id) {
1150                                        m.last_reading = Some(reading);
1151                                        m.record_success();
1152                                    }
1153
1154                                    // Emit reading event
1155                                    manager.events.send(DeviceEvent::Reading {
1156                                        device: DeviceId {
1157                                            id: passive_reading.device_id.clone(),
1158                                            name: passive_reading.device_name.clone(),
1159                                            device_type: Some(passive_reading.data.device_type),
1160                                        },
1161                                        reading,
1162                                    });
1163                                }
1164                            }
1165                            Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
1166                                warn!("Hybrid monitor lagged {} messages", n);
1167                            }
1168                            Err(tokio::sync::broadcast::error::RecvError::Closed) => {
1169                                info!("Passive monitor channel closed");
1170                                break;
1171                            }
1172                        }
1173                    }
1174                }
1175            }
1176        })
1177    }
1178
1179    /// Get a reading using hybrid approach: try passive first, fall back to active.
1180    ///
1181    /// This method checks if a recent passive reading is available. If not,
1182    /// it establishes an active connection to read the value.
1183    ///
1184    /// # Arguments
1185    ///
1186    /// * `identifier` - Device identifier
1187    /// * `max_passive_age` - Maximum age of passive reading to accept (default: 60s)
1188    pub async fn read_hybrid(
1189        &self,
1190        identifier: &str,
1191        max_passive_age: Option<Duration>,
1192    ) -> Result<CurrentReading> {
1193        let max_age = max_passive_age.unwrap_or(Duration::from_secs(60));
1194
1195        // Check if we have a recent cached reading
1196        {
1197            let devices = self.devices.read().await;
1198            if let Some(managed) = devices.get(identifier)
1199                && let Some(reading) = managed.last_reading
1200            {
1201                // Check if the reading has a captured_at timestamp
1202                if let Some(captured) = reading.captured_at {
1203                    let age = time::OffsetDateTime::now_utc() - captured;
1204                    if age
1205                        < time::Duration::try_from(max_age).unwrap_or(time::Duration::seconds(60))
1206                    {
1207                        debug!("Using cached passive reading for {}", identifier);
1208                        return Ok(reading);
1209                    }
1210                }
1211            }
1212        }
1213
1214        // No recent passive reading, use active connection
1215        debug!(
1216            "No recent passive reading, using active connection for {}",
1217            identifier
1218        );
1219        self.read_current(identifier).await
1220    }
1221
1222    /// Check if a device supports passive monitoring (Smart Home enabled).
1223    ///
1224    /// This performs a quick scan to check if the device is broadcasting
1225    /// advertisement data with sensor readings.
1226    pub async fn supports_passive_monitoring(&self, identifier: &str) -> bool {
1227        // Create a short-lived passive monitor to check for advertisements
1228        let options = PassiveMonitorOptions::default()
1229            .scan_duration(Duration::from_secs(5))
1230            .filter_devices(vec![identifier.to_string()]);
1231
1232        let monitor = Arc::new(PassiveMonitor::new(options));
1233        let mut rx = monitor.subscribe();
1234        let cancel = CancellationToken::new();
1235
1236        let _handle = monitor.start(cancel.clone());
1237
1238        // Wait for a reading or timeout
1239        let result = tokio::time::timeout(Duration::from_secs(6), rx.recv()).await;
1240        cancel.cancel();
1241
1242        matches!(result, Ok(Ok(_)))
1243    }
1244}
1245
1246/// Convert a passive advertisement reading to a CurrentReading.
1247fn passive_reading_to_current(passive: &PassiveReading) -> Option<CurrentReading> {
1248    let data = &passive.data;
1249
1250    // We need at least some sensor data to create a reading
1251    if data.co2.is_none()
1252        && data.temperature.is_none()
1253        && data.humidity.is_none()
1254        && data.radon.is_none()
1255        && data.radiation_dose_rate.is_none()
1256    {
1257        return None;
1258    }
1259
1260    Some(CurrentReading {
1261        co2: data.co2.unwrap_or(0),
1262        temperature: data.temperature.unwrap_or(0.0),
1263        pressure: data.pressure.unwrap_or(0.0),
1264        humidity: data.humidity.unwrap_or(0),
1265        battery: data.battery,
1266        status: data.status,
1267        interval: data.interval,
1268        age: data.age,
1269        captured_at: Some(time::OffsetDateTime::now_utc()),
1270        radon: data.radon,
1271        radon_avg_24h: None,
1272        radon_avg_7d: None,
1273        radon_avg_30d: None,
1274        radiation_rate: data.radiation_dose_rate,
1275        radiation_total: None, // Not available in advertisement data
1276    })
1277}
1278
1279impl Default for DeviceManager {
1280    fn default() -> Self {
1281        Self::new()
1282    }
1283}
1284
#[cfg(test)]
mod tests {
    use super::*;

    /// Identifier shared by all tests; no real device is contacted.
    const DEVICE_ID: &str = "test-device";

    /// Build a manager that already has `DEVICE_ID` registered.
    async fn manager_with_device() -> DeviceManager {
        let manager = DeviceManager::new();
        manager.add_device(DEVICE_ID).await.unwrap();
        manager
    }

    /// Adding a device makes it countable and listable.
    #[tokio::test]
    async fn test_manager_add_device() {
        let manager = manager_with_device().await;

        assert_eq!(manager.device_count().await, 1);

        let ids = manager.device_ids().await;
        assert!(ids.contains(&DEVICE_ID.to_string()));
    }

    /// Removing the only device leaves the manager empty.
    #[tokio::test]
    async fn test_manager_remove_device() {
        let manager = manager_with_device().await;
        manager.remove_device(DEVICE_ID).await.unwrap();

        assert_eq!(manager.device_count().await, 0);
    }

    /// A freshly added device has no connection.
    #[tokio::test]
    async fn test_manager_not_connected_by_default() {
        let manager = manager_with_device().await;

        assert!(!manager.is_connected(DEVICE_ID).await);
        assert_eq!(manager.connected_count().await, 0);
    }

    /// A subscriber stays registered after a device is added.
    #[tokio::test]
    async fn test_manager_events() {
        let manager = DeviceManager::new();
        let _subscription = manager.events().subscribe();

        manager.add_device(DEVICE_ID).await.unwrap();

        // Events are only emitted for actual device operations
        assert_eq!(manager.events().receiver_count(), 1);
    }
}