Skip to main content

aranet_core/
manager.rs

1//! Multi-device management.
2//!
3//! This module provides a manager for handling multiple Aranet devices
4//! simultaneously, with connection pooling and concurrent operations.
5
6use std::collections::HashMap;
7use std::sync::Arc;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::time::Duration;
10
11use futures::future::join_all;
12use tokio::sync::RwLock;
13use tokio_util::sync::CancellationToken;
14use tracing::{debug, info, warn};
15
16use aranet_types::{CurrentReading, DeviceInfo, DeviceType};
17
18use crate::device::Device;
19use crate::error::{Error, Result};
20use crate::events::{DeviceEvent, DeviceId, DisconnectReason, EventDispatcher};
21use crate::passive::{PassiveMonitor, PassiveMonitorOptions, PassiveReading};
22use crate::reconnect::ReconnectOptions;
23use crate::scan::{DiscoveredDevice, ScanOptions, scan_with_options};
24
/// How important it is to keep a device connected.
///
/// Variants are ordered from least to most important, so priorities can be
/// compared directly (`Low < Normal < High < Critical`). When the connection
/// limit is reached, devices with a lower priority may be disconnected to make
/// room for devices with a higher one.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default)]
pub enum DevicePriority {
    /// Least important; first candidate for disconnection when at capacity.
    Low = 0,
    /// The default priority.
    #[default]
    Normal = 1,
    /// Keep connected, displacing lower-priority devices if needed.
    High = 2,
    /// Never disconnected automatically.
    Critical = 3,
}
41
/// Health-check interval that adapts to connection stability.
///
/// The health monitor uses this to check more often while connections are
/// failing and less often while they are healthy.
#[derive(Debug, Clone)]
pub struct AdaptiveInterval {
    /// Interval used when connections are stable (the starting point).
    pub base: Duration,
    /// Interval currently in effect; may drift away from `base`.
    current: Duration,
    /// Lower bound on the interval (most frequent checking).
    pub min: Duration,
    /// Upper bound on the interval (least frequent checking).
    pub max: Duration,
    /// Length of the current streak of successful checks.
    consecutive_successes: u32,
    /// Length of the current streak of failed checks.
    consecutive_failures: u32,
    /// Successes needed in a row before the interval grows.
    success_threshold: u32,
    /// Failures needed in a row before the interval shrinks.
    failure_threshold: u32,
}

impl Default for AdaptiveInterval {
    /// A 30 s interval bounded to 5 s..=120 s.
    ///
    /// The interval grows after 3 consecutive successes and shrinks after
    /// 1 consecutive failure.
    fn default() -> Self {
        let base = Duration::from_secs(30);
        Self {
            base,
            current: base,
            min: Duration::from_secs(5),
            max: Duration::from_secs(120),
            consecutive_successes: 0,
            consecutive_failures: 0,
            success_threshold: 3,
            failure_threshold: 1,
        }
    }
}
80
81impl AdaptiveInterval {
82    /// Create a new adaptive interval with custom settings.
83    pub fn new(base: Duration, min: Duration, max: Duration) -> Self {
84        Self {
85            base,
86            current: base,
87            min,
88            max,
89            ..Default::default()
90        }
91    }
92
93    /// Get the current interval.
94    pub fn current(&self) -> Duration {
95        self.current
96    }
97
98    /// Record a successful health check.
99    ///
100    /// After enough consecutive successes, the interval will increase
101    /// (less frequent checks) up to the maximum.
102    pub fn on_success(&mut self) {
103        self.consecutive_failures = 0;
104        self.consecutive_successes += 1;
105
106        if self.consecutive_successes >= self.success_threshold {
107            // Double the interval, capped at max
108            let new_interval = self.current.saturating_mul(2);
109            self.current = new_interval.min(self.max);
110            self.consecutive_successes = 0;
111            debug!(
112                "Health check stable, increasing interval to {:?}",
113                self.current
114            );
115        }
116    }
117
118    /// Record a failed health check (connection lost or reconnect needed).
119    ///
120    /// After enough consecutive failures, the interval will decrease
121    /// (more frequent checks) down to the minimum.
122    pub fn on_failure(&mut self) {
123        self.consecutive_successes = 0;
124        self.consecutive_failures += 1;
125
126        if self.consecutive_failures >= self.failure_threshold {
127            // Halve the interval, capped at min
128            let new_interval = self.current / 2;
129            self.current = new_interval.max(self.min);
130            self.consecutive_failures = 0;
131            debug!(
132                "Health check unstable, decreasing interval to {:?}",
133                self.current
134            );
135        }
136    }
137
138    /// Reset to the base interval.
139    pub fn reset(&mut self) {
140        self.current = self.base;
141        self.consecutive_successes = 0;
142        self.consecutive_failures = 0;
143    }
144}
145
146/// Information about a managed device.
147#[derive(Debug)]
148pub struct ManagedDevice {
149    /// Device identifier.
150    pub id: String,
151    /// Device name.
152    pub name: Option<String>,
153    /// Device type.
154    pub device_type: Option<DeviceType>,
155    /// The connected device (if connected).
156    /// Wrapped in Arc to allow concurrent access without holding the manager lock.
157    device: Option<Arc<Device>>,
158    /// Whether a connection attempt is currently in progress.
159    /// This prevents race conditions where multiple tasks try to connect simultaneously.
160    connecting: AtomicBool,
161    /// Whether auto-reconnect is enabled.
162    pub auto_reconnect: bool,
163    /// Last known reading.
164    pub last_reading: Option<CurrentReading>,
165    /// Device info.
166    pub info: Option<DeviceInfo>,
167    /// Reconnection options (if auto-reconnect is enabled).
168    pub reconnect_options: ReconnectOptions,
169    /// Device priority for connection management.
170    pub priority: DevicePriority,
171    /// Number of consecutive connection failures.
172    pub consecutive_failures: u32,
173    /// Last successful connection timestamp (Unix epoch millis).
174    pub last_success: Option<u64>,
175}
176
177impl ManagedDevice {
178    /// Create a new managed device entry.
179    pub fn new(id: &str) -> Self {
180        Self {
181            id: id.to_string(),
182            name: None,
183            device_type: None,
184            device: None,
185            connecting: AtomicBool::new(false),
186            auto_reconnect: true,
187            last_reading: None,
188            info: None,
189            reconnect_options: ReconnectOptions::default(),
190            priority: DevicePriority::default(),
191            consecutive_failures: 0,
192            last_success: None,
193        }
194    }
195
196    /// Create a managed device with custom reconnect options.
197    pub fn with_reconnect_options(id: &str, options: ReconnectOptions) -> Self {
198        Self {
199            reconnect_options: options,
200            ..Self::new(id)
201        }
202    }
203
204    /// Create a managed device with priority.
205    pub fn with_priority(id: &str, priority: DevicePriority) -> Self {
206        Self {
207            priority,
208            ..Self::new(id)
209        }
210    }
211
212    /// Create a managed device with reconnect options and priority.
213    pub fn with_options(id: &str, options: ReconnectOptions, priority: DevicePriority) -> Self {
214        Self {
215            reconnect_options: options,
216            priority,
217            ..Self::new(id)
218        }
219    }
220
221    /// Record a successful operation.
222    pub fn record_success(&mut self) {
223        self.consecutive_failures = 0;
224        self.last_success = Some(
225            std::time::SystemTime::now()
226                .duration_since(std::time::UNIX_EPOCH)
227                .unwrap_or_default()
228                .as_millis() as u64,
229        );
230    }
231
232    /// Record a failed operation.
233    pub fn record_failure(&mut self) {
234        self.consecutive_failures += 1;
235    }
236
237    /// Check if the device is connected (sync check, doesn't query BLE).
238    pub fn has_device(&self) -> bool {
239        self.device.is_some()
240    }
241
242    /// Check if the device is connected (async, queries BLE).
243    pub async fn is_connected(&self) -> bool {
244        if let Some(device) = &self.device {
245            device.is_connected().await
246        } else {
247            false
248        }
249    }
250
251    /// Get a reference to the underlying device.
252    pub fn device(&self) -> Option<&Arc<Device>> {
253        self.device.as_ref()
254    }
255
256    /// Get a clone of the device Arc.
257    pub fn device_arc(&self) -> Option<Arc<Device>> {
258        self.device.clone()
259    }
260}
261
262/// Configuration for the device manager.
263#[derive(Debug, Clone)]
264pub struct ManagerConfig {
265    /// Default scan options.
266    pub scan_options: ScanOptions,
267    /// Default reconnect options for new devices.
268    pub default_reconnect_options: ReconnectOptions,
269    /// Event channel capacity.
270    pub event_capacity: usize,
271    /// Health check interval for auto-reconnect (base interval).
272    pub health_check_interval: Duration,
273    /// Maximum number of concurrent BLE connections.
274    ///
275    /// Most BLE adapters support 5-7 concurrent connections.
276    /// Attempting to connect beyond this limit will return an error.
277    /// Set to 0 for no limit (not recommended).
278    pub max_concurrent_connections: usize,
279    /// Whether to use adaptive health check intervals.
280    ///
281    /// When enabled, the health check interval will automatically adjust:
282    /// - Decrease (more frequent) when connections are unstable
283    /// - Increase (less frequent) when connections are stable
284    pub use_adaptive_interval: bool,
285    /// Minimum health check interval (for adaptive mode).
286    pub min_health_check_interval: Duration,
287    /// Maximum health check interval (for adaptive mode).
288    pub max_health_check_interval: Duration,
289    /// Default priority for new devices.
290    pub default_priority: DevicePriority,
291    /// Whether to use connection validation (keepalive checks).
292    ///
293    /// When enabled, health checks will use `device.validate_connection()`
294    /// which performs an actual BLE read to verify the connection is alive.
295    /// This catches "zombie connections" but uses more power.
296    pub use_connection_validation: bool,
297}
298
299impl Default for ManagerConfig {
300    fn default() -> Self {
301        // Use platform-specific defaults if available
302        let platform_config = crate::platform::PlatformConfig::for_current_platform();
303
304        Self {
305            scan_options: ScanOptions::default(),
306            default_reconnect_options: ReconnectOptions::default(),
307            event_capacity: 100,
308            health_check_interval: Duration::from_secs(30),
309            max_concurrent_connections: platform_config.max_concurrent_connections,
310            use_adaptive_interval: true,
311            min_health_check_interval: Duration::from_secs(5),
312            max_health_check_interval: Duration::from_secs(120),
313            default_priority: DevicePriority::Normal,
314            use_connection_validation: true,
315        }
316    }
317}
318
319impl ManagerConfig {
320    /// Create a configuration with a specific connection limit.
321    pub fn with_max_connections(mut self, max: usize) -> Self {
322        self.max_concurrent_connections = max;
323        self
324    }
325
326    /// Create a configuration with no connection limit (not recommended).
327    pub fn unlimited_connections(mut self) -> Self {
328        self.max_concurrent_connections = 0;
329        self
330    }
331
332    /// Enable or disable adaptive health check intervals.
333    pub fn adaptive_interval(mut self, enabled: bool) -> Self {
334        self.use_adaptive_interval = enabled;
335        self
336    }
337
338    /// Set the health check interval (base interval for adaptive mode).
339    pub fn health_check_interval(mut self, interval: Duration) -> Self {
340        self.health_check_interval = interval;
341        self
342    }
343
344    /// Set the default device priority.
345    pub fn default_priority(mut self, priority: DevicePriority) -> Self {
346        self.default_priority = priority;
347        self
348    }
349
350    /// Enable or disable connection validation in health checks.
351    pub fn connection_validation(mut self, enabled: bool) -> Self {
352        self.use_connection_validation = enabled;
353        self
354    }
355}
356
357/// Manager for multiple Aranet devices.
358pub struct DeviceManager {
359    /// Map of device ID to managed device.
360    devices: RwLock<HashMap<String, ManagedDevice>>,
361    /// Event dispatcher.
362    events: EventDispatcher,
363    /// Manager configuration.
364    config: ManagerConfig,
365}
366
367impl DeviceManager {
368    /// Create a new device manager.
369    pub fn new() -> Self {
370        Self::with_config(ManagerConfig::default())
371    }
372
373    /// Create a manager with custom event capacity.
374    pub fn with_event_capacity(capacity: usize) -> Self {
375        Self::with_config(ManagerConfig {
376            event_capacity: capacity,
377            ..Default::default()
378        })
379    }
380
381    /// Create a manager with full configuration.
382    pub fn with_config(config: ManagerConfig) -> Self {
383        Self {
384            devices: RwLock::new(HashMap::new()),
385            events: EventDispatcher::new(config.event_capacity),
386            config,
387        }
388    }
389
390    /// Get the event dispatcher for subscribing to events.
391    pub fn events(&self) -> &EventDispatcher {
392        &self.events
393    }
394
395    /// Get the manager configuration.
396    pub fn config(&self) -> &ManagerConfig {
397        &self.config
398    }
399
400    /// Scan for available devices.
401    pub async fn scan(&self) -> Result<Vec<DiscoveredDevice>> {
402        scan_with_options(self.config.scan_options.clone()).await
403    }
404
405    /// Scan with custom options.
406    pub async fn scan_with_options(&self, options: ScanOptions) -> Result<Vec<DiscoveredDevice>> {
407        let devices = scan_with_options(options).await?;
408
409        // Emit discovery events
410        for device in &devices {
411            self.events.send(DeviceEvent::Discovered {
412                device: DeviceId {
413                    id: device.identifier.clone(),
414                    name: device.name.clone(),
415                    device_type: device.device_type,
416                },
417                rssi: device.rssi,
418            });
419        }
420
421        Ok(devices)
422    }
423
424    /// Add a device to the manager by identifier.
425    pub async fn add_device(&self, identifier: &str) -> Result<()> {
426        self.add_device_with_options(identifier, self.config.default_reconnect_options.clone())
427            .await
428    }
429
430    /// Add a device with custom reconnect options.
431    pub async fn add_device_with_options(
432        &self,
433        identifier: &str,
434        reconnect_options: ReconnectOptions,
435    ) -> Result<()> {
436        let mut devices = self.devices.write().await;
437
438        if devices.contains_key(identifier) {
439            return Ok(()); // Already exists
440        }
441
442        let managed = ManagedDevice::with_reconnect_options(identifier, reconnect_options);
443        devices.insert(identifier.to_string(), managed);
444
445        info!("Added device to manager: {}", identifier);
446        Ok(())
447    }
448
449    /// Connect to a device.
450    ///
451    /// This method performs an atomic connect-or-skip operation:
452    /// - If the device doesn't exist, it's added and connected
453    /// - If the device exists but is not connected, it's connected
454    /// - If the device is already connected, this is a no-op
455    ///
456    /// # Connection Limits
457    ///
458    /// If `max_concurrent_connections` is set in the config and would be exceeded,
459    /// this method returns an error. Use `connected_count()` to check the current
460    /// number of connections before calling this method.
461    ///
462    /// The lock is held during the device entry update to prevent race conditions,
463    /// but released during the actual BLE connection to avoid blocking other operations.
464    pub async fn connect(&self, identifier: &str) -> Result<()> {
465        // Check if we need to connect (atomically check and mark as pending)
466        let reconnect_options = {
467            let mut devices = self.devices.write().await;
468
469            // Check connection limit before doing anything else
470            if self.config.max_concurrent_connections > 0 {
471                // Check if already connected (doesn't count toward limit)
472                let already_connected = devices
473                    .get(identifier)
474                    .map(|m| m.has_device())
475                    .unwrap_or(false);
476
477                if !already_connected {
478                    let current_connections = devices.values().filter(|m| m.has_device()).count();
479                    if current_connections >= self.config.max_concurrent_connections {
480                        warn!(
481                            "Connection limit reached ({}/{}), cannot connect to {}",
482                            current_connections, self.config.max_concurrent_connections, identifier
483                        );
484                        return Err(Error::connection_failed(
485                            Some(identifier.to_string()),
486                            crate::error::ConnectionFailureReason::Other(format!(
487                                "Connection limit reached ({}/{})",
488                                current_connections, self.config.max_concurrent_connections
489                            )),
490                        ));
491                    }
492                }
493            }
494
495            // Get or create the managed device entry
496            let managed = devices.entry(identifier.to_string()).or_insert_with(|| {
497                info!("Adding device to manager: {}", identifier);
498                ManagedDevice::with_reconnect_options(
499                    identifier,
500                    self.config.default_reconnect_options.clone(),
501                )
502            });
503
504            // If already connected, nothing to do
505            if managed.device.is_some() {
506                debug!("Device {} already has a connection handle", identifier);
507                return Ok(());
508            }
509
510            // Try to atomically set the connecting flag to prevent race conditions.
511            // If another task is already connecting, return Ok(()) immediately.
512            //
513            // NOTE: The caller should verify `is_connected()` afterward if the
514            // connection must be established before proceeding, since this early
515            // return does not wait for the in-flight connection attempt to finish.
516            if managed
517                .connecting
518                .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
519                .is_err()
520            {
521                debug!(
522                    "Another task is already connecting to device {}, returning early",
523                    identifier
524                );
525                return Ok(());
526            }
527
528            // Clone the reconnect options for use after releasing lock
529            managed.reconnect_options.clone()
530        };
531        // Lock is released here - other tasks can now access the device map
532
533        // Perform BLE connection (this may take time)
534        // Use the cloned reconnect_options if needed in the future
535        let _ = reconnect_options;
536        let connect_result = Device::connect(identifier).await;
537
538        // Handle connection result
539        let device = match connect_result {
540            Ok(d) => Arc::new(d),
541            Err(e) => {
542                // Clear the connecting flag on failure
543                let devices = self.devices.read().await;
544                if let Some(managed) = devices.get(identifier) {
545                    managed.connecting.store(false, Ordering::SeqCst);
546                }
547                return Err(e);
548            }
549        };
550
551        let info = device.read_device_info().await.ok();
552        let device_type = device.device_type();
553        let name = device.name().map(|s| s.to_string());
554
555        // Update the managed device atomically
556        {
557            let mut devices = self.devices.write().await;
558            if let Some(managed) = devices.get_mut(identifier) {
559                // Clear the connecting flag
560                managed.connecting.store(false, Ordering::SeqCst);
561
562                // Check if another task connected while we were connecting
563                // (shouldn't happen with the atomic flag, but be defensive)
564                if managed.device.is_some() {
565                    // Another task beat us to it - disconnect our connection
566                    debug!(
567                        "Another task connected {} while we were connecting, discarding our connection",
568                        identifier
569                    );
570                    drop(devices); // Release lock before async disconnect
571                    let _ = device.disconnect().await;
572                    return Ok(());
573                }
574
575                managed.device = Some(device);
576                managed.info = info.clone();
577                managed.device_type = device_type;
578                managed.name = name.clone();
579            } else {
580                // Device was removed while we were connecting - still connect but add it back
581                let mut managed = ManagedDevice::new(identifier);
582                managed.device = Some(device);
583                managed.info = info.clone();
584                managed.device_type = device_type;
585                managed.name = name.clone();
586                devices.insert(identifier.to_string(), managed);
587            }
588        }
589
590        // Emit event
591        self.events.send(DeviceEvent::Connected {
592            device: DeviceId {
593                id: identifier.to_string(),
594                name,
595                device_type,
596            },
597            info,
598        });
599
600        info!("Connected to device: {}", identifier);
601        Ok(())
602    }
603
604    /// Disconnect from a device.
605    pub async fn disconnect(&self, identifier: &str) -> Result<()> {
606        let device_arc = {
607            let mut devices = self.devices.write().await;
608            if let Some(managed) = devices.get_mut(identifier) {
609                managed.device.take()
610            } else {
611                None
612            }
613        };
614
615        // Disconnect outside the lock
616        if let Some(device) = device_arc {
617            device.disconnect().await?;
618            self.events.send(DeviceEvent::Disconnected {
619                device: DeviceId::new(identifier),
620                reason: DisconnectReason::UserRequested,
621            });
622        }
623
624        Ok(())
625    }
626
627    /// Remove a device from the manager.
628    pub async fn remove_device(&self, identifier: &str) -> Result<()> {
629        self.disconnect(identifier).await?;
630        self.devices.write().await.remove(identifier);
631        info!("Removed device from manager: {}", identifier);
632        Ok(())
633    }
634
635    /// Get a list of all managed device IDs.
636    pub async fn device_ids(&self) -> Vec<String> {
637        self.devices.read().await.keys().cloned().collect()
638    }
639
640    /// Get the number of managed devices.
641    pub async fn device_count(&self) -> usize {
642        self.devices.read().await.len()
643    }
644
645    /// Get the number of connected devices (fast, doesn't query BLE).
646    ///
647    /// This returns the number of devices that have an active device handle,
648    /// without querying the BLE stack. Use `connected_count_verified` for
649    /// an accurate count that queries each device.
650    pub async fn connected_count(&self) -> usize {
651        let devices = self.devices.read().await;
652        devices.values().filter(|m| m.has_device()).count()
653    }
654
655    /// Check if a new connection can be made without exceeding the limit.
656    ///
657    /// Returns `true` if another connection can be made, `false` if at limit.
658    /// Always returns `true` if `max_concurrent_connections` is 0 (unlimited).
659    pub async fn can_connect(&self) -> bool {
660        if self.config.max_concurrent_connections == 0 {
661            return true;
662        }
663        self.connected_count().await < self.config.max_concurrent_connections
664    }
665
666    /// Get the connection limit status.
667    ///
668    /// Returns (current_connections, max_connections). If max is 0, there is no limit.
669    pub async fn connection_status(&self) -> (usize, usize) {
670        (
671            self.connected_count().await,
672            self.config.max_concurrent_connections,
673        )
674    }
675
676    /// Get the number of available connection slots.
677    ///
678    /// Returns `None` if there is no connection limit (unlimited).
679    pub async fn available_connections(&self) -> Option<usize> {
680        if self.config.max_concurrent_connections == 0 {
681            return None;
682        }
683        let current = self.connected_count().await;
684        Some(
685            self.config
686                .max_concurrent_connections
687                .saturating_sub(current),
688        )
689    }
690
691    /// Get the number of connected devices (verified via BLE).
692    ///
693    /// This method queries each device to verify its connection status.
694    /// The lock is released before making BLE calls to avoid contention.
695    pub async fn connected_count_verified(&self) -> usize {
696        // Collect device handles while holding the lock briefly
697        let device_arcs: Vec<Arc<Device>> = {
698            let devices = self.devices.read().await;
699            devices.values().filter_map(|m| m.device_arc()).collect()
700        };
701        // Lock is released here
702
703        // Check connection status in parallel
704        let futures = device_arcs.iter().map(|d| d.is_connected());
705        let results = join_all(futures).await;
706
707        results.into_iter().filter(|&connected| connected).count()
708    }
709
710    /// Read current values from a specific device.
711    pub async fn read_current(&self, identifier: &str) -> Result<CurrentReading> {
712        // Get device Arc while holding the lock briefly
713        let device = {
714            let devices = self.devices.read().await;
715            let managed = devices
716                .get(identifier)
717                .ok_or_else(|| Error::device_not_found(identifier))?;
718            managed.device_arc().ok_or(Error::NotConnected)?
719        };
720        // Lock is released here
721
722        let reading = device.read_current().await?;
723
724        // Emit reading event
725        self.events.send(DeviceEvent::Reading {
726            device: DeviceId::new(identifier),
727            reading,
728        });
729
730        // Update cached reading
731        {
732            let mut devices = self.devices.write().await;
733            if let Some(managed) = devices.get_mut(identifier) {
734                managed.last_reading = Some(reading);
735            }
736        }
737
738        Ok(reading)
739    }
740
741    /// Read current values from all connected devices (in parallel).
742    ///
743    /// This method releases the lock before performing async BLE operations,
744    /// allowing other tasks to add/remove devices while reads are in progress.
745    /// All reads are performed in parallel for maximum performance.
746    pub async fn read_all(&self) -> HashMap<String, Result<CurrentReading>> {
747        // Collect device handles while holding the lock briefly
748        let devices_to_read: Vec<(String, Arc<Device>)> = {
749            let devices = self.devices.read().await;
750            devices
751                .iter()
752                .filter_map(|(id, managed)| managed.device_arc().map(|d| (id.clone(), d)))
753                .collect()
754        };
755        // Lock is released here
756
757        // Perform all reads in parallel
758        let read_futures = devices_to_read.iter().map(|(id, device)| {
759            let id = id.clone();
760            let device = Arc::clone(device);
761            async move {
762                let result = device.read_current().await;
763                (id, result)
764            }
765        });
766
767        let read_results: Vec<(String, Result<CurrentReading>)> = join_all(read_futures).await;
768
769        // Emit events and update cache
770        for (id, result) in &read_results {
771            if let Ok(reading) = result {
772                self.events.send(DeviceEvent::Reading {
773                    device: DeviceId::new(id),
774                    reading: *reading,
775                });
776            }
777        }
778
779        // Update cached readings
780        {
781            let mut devices = self.devices.write().await;
782            for (id, result) in &read_results {
783                if let Ok(reading) = result
784                    && let Some(managed) = devices.get_mut(id)
785                {
786                    managed.last_reading = Some(*reading);
787                }
788            }
789        }
790
791        read_results.into_iter().collect()
792    }
793
794    /// Connect to all known devices (in parallel).
795    ///
796    /// Returns a map of device IDs to connection results.
797    pub async fn connect_all(&self) -> HashMap<String, Result<()>> {
798        let ids: Vec<_> = self.devices.read().await.keys().cloned().collect();
799
800        // Note: We can't fully parallelize connect because it modifies state,
801        // but we can at least attempt connections concurrently
802        let connect_futures = ids.iter().map(|id| {
803            let id = id.clone();
804            async move {
805                let result = self.connect(&id).await;
806                (id, result)
807            }
808        });
809
810        join_all(connect_futures).await.into_iter().collect()
811    }
812
813    /// Disconnect from all devices (in parallel).
814    ///
815    /// Returns a map of device IDs to disconnection results.
816    pub async fn disconnect_all(&self) -> HashMap<String, Result<()>> {
817        // Collect all device arcs first
818        let devices_to_disconnect: Vec<(String, Arc<Device>)> = {
819            let mut devices = self.devices.write().await;
820            devices
821                .iter_mut()
822                .filter_map(|(id, managed)| managed.device.take().map(|d| (id.clone(), d)))
823                .collect()
824        };
825
826        // Disconnect all in parallel
827        let disconnect_futures = devices_to_disconnect.iter().map(|(id, device)| {
828            let id = id.clone();
829            let device = Arc::clone(device);
830            async move {
831                let result = device.disconnect().await;
832                (id, result)
833            }
834        });
835
836        let results: Vec<(String, Result<()>)> = join_all(disconnect_futures).await;
837
838        // Emit disconnection events
839        for (id, result) in &results {
840            if result.is_ok() {
841                self.events.send(DeviceEvent::Disconnected {
842                    device: DeviceId::new(id),
843                    reason: DisconnectReason::UserRequested,
844                });
845            }
846        }
847
848        results.into_iter().collect()
849    }
850
851    /// Check if a specific device is connected (fast, doesn't query BLE).
852    ///
853    /// This method attempts to check if a device has an active connection handle
854    /// without blocking. Returns `None` if the lock couldn't be acquired immediately,
855    /// or `Some(bool)` indicating whether the device has a connection handle.
856    ///
857    /// Note: This only checks if we have a device handle, not whether the actual
858    /// BLE connection is still alive. Use [`is_connected`](Self::is_connected) for
859    /// a verified check.
860    pub fn try_is_connected(&self, identifier: &str) -> Option<bool> {
861        // Try to acquire the lock without blocking
862        match self.devices.try_read() {
863            Ok(devices) => Some(
864                devices
865                    .get(identifier)
866                    .map(|m| m.has_device())
867                    .unwrap_or(false),
868            ),
869            Err(_) => None, // Lock was held, couldn't check
870        }
871    }
872
873    /// Check if a specific device is connected (verified via BLE).
874    ///
875    /// The lock is released before making the BLE call.
876    pub async fn is_connected(&self, identifier: &str) -> bool {
877        let device = {
878            let devices = self.devices.read().await;
879            devices.get(identifier).and_then(|m| m.device_arc())
880        };
881
882        if let Some(device) = device {
883            device.is_connected().await
884        } else {
885            false
886        }
887    }
888
889    /// Get device info for a specific device.
890    pub async fn get_device_info(&self, identifier: &str) -> Option<DeviceInfo> {
891        let devices = self.devices.read().await;
892        devices.get(identifier).and_then(|m| m.info.clone())
893    }
894
895    /// Get the last cached reading for a device.
896    pub async fn get_last_reading(&self, identifier: &str) -> Option<CurrentReading> {
897        let devices = self.devices.read().await;
898        devices.get(identifier).and_then(|m| m.last_reading)
899    }
900
901    /// Start a background health check task that monitors connection status.
902    ///
903    /// This spawns a task that periodically checks device connections and
904    /// attempts to reconnect devices that have auto_reconnect enabled.
905    ///
906    /// The task will run until the provided cancellation token is cancelled.
907    ///
908    /// # Adaptive Intervals
909    ///
910    /// If `use_adaptive_interval` is enabled in the config, the health check
911    /// interval will automatically adjust based on connection stability:
912    /// - When connections are stable, checks become less frequent (up to `max_health_check_interval`)
913    /// - When connections are unstable, checks become more frequent (down to `min_health_check_interval`)
914    ///
915    /// # Connection Validation
916    ///
917    /// If `use_connection_validation` is enabled, health checks will perform
918    /// an actual BLE read (`device.validate_connection()`) to catch "zombie connections"
919    /// where the BLE stack thinks it's connected but the device is out of range.
920    ///
921    /// # Example
922    ///
923    /// ```ignore
924    /// use tokio_util::sync::CancellationToken;
925    ///
926    /// let manager = Arc::new(DeviceManager::new());
927    /// let cancel = CancellationToken::new();
928    /// let handle = manager.start_health_monitor(cancel.clone());
929    ///
930    /// // Later, to stop the health monitor:
931    /// cancel.cancel();
932    /// handle.await.unwrap();
933    /// ```
934    pub fn start_health_monitor(
935        self: &Arc<Self>,
936        cancel_token: CancellationToken,
937    ) -> tokio::task::JoinHandle<()> {
938        let manager = Arc::clone(self);
939
940        tokio::spawn(async move {
941            // Initialize adaptive interval if enabled
942            let mut adaptive = if manager.config.use_adaptive_interval {
943                Some(AdaptiveInterval::new(
944                    manager.config.health_check_interval,
945                    manager.config.min_health_check_interval,
946                    manager.config.max_health_check_interval,
947                ))
948            } else {
949                None
950            };
951
952            loop {
953                // Get current interval
954                let current_interval = adaptive
955                    .as_ref()
956                    .map(|a| a.current())
957                    .unwrap_or(manager.config.health_check_interval);
958
959                tokio::select! {
960                    _ = cancel_token.cancelled() => {
961                        info!("Health monitor cancelled, shutting down");
962                        break;
963                    }
964                    _ = tokio::time::sleep(current_interval) => {
965                        let mut any_failures = false;
966                        let mut any_successes = false;
967
968                        // Get devices that need checking
969                        let devices_to_check: Vec<(String, Option<Arc<Device>>, bool, DevicePriority)> = {
970                            let devices = manager.devices.read().await;
971                            devices
972                                .iter()
973                                .map(|(id, m)| {
974                                    (
975                                        id.clone(),
976                                        m.device_arc(),
977                                        m.auto_reconnect,
978                                        m.priority,
979                                    )
980                                })
981                                .collect()
982                        };
983
984                        // Sort by priority (higher priority checked first)
985                        let mut sorted_devices = devices_to_check;
986                        sorted_devices.sort_by(|a, b| b.3.cmp(&a.3));
987
988                        for (id, device_opt, auto_reconnect, _priority) in sorted_devices {
989                            let should_reconnect = match device_opt {
990                                Some(device) => {
991                                    // Use connection validation if enabled
992                                    if manager.config.use_connection_validation {
993                                        !device.is_connection_alive().await
994                                    } else {
995                                        !device.is_connected().await
996                                    }
997                                }
998                                None => true,
999                            };
1000
1001                            if should_reconnect && auto_reconnect {
1002                                debug!("Health monitor: attempting reconnect for {}", id);
1003                                any_failures = true;
1004
1005                                match manager.connect(&id).await {
1006                                    Ok(()) => {
1007                                        any_successes = true;
1008                                        // Update success in managed device
1009                                        if let Some(m) = manager.devices.write().await.get_mut(&id) {
1010                                            m.record_success();
1011                                        }
1012                                    }
1013                                    Err(e) => {
1014                                        warn!("Health monitor: reconnect failed for {}: {}", id, e);
1015                                        // Update failure in managed device
1016                                        if let Some(m) = manager.devices.write().await.get_mut(&id) {
1017                                            m.record_failure();
1018                                        }
1019                                    }
1020                                }
1021                            } else if !should_reconnect {
1022                                any_successes = true;
1023                            }
1024                        }
1025
1026                        // Update adaptive interval
1027                        if let Some(ref mut adaptive) = adaptive {
1028                            if any_failures && !any_successes {
1029                                adaptive.on_failure();
1030                            } else if any_successes && !any_failures {
1031                                adaptive.on_success();
1032                            }
1033                            // Mixed results: don't change interval
1034                        }
1035                    }
1036                }
1037            }
1038        })
1039    }
1040
1041    /// Add a device with priority.
1042    pub async fn add_device_with_priority(
1043        &self,
1044        identifier: &str,
1045        priority: DevicePriority,
1046    ) -> Result<()> {
1047        let mut devices = self.devices.write().await;
1048
1049        if devices.contains_key(identifier) {
1050            // Update priority if device already exists
1051            if let Some(m) = devices.get_mut(identifier) {
1052                m.priority = priority;
1053            }
1054            return Ok(());
1055        }
1056
1057        let mut managed = ManagedDevice::new(identifier);
1058        managed.priority = priority;
1059        managed.reconnect_options = self.config.default_reconnect_options.clone();
1060        devices.insert(identifier.to_string(), managed);
1061
1062        info!(
1063            "Added device to manager with priority {:?}: {}",
1064            priority, identifier
1065        );
1066        Ok(())
1067    }
1068
1069    /// Get the lowest priority connected device that could be disconnected.
1070    ///
1071    /// Returns None if no devices can be disconnected (all are Critical priority or not connected).
1072    pub async fn lowest_priority_connected(&self) -> Option<String> {
1073        let devices = self.devices.read().await;
1074        devices
1075            .iter()
1076            .filter(|(_, m)| m.has_device() && m.priority != DevicePriority::Critical)
1077            .min_by_key(|(_, m)| m.priority)
1078            .map(|(id, _)| id.clone())
1079    }
1080
1081    /// Disconnect the lowest priority device to make room for a new connection.
1082    ///
1083    /// Returns Ok(true) if a device was disconnected, Ok(false) if no eligible device found.
1084    pub async fn evict_lowest_priority(&self) -> Result<bool> {
1085        if let Some(id) = self.lowest_priority_connected().await {
1086            info!("Evicting lowest priority device: {}", id);
1087            self.disconnect(&id).await?;
1088            Ok(true)
1089        } else {
1090            Ok(false)
1091        }
1092    }
1093
1094    /// Start hybrid monitoring using both passive (advertisement) and active connections.
1095    ///
1096    /// This is the most efficient way to monitor multiple devices:
1097    /// - **Passive monitoring**: Uses BLE advertisements to receive real-time readings
1098    ///   without maintaining connections. Lower power consumption, unlimited devices.
1099    /// - **Active connections**: Only established when needed (history download, settings changes).
1100    ///
1101    /// # Requirements
1102    ///
1103    /// Smart Home integration must be enabled on each device for passive monitoring.
1104    ///
1105    /// # Example
1106    ///
1107    /// ```ignore
1108    /// use tokio_util::sync::CancellationToken;
1109    ///
1110    /// let manager = Arc::new(DeviceManager::new());
1111    /// let cancel = CancellationToken::new();
1112    /// let handle = manager.start_hybrid_monitor(cancel.clone(), None);
1113    ///
1114    /// // Receive readings via manager events
1115    /// let mut rx = manager.events().subscribe();
1116    /// while let Ok(event) = rx.recv().await {
1117    ///     if let DeviceEvent::Reading { device, reading } = event {
1118    ///         println!("{}: CO2 = {} ppm", device.id, reading.co2);
1119    ///     }
1120    /// }
1121    /// ```
1122    pub fn start_hybrid_monitor(
1123        self: &Arc<Self>,
1124        cancel_token: CancellationToken,
1125        passive_options: Option<PassiveMonitorOptions>,
1126    ) -> tokio::task::JoinHandle<()> {
1127        let manager = Arc::clone(self);
1128        let options = passive_options.unwrap_or_default();
1129
1130        tokio::spawn(async move {
1131            info!("Starting hybrid monitor (passive + active)");
1132
1133            // Create passive monitor
1134            let passive_monitor = Arc::new(PassiveMonitor::new(options));
1135            let mut passive_rx = passive_monitor.subscribe();
1136
1137            // Start passive monitoring
1138            let passive_cancel = cancel_token.clone();
1139            let _passive_handle = passive_monitor.start(passive_cancel);
1140
1141            loop {
1142                tokio::select! {
1143                    _ = cancel_token.cancelled() => {
1144                        info!("Hybrid monitor cancelled");
1145                        break;
1146                    }
1147                    result = passive_rx.recv() => {
1148                        match result {
1149                            Ok(passive_reading) => {
1150                                // Convert passive reading to CurrentReading and emit event
1151                                if let Some(reading) = passive_reading_to_current(&passive_reading) {
1152                                    // Update last reading in managed device if it exists
1153                                    if let Some(m) = manager.devices.write().await.get_mut(&passive_reading.device_id) {
1154                                        m.last_reading = Some(reading);
1155                                        m.record_success();
1156                                    }
1157
1158                                    // Emit reading event
1159                                    manager.events.send(DeviceEvent::Reading {
1160                                        device: DeviceId {
1161                                            id: passive_reading.device_id.clone(),
1162                                            name: passive_reading.device_name.clone(),
1163                                            device_type: Some(passive_reading.data.device_type),
1164                                        },
1165                                        reading,
1166                                    });
1167                                }
1168                            }
1169                            Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
1170                                warn!("Hybrid monitor lagged {} messages", n);
1171                            }
1172                            Err(tokio::sync::broadcast::error::RecvError::Closed) => {
1173                                info!("Passive monitor channel closed");
1174                                break;
1175                            }
1176                        }
1177                    }
1178                }
1179            }
1180        })
1181    }
1182
1183    /// Get a reading using hybrid approach: try passive first, fall back to active.
1184    ///
1185    /// This method checks if a recent passive reading is available. If not,
1186    /// it establishes an active connection to read the value.
1187    ///
1188    /// # Arguments
1189    ///
1190    /// * `identifier` - Device identifier
1191    /// * `max_passive_age` - Maximum age of passive reading to accept (default: 60s)
1192    pub async fn read_hybrid(
1193        &self,
1194        identifier: &str,
1195        max_passive_age: Option<Duration>,
1196    ) -> Result<CurrentReading> {
1197        let max_age = max_passive_age.unwrap_or(Duration::from_secs(60));
1198
1199        // Check if we have a recent cached reading
1200        {
1201            let devices = self.devices.read().await;
1202            if let Some(managed) = devices.get(identifier)
1203                && let Some(reading) = managed.last_reading
1204            {
1205                // Check if the reading has a captured_at timestamp
1206                if let Some(captured) = reading.captured_at {
1207                    let age = time::OffsetDateTime::now_utc() - captured;
1208                    if age
1209                        < time::Duration::try_from(max_age).unwrap_or(time::Duration::seconds(60))
1210                    {
1211                        debug!("Using cached passive reading for {}", identifier);
1212                        return Ok(reading);
1213                    }
1214                }
1215            }
1216        }
1217
1218        // No recent passive reading, use active connection
1219        debug!(
1220            "No recent passive reading, using active connection for {}",
1221            identifier
1222        );
1223        self.read_current(identifier).await
1224    }
1225
1226    /// Check if a device supports passive monitoring (Smart Home enabled).
1227    ///
1228    /// This performs a quick scan to check if the device is broadcasting
1229    /// advertisement data with sensor readings.
1230    pub async fn supports_passive_monitoring(&self, identifier: &str) -> bool {
1231        // Create a short-lived passive monitor to check for advertisements
1232        let options = PassiveMonitorOptions::default()
1233            .scan_duration(Duration::from_secs(5))
1234            .filter_devices(vec![identifier.to_string()]);
1235
1236        let monitor = Arc::new(PassiveMonitor::new(options));
1237        let mut rx = monitor.subscribe();
1238        let cancel = CancellationToken::new();
1239
1240        let _handle = monitor.start(cancel.clone());
1241
1242        // Wait for a reading or timeout
1243        let result = tokio::time::timeout(Duration::from_secs(6), rx.recv()).await;
1244        cancel.cancel();
1245
1246        matches!(result, Ok(Ok(_)))
1247    }
1248}
1249
1250/// Convert a passive advertisement reading to a CurrentReading.
1251fn passive_reading_to_current(passive: &PassiveReading) -> Option<CurrentReading> {
1252    let data = &passive.data;
1253
1254    // We need at least some sensor data to create a reading
1255    if data.co2.is_none()
1256        && data.temperature.is_none()
1257        && data.humidity.is_none()
1258        && data.radon.is_none()
1259        && data.radiation_dose_rate.is_none()
1260    {
1261        return None;
1262    }
1263
1264    Some(CurrentReading {
1265        co2: data.co2.unwrap_or(0),
1266        temperature: data.temperature.unwrap_or(0.0),
1267        pressure: data.pressure.unwrap_or(0.0),
1268        humidity: data.humidity.unwrap_or(0),
1269        battery: data.battery,
1270        status: data.status,
1271        interval: data.interval,
1272        age: data.age,
1273        captured_at: Some(time::OffsetDateTime::now_utc()),
1274        radon: data.radon,
1275        radon_avg_24h: None,
1276        radon_avg_7d: None,
1277        radon_avg_30d: None,
1278        radiation_rate: data.radiation_dose_rate,
1279        radiation_total: None, // Not available in advertisement data
1280    })
1281}
1282
1283impl Default for DeviceManager {
1284    fn default() -> Self {
1285        Self::new()
1286    }
1287}
1288
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a fresh manager with a single registered device.
    async fn manager_with_device(id: &str) -> DeviceManager {
        let manager = DeviceManager::new();
        manager.add_device(id).await.unwrap();
        manager
    }

    #[tokio::test]
    async fn test_manager_add_device() {
        let manager = manager_with_device("test-device").await;

        assert_eq!(manager.device_count().await, 1);
        let ids = manager.device_ids().await;
        assert!(ids.iter().any(|id| id == "test-device"));
    }

    #[tokio::test]
    async fn test_manager_remove_device() {
        let manager = manager_with_device("test-device").await;
        manager.remove_device("test-device").await.unwrap();

        assert_eq!(manager.device_count().await, 0);
    }

    #[tokio::test]
    async fn test_manager_not_connected_by_default() {
        let manager = manager_with_device("test-device").await;

        assert!(!manager.is_connected("test-device").await);
        assert_eq!(manager.connected_count().await, 0);
    }

    #[tokio::test]
    async fn test_manager_events() {
        let manager = DeviceManager::new();
        // Keep the subscription alive (a named binding, not `_`) for the whole test.
        let _subscription = manager.events().subscribe();

        manager.add_device("test-device").await.unwrap();

        // Adding a device must not disturb existing subscribers.
        assert_eq!(manager.events().receiver_count(), 1);
    }
}