1use std::collections::HashMap;
7use std::sync::Arc;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::time::Duration;
10
11use futures::future::join_all;
12use tokio::sync::RwLock;
13use tokio_util::sync::CancellationToken;
14use tracing::{debug, info, warn};
15
16use aranet_types::{CurrentReading, DeviceInfo, DeviceType};
17
18use crate::device::Device;
19use crate::error::{Error, Result};
20use crate::events::{DeviceEvent, DeviceId, DisconnectReason, EventDispatcher};
21use crate::passive::{PassiveMonitor, PassiveMonitorOptions, PassiveReading};
22use crate::reconnect::ReconnectOptions;
23use crate::scan::{DiscoveredDevice, ScanOptions, scan_with_options};
24
25#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default)]
30pub enum DevicePriority {
31 Low,
33 #[default]
35 Normal,
36 High,
38 Critical,
40}
41
42#[derive(Debug, Clone)]
47pub struct AdaptiveInterval {
48 pub base: Duration,
50 current: Duration,
52 pub min: Duration,
54 pub max: Duration,
56 consecutive_successes: u32,
58 consecutive_failures: u32,
60 success_threshold: u32,
62 failure_threshold: u32,
64}
65
66impl Default for AdaptiveInterval {
67 fn default() -> Self {
68 Self {
69 base: Duration::from_secs(30),
70 current: Duration::from_secs(30),
71 min: Duration::from_secs(5),
72 max: Duration::from_secs(120),
73 consecutive_successes: 0,
74 consecutive_failures: 0,
75 success_threshold: 3,
76 failure_threshold: 1,
77 }
78 }
79}
80
81impl AdaptiveInterval {
82 pub fn new(base: Duration, min: Duration, max: Duration) -> Self {
84 Self {
85 base,
86 current: base,
87 min,
88 max,
89 ..Default::default()
90 }
91 }
92
93 pub fn current(&self) -> Duration {
95 self.current
96 }
97
98 pub fn on_success(&mut self) {
103 self.consecutive_failures = 0;
104 self.consecutive_successes += 1;
105
106 if self.consecutive_successes >= self.success_threshold {
107 let new_interval = self.current.saturating_mul(2);
109 self.current = new_interval.min(self.max);
110 self.consecutive_successes = 0;
111 debug!(
112 "Health check stable, increasing interval to {:?}",
113 self.current
114 );
115 }
116 }
117
118 pub fn on_failure(&mut self) {
123 self.consecutive_successes = 0;
124 self.consecutive_failures += 1;
125
126 if self.consecutive_failures >= self.failure_threshold {
127 let new_interval = self.current / 2;
129 self.current = new_interval.max(self.min);
130 self.consecutive_failures = 0;
131 debug!(
132 "Health check unstable, decreasing interval to {:?}",
133 self.current
134 );
135 }
136 }
137
138 pub fn reset(&mut self) {
140 self.current = self.base;
141 self.consecutive_successes = 0;
142 self.consecutive_failures = 0;
143 }
144}
145
146#[derive(Debug)]
148pub struct ManagedDevice {
149 pub id: String,
151 pub name: Option<String>,
153 pub device_type: Option<DeviceType>,
155 device: Option<Arc<Device>>,
158 connecting: AtomicBool,
161 pub auto_reconnect: bool,
163 pub last_reading: Option<CurrentReading>,
165 pub info: Option<DeviceInfo>,
167 pub reconnect_options: ReconnectOptions,
169 pub priority: DevicePriority,
171 pub consecutive_failures: u32,
173 pub last_success: Option<u64>,
175}
176
177impl ManagedDevice {
178 pub fn new(id: &str) -> Self {
180 Self {
181 id: id.to_string(),
182 name: None,
183 device_type: None,
184 device: None,
185 connecting: AtomicBool::new(false),
186 auto_reconnect: true,
187 last_reading: None,
188 info: None,
189 reconnect_options: ReconnectOptions::default(),
190 priority: DevicePriority::default(),
191 consecutive_failures: 0,
192 last_success: None,
193 }
194 }
195
196 pub fn with_reconnect_options(id: &str, options: ReconnectOptions) -> Self {
198 Self {
199 reconnect_options: options,
200 ..Self::new(id)
201 }
202 }
203
204 pub fn with_priority(id: &str, priority: DevicePriority) -> Self {
206 Self {
207 priority,
208 ..Self::new(id)
209 }
210 }
211
212 pub fn with_options(id: &str, options: ReconnectOptions, priority: DevicePriority) -> Self {
214 Self {
215 reconnect_options: options,
216 priority,
217 ..Self::new(id)
218 }
219 }
220
221 pub fn record_success(&mut self) {
223 self.consecutive_failures = 0;
224 self.last_success = Some(
225 std::time::SystemTime::now()
226 .duration_since(std::time::UNIX_EPOCH)
227 .unwrap_or_default()
228 .as_millis() as u64,
229 );
230 }
231
232 pub fn record_failure(&mut self) {
234 self.consecutive_failures += 1;
235 }
236
237 pub fn has_device(&self) -> bool {
239 self.device.is_some()
240 }
241
242 pub async fn is_connected(&self) -> bool {
244 if let Some(device) = &self.device {
245 device.is_connected().await
246 } else {
247 false
248 }
249 }
250
251 pub fn device(&self) -> Option<&Arc<Device>> {
253 self.device.as_ref()
254 }
255
256 pub fn device_arc(&self) -> Option<Arc<Device>> {
258 self.device.clone()
259 }
260}
261
262#[derive(Debug, Clone)]
264pub struct ManagerConfig {
265 pub scan_options: ScanOptions,
267 pub default_reconnect_options: ReconnectOptions,
269 pub event_capacity: usize,
271 pub health_check_interval: Duration,
273 pub max_concurrent_connections: usize,
279 pub use_adaptive_interval: bool,
285 pub min_health_check_interval: Duration,
287 pub max_health_check_interval: Duration,
289 pub default_priority: DevicePriority,
291 pub use_connection_validation: bool,
297}
298
299impl Default for ManagerConfig {
300 fn default() -> Self {
301 let platform_config = crate::platform::PlatformConfig::for_current_platform();
303
304 Self {
305 scan_options: ScanOptions::default(),
306 default_reconnect_options: ReconnectOptions::default(),
307 event_capacity: 100,
308 health_check_interval: Duration::from_secs(30),
309 max_concurrent_connections: platform_config.max_concurrent_connections,
310 use_adaptive_interval: true,
311 min_health_check_interval: Duration::from_secs(5),
312 max_health_check_interval: Duration::from_secs(120),
313 default_priority: DevicePriority::Normal,
314 use_connection_validation: true,
315 }
316 }
317}
318
319impl ManagerConfig {
320 pub fn with_max_connections(mut self, max: usize) -> Self {
322 self.max_concurrent_connections = max;
323 self
324 }
325
326 pub fn unlimited_connections(mut self) -> Self {
328 self.max_concurrent_connections = 0;
329 self
330 }
331
332 pub fn adaptive_interval(mut self, enabled: bool) -> Self {
334 self.use_adaptive_interval = enabled;
335 self
336 }
337
338 pub fn health_check_interval(mut self, interval: Duration) -> Self {
340 self.health_check_interval = interval;
341 self
342 }
343
344 pub fn default_priority(mut self, priority: DevicePriority) -> Self {
346 self.default_priority = priority;
347 self
348 }
349
350 pub fn connection_validation(mut self, enabled: bool) -> Self {
352 self.use_connection_validation = enabled;
353 self
354 }
355}
356
357pub struct DeviceManager {
359 devices: RwLock<HashMap<String, ManagedDevice>>,
361 events: EventDispatcher,
363 config: ManagerConfig,
365}
366
367impl DeviceManager {
368 pub fn new() -> Self {
370 Self::with_config(ManagerConfig::default())
371 }
372
373 pub fn with_event_capacity(capacity: usize) -> Self {
375 Self::with_config(ManagerConfig {
376 event_capacity: capacity,
377 ..Default::default()
378 })
379 }
380
381 pub fn with_config(config: ManagerConfig) -> Self {
383 Self {
384 devices: RwLock::new(HashMap::new()),
385 events: EventDispatcher::new(config.event_capacity),
386 config,
387 }
388 }
389
390 pub fn events(&self) -> &EventDispatcher {
392 &self.events
393 }
394
395 pub fn config(&self) -> &ManagerConfig {
397 &self.config
398 }
399
400 pub async fn scan(&self) -> Result<Vec<DiscoveredDevice>> {
402 scan_with_options(self.config.scan_options.clone()).await
403 }
404
405 pub async fn scan_with_options(&self, options: ScanOptions) -> Result<Vec<DiscoveredDevice>> {
407 let devices = scan_with_options(options).await?;
408
409 for device in &devices {
411 self.events.send(DeviceEvent::Discovered {
412 device: DeviceId {
413 id: device.identifier.clone(),
414 name: device.name.clone(),
415 device_type: device.device_type,
416 },
417 rssi: device.rssi,
418 });
419 }
420
421 Ok(devices)
422 }
423
424 pub async fn add_device(&self, identifier: &str) -> Result<()> {
426 self.add_device_with_options(identifier, self.config.default_reconnect_options.clone())
427 .await
428 }
429
430 pub async fn add_device_with_options(
432 &self,
433 identifier: &str,
434 reconnect_options: ReconnectOptions,
435 ) -> Result<()> {
436 let mut devices = self.devices.write().await;
437
438 if devices.contains_key(identifier) {
439 return Ok(()); }
441
442 let managed = ManagedDevice::with_reconnect_options(identifier, reconnect_options);
443 devices.insert(identifier.to_string(), managed);
444
445 info!("Added device to manager: {}", identifier);
446 Ok(())
447 }
448
449 pub async fn connect(&self, identifier: &str) -> Result<()> {
465 let reconnect_options = {
467 let mut devices = self.devices.write().await;
468
469 if self.config.max_concurrent_connections > 0 {
471 let already_connected = devices
473 .get(identifier)
474 .map(|m| m.has_device())
475 .unwrap_or(false);
476
477 if !already_connected {
478 let current_connections = devices.values().filter(|m| m.has_device()).count();
479 if current_connections >= self.config.max_concurrent_connections {
480 warn!(
481 "Connection limit reached ({}/{}), cannot connect to {}",
482 current_connections, self.config.max_concurrent_connections, identifier
483 );
484 return Err(Error::connection_failed(
485 Some(identifier.to_string()),
486 crate::error::ConnectionFailureReason::Other(format!(
487 "Connection limit reached ({}/{})",
488 current_connections, self.config.max_concurrent_connections
489 )),
490 ));
491 }
492 }
493 }
494
495 let managed = devices.entry(identifier.to_string()).or_insert_with(|| {
497 info!("Adding device to manager: {}", identifier);
498 ManagedDevice::with_reconnect_options(
499 identifier,
500 self.config.default_reconnect_options.clone(),
501 )
502 });
503
504 if managed.device.is_some() {
506 debug!("Device {} already has a connection handle", identifier);
507 return Ok(());
508 }
509
510 if managed
517 .connecting
518 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
519 .is_err()
520 {
521 debug!(
522 "Another task is already connecting to device {}, returning early",
523 identifier
524 );
525 return Ok(());
526 }
527
528 managed.reconnect_options.clone()
530 };
531 let _ = reconnect_options;
536 let connect_result = Device::connect(identifier).await;
537
538 let device = match connect_result {
540 Ok(d) => Arc::new(d),
541 Err(e) => {
542 let devices = self.devices.read().await;
544 if let Some(managed) = devices.get(identifier) {
545 managed.connecting.store(false, Ordering::SeqCst);
546 }
547 return Err(e);
548 }
549 };
550
551 let info = device.read_device_info().await.ok();
552 let device_type = device.device_type();
553 let name = device.name().map(|s| s.to_string());
554
555 {
557 let mut devices = self.devices.write().await;
558 if let Some(managed) = devices.get_mut(identifier) {
559 managed.connecting.store(false, Ordering::SeqCst);
561
562 if managed.device.is_some() {
565 debug!(
567 "Another task connected {} while we were connecting, discarding our connection",
568 identifier
569 );
570 drop(devices); let _ = device.disconnect().await;
572 return Ok(());
573 }
574
575 managed.device = Some(device);
576 managed.info = info.clone();
577 managed.device_type = device_type;
578 managed.name = name.clone();
579 } else {
580 let mut managed = ManagedDevice::new(identifier);
582 managed.device = Some(device);
583 managed.info = info.clone();
584 managed.device_type = device_type;
585 managed.name = name.clone();
586 devices.insert(identifier.to_string(), managed);
587 }
588 }
589
590 self.events.send(DeviceEvent::Connected {
592 device: DeviceId {
593 id: identifier.to_string(),
594 name,
595 device_type,
596 },
597 info,
598 });
599
600 info!("Connected to device: {}", identifier);
601 Ok(())
602 }
603
604 pub async fn disconnect(&self, identifier: &str) -> Result<()> {
606 let device_arc = {
607 let mut devices = self.devices.write().await;
608 if let Some(managed) = devices.get_mut(identifier) {
609 managed.device.take()
610 } else {
611 None
612 }
613 };
614
615 if let Some(device) = device_arc {
617 device.disconnect().await?;
618 self.events.send(DeviceEvent::Disconnected {
619 device: DeviceId::new(identifier),
620 reason: DisconnectReason::UserRequested,
621 });
622 }
623
624 Ok(())
625 }
626
627 pub async fn remove_device(&self, identifier: &str) -> Result<()> {
629 self.disconnect(identifier).await?;
630 self.devices.write().await.remove(identifier);
631 info!("Removed device from manager: {}", identifier);
632 Ok(())
633 }
634
635 pub async fn device_ids(&self) -> Vec<String> {
637 self.devices.read().await.keys().cloned().collect()
638 }
639
640 pub async fn device_count(&self) -> usize {
642 self.devices.read().await.len()
643 }
644
645 pub async fn connected_count(&self) -> usize {
651 let devices = self.devices.read().await;
652 devices.values().filter(|m| m.has_device()).count()
653 }
654
655 pub async fn can_connect(&self) -> bool {
660 if self.config.max_concurrent_connections == 0 {
661 return true;
662 }
663 self.connected_count().await < self.config.max_concurrent_connections
664 }
665
666 pub async fn connection_status(&self) -> (usize, usize) {
670 (
671 self.connected_count().await,
672 self.config.max_concurrent_connections,
673 )
674 }
675
676 pub async fn available_connections(&self) -> Option<usize> {
680 if self.config.max_concurrent_connections == 0 {
681 return None;
682 }
683 let current = self.connected_count().await;
684 Some(
685 self.config
686 .max_concurrent_connections
687 .saturating_sub(current),
688 )
689 }
690
691 pub async fn connected_count_verified(&self) -> usize {
696 let device_arcs: Vec<Arc<Device>> = {
698 let devices = self.devices.read().await;
699 devices.values().filter_map(|m| m.device_arc()).collect()
700 };
701 let futures = device_arcs.iter().map(|d| d.is_connected());
705 let results = join_all(futures).await;
706
707 results.into_iter().filter(|&connected| connected).count()
708 }
709
710 pub async fn read_current(&self, identifier: &str) -> Result<CurrentReading> {
712 let device = {
714 let devices = self.devices.read().await;
715 let managed = devices
716 .get(identifier)
717 .ok_or_else(|| Error::device_not_found(identifier))?;
718 managed.device_arc().ok_or(Error::NotConnected)?
719 };
720 let reading = device.read_current().await?;
723
724 self.events.send(DeviceEvent::Reading {
726 device: DeviceId::new(identifier),
727 reading,
728 });
729
730 {
732 let mut devices = self.devices.write().await;
733 if let Some(managed) = devices.get_mut(identifier) {
734 managed.last_reading = Some(reading);
735 }
736 }
737
738 Ok(reading)
739 }
740
741 pub async fn read_all(&self) -> HashMap<String, Result<CurrentReading>> {
747 let devices_to_read: Vec<(String, Arc<Device>)> = {
749 let devices = self.devices.read().await;
750 devices
751 .iter()
752 .filter_map(|(id, managed)| managed.device_arc().map(|d| (id.clone(), d)))
753 .collect()
754 };
755 let read_futures = devices_to_read.iter().map(|(id, device)| {
759 let id = id.clone();
760 let device = Arc::clone(device);
761 async move {
762 let result = device.read_current().await;
763 (id, result)
764 }
765 });
766
767 let read_results: Vec<(String, Result<CurrentReading>)> = join_all(read_futures).await;
768
769 for (id, result) in &read_results {
771 if let Ok(reading) = result {
772 self.events.send(DeviceEvent::Reading {
773 device: DeviceId::new(id),
774 reading: *reading,
775 });
776 }
777 }
778
779 {
781 let mut devices = self.devices.write().await;
782 for (id, result) in &read_results {
783 if let Ok(reading) = result
784 && let Some(managed) = devices.get_mut(id)
785 {
786 managed.last_reading = Some(*reading);
787 }
788 }
789 }
790
791 read_results.into_iter().collect()
792 }
793
794 pub async fn connect_all(&self) -> HashMap<String, Result<()>> {
798 let ids: Vec<_> = self.devices.read().await.keys().cloned().collect();
799
800 let connect_futures = ids.iter().map(|id| {
803 let id = id.clone();
804 async move {
805 let result = self.connect(&id).await;
806 (id, result)
807 }
808 });
809
810 join_all(connect_futures).await.into_iter().collect()
811 }
812
813 pub async fn disconnect_all(&self) -> HashMap<String, Result<()>> {
817 let devices_to_disconnect: Vec<(String, Arc<Device>)> = {
819 let mut devices = self.devices.write().await;
820 devices
821 .iter_mut()
822 .filter_map(|(id, managed)| managed.device.take().map(|d| (id.clone(), d)))
823 .collect()
824 };
825
826 let disconnect_futures = devices_to_disconnect.iter().map(|(id, device)| {
828 let id = id.clone();
829 let device = Arc::clone(device);
830 async move {
831 let result = device.disconnect().await;
832 (id, result)
833 }
834 });
835
836 let results: Vec<(String, Result<()>)> = join_all(disconnect_futures).await;
837
838 for (id, result) in &results {
840 if result.is_ok() {
841 self.events.send(DeviceEvent::Disconnected {
842 device: DeviceId::new(id),
843 reason: DisconnectReason::UserRequested,
844 });
845 }
846 }
847
848 results.into_iter().collect()
849 }
850
851 pub fn try_is_connected(&self, identifier: &str) -> Option<bool> {
861 match self.devices.try_read() {
863 Ok(devices) => Some(
864 devices
865 .get(identifier)
866 .map(|m| m.has_device())
867 .unwrap_or(false),
868 ),
869 Err(_) => None, }
871 }
872
873 pub async fn is_connected(&self, identifier: &str) -> bool {
877 let device = {
878 let devices = self.devices.read().await;
879 devices.get(identifier).and_then(|m| m.device_arc())
880 };
881
882 if let Some(device) = device {
883 device.is_connected().await
884 } else {
885 false
886 }
887 }
888
889 pub async fn get_device_info(&self, identifier: &str) -> Option<DeviceInfo> {
891 let devices = self.devices.read().await;
892 devices.get(identifier).and_then(|m| m.info.clone())
893 }
894
895 pub async fn get_last_reading(&self, identifier: &str) -> Option<CurrentReading> {
897 let devices = self.devices.read().await;
898 devices.get(identifier).and_then(|m| m.last_reading)
899 }
900
901 pub fn start_health_monitor(
935 self: &Arc<Self>,
936 cancel_token: CancellationToken,
937 ) -> tokio::task::JoinHandle<()> {
938 let manager = Arc::clone(self);
939
940 tokio::spawn(async move {
941 let mut adaptive = if manager.config.use_adaptive_interval {
943 Some(AdaptiveInterval::new(
944 manager.config.health_check_interval,
945 manager.config.min_health_check_interval,
946 manager.config.max_health_check_interval,
947 ))
948 } else {
949 None
950 };
951
952 loop {
953 let current_interval = adaptive
955 .as_ref()
956 .map(|a| a.current())
957 .unwrap_or(manager.config.health_check_interval);
958
959 tokio::select! {
960 _ = cancel_token.cancelled() => {
961 info!("Health monitor cancelled, shutting down");
962 break;
963 }
964 _ = tokio::time::sleep(current_interval) => {
965 let mut any_failures = false;
966 let mut any_successes = false;
967
968 let devices_to_check: Vec<(String, Option<Arc<Device>>, bool, DevicePriority)> = {
970 let devices = manager.devices.read().await;
971 devices
972 .iter()
973 .map(|(id, m)| {
974 (
975 id.clone(),
976 m.device_arc(),
977 m.auto_reconnect,
978 m.priority,
979 )
980 })
981 .collect()
982 };
983
984 let mut sorted_devices = devices_to_check;
986 sorted_devices.sort_by(|a, b| b.3.cmp(&a.3));
987
988 for (id, device_opt, auto_reconnect, _priority) in sorted_devices {
989 let should_reconnect = match device_opt {
990 Some(device) => {
991 if manager.config.use_connection_validation {
993 !device.is_connection_alive().await
994 } else {
995 !device.is_connected().await
996 }
997 }
998 None => true,
999 };
1000
1001 if should_reconnect && auto_reconnect {
1002 debug!("Health monitor: attempting reconnect for {}", id);
1003 any_failures = true;
1004
1005 match manager.connect(&id).await {
1006 Ok(()) => {
1007 any_successes = true;
1008 if let Some(m) = manager.devices.write().await.get_mut(&id) {
1010 m.record_success();
1011 }
1012 }
1013 Err(e) => {
1014 warn!("Health monitor: reconnect failed for {}: {}", id, e);
1015 if let Some(m) = manager.devices.write().await.get_mut(&id) {
1017 m.record_failure();
1018 }
1019 }
1020 }
1021 } else if !should_reconnect {
1022 any_successes = true;
1023 }
1024 }
1025
1026 if let Some(ref mut adaptive) = adaptive {
1028 if any_failures && !any_successes {
1029 adaptive.on_failure();
1030 } else if any_successes && !any_failures {
1031 adaptive.on_success();
1032 }
1033 }
1035 }
1036 }
1037 }
1038 })
1039 }
1040
1041 pub async fn add_device_with_priority(
1043 &self,
1044 identifier: &str,
1045 priority: DevicePriority,
1046 ) -> Result<()> {
1047 let mut devices = self.devices.write().await;
1048
1049 if devices.contains_key(identifier) {
1050 if let Some(m) = devices.get_mut(identifier) {
1052 m.priority = priority;
1053 }
1054 return Ok(());
1055 }
1056
1057 let mut managed = ManagedDevice::new(identifier);
1058 managed.priority = priority;
1059 managed.reconnect_options = self.config.default_reconnect_options.clone();
1060 devices.insert(identifier.to_string(), managed);
1061
1062 info!(
1063 "Added device to manager with priority {:?}: {}",
1064 priority, identifier
1065 );
1066 Ok(())
1067 }
1068
1069 pub async fn lowest_priority_connected(&self) -> Option<String> {
1073 let devices = self.devices.read().await;
1074 devices
1075 .iter()
1076 .filter(|(_, m)| m.has_device() && m.priority != DevicePriority::Critical)
1077 .min_by_key(|(_, m)| m.priority)
1078 .map(|(id, _)| id.clone())
1079 }
1080
1081 pub async fn evict_lowest_priority(&self) -> Result<bool> {
1085 if let Some(id) = self.lowest_priority_connected().await {
1086 info!("Evicting lowest priority device: {}", id);
1087 self.disconnect(&id).await?;
1088 Ok(true)
1089 } else {
1090 Ok(false)
1091 }
1092 }
1093
1094 pub fn start_hybrid_monitor(
1123 self: &Arc<Self>,
1124 cancel_token: CancellationToken,
1125 passive_options: Option<PassiveMonitorOptions>,
1126 ) -> tokio::task::JoinHandle<()> {
1127 let manager = Arc::clone(self);
1128 let options = passive_options.unwrap_or_default();
1129
1130 tokio::spawn(async move {
1131 info!("Starting hybrid monitor (passive + active)");
1132
1133 let passive_monitor = Arc::new(PassiveMonitor::new(options));
1135 let mut passive_rx = passive_monitor.subscribe();
1136
1137 let passive_cancel = cancel_token.clone();
1139 let _passive_handle = passive_monitor.start(passive_cancel);
1140
1141 loop {
1142 tokio::select! {
1143 _ = cancel_token.cancelled() => {
1144 info!("Hybrid monitor cancelled");
1145 break;
1146 }
1147 result = passive_rx.recv() => {
1148 match result {
1149 Ok(passive_reading) => {
1150 if let Some(reading) = passive_reading_to_current(&passive_reading) {
1152 if let Some(m) = manager.devices.write().await.get_mut(&passive_reading.device_id) {
1154 m.last_reading = Some(reading);
1155 m.record_success();
1156 }
1157
1158 manager.events.send(DeviceEvent::Reading {
1160 device: DeviceId {
1161 id: passive_reading.device_id.clone(),
1162 name: passive_reading.device_name.clone(),
1163 device_type: Some(passive_reading.data.device_type),
1164 },
1165 reading,
1166 });
1167 }
1168 }
1169 Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
1170 warn!("Hybrid monitor lagged {} messages", n);
1171 }
1172 Err(tokio::sync::broadcast::error::RecvError::Closed) => {
1173 info!("Passive monitor channel closed");
1174 break;
1175 }
1176 }
1177 }
1178 }
1179 }
1180 })
1181 }
1182
1183 pub async fn read_hybrid(
1193 &self,
1194 identifier: &str,
1195 max_passive_age: Option<Duration>,
1196 ) -> Result<CurrentReading> {
1197 let max_age = max_passive_age.unwrap_or(Duration::from_secs(60));
1198
1199 {
1201 let devices = self.devices.read().await;
1202 if let Some(managed) = devices.get(identifier)
1203 && let Some(reading) = managed.last_reading
1204 {
1205 if let Some(captured) = reading.captured_at {
1207 let age = time::OffsetDateTime::now_utc() - captured;
1208 if age
1209 < time::Duration::try_from(max_age).unwrap_or(time::Duration::seconds(60))
1210 {
1211 debug!("Using cached passive reading for {}", identifier);
1212 return Ok(reading);
1213 }
1214 }
1215 }
1216 }
1217
1218 debug!(
1220 "No recent passive reading, using active connection for {}",
1221 identifier
1222 );
1223 self.read_current(identifier).await
1224 }
1225
1226 pub async fn supports_passive_monitoring(&self, identifier: &str) -> bool {
1231 let options = PassiveMonitorOptions::default()
1233 .scan_duration(Duration::from_secs(5))
1234 .filter_devices(vec![identifier.to_string()]);
1235
1236 let monitor = Arc::new(PassiveMonitor::new(options));
1237 let mut rx = monitor.subscribe();
1238 let cancel = CancellationToken::new();
1239
1240 let _handle = monitor.start(cancel.clone());
1241
1242 let result = tokio::time::timeout(Duration::from_secs(6), rx.recv()).await;
1244 cancel.cancel();
1245
1246 matches!(result, Ok(Ok(_)))
1247 }
1248}
1249
1250fn passive_reading_to_current(passive: &PassiveReading) -> Option<CurrentReading> {
1252 let data = &passive.data;
1253
1254 if data.co2.is_none()
1256 && data.temperature.is_none()
1257 && data.humidity.is_none()
1258 && data.radon.is_none()
1259 && data.radiation_dose_rate.is_none()
1260 {
1261 return None;
1262 }
1263
1264 Some(CurrentReading {
1265 co2: data.co2.unwrap_or(0),
1266 temperature: data.temperature.unwrap_or(0.0),
1267 pressure: data.pressure.unwrap_or(0.0),
1268 humidity: data.humidity.unwrap_or(0),
1269 battery: data.battery,
1270 status: data.status,
1271 interval: data.interval,
1272 age: data.age,
1273 captured_at: Some(time::OffsetDateTime::now_utc()),
1274 radon: data.radon,
1275 radon_avg_24h: None,
1276 radon_avg_7d: None,
1277 radon_avg_30d: None,
1278 radiation_rate: data.radiation_dose_rate,
1279 radiation_total: None, })
1281}
1282
1283impl Default for DeviceManager {
1284 fn default() -> Self {
1285 Self::new()
1286 }
1287}
1288
1289#[cfg(test)]
1290mod tests {
1291 use super::*;
1292
1293 #[tokio::test]
1294 async fn test_manager_add_device() {
1295 let manager = DeviceManager::new();
1296 manager.add_device("test-device").await.unwrap();
1297
1298 assert_eq!(manager.device_count().await, 1);
1299 assert!(
1300 manager
1301 .device_ids()
1302 .await
1303 .contains(&"test-device".to_string())
1304 );
1305 }
1306
1307 #[tokio::test]
1308 async fn test_manager_remove_device() {
1309 let manager = DeviceManager::new();
1310 manager.add_device("test-device").await.unwrap();
1311 manager.remove_device("test-device").await.unwrap();
1312
1313 assert_eq!(manager.device_count().await, 0);
1314 }
1315
1316 #[tokio::test]
1317 async fn test_manager_not_connected_by_default() {
1318 let manager = DeviceManager::new();
1319 manager.add_device("test-device").await.unwrap();
1320
1321 assert!(!manager.is_connected("test-device").await);
1322 assert_eq!(manager.connected_count().await, 0);
1323 }
1324
1325 #[tokio::test]
1326 async fn test_manager_events() {
1327 let manager = DeviceManager::new();
1328 let _rx = manager.events().subscribe();
1329
1330 manager.add_device("test-device").await.unwrap();
1331
1332 assert_eq!(manager.events().receiver_count(), 1);
1334 }
1335}