1use std::collections::HashMap;
7use std::sync::Arc;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::time::Duration;
10
11use futures::future::join_all;
12use tokio::sync::RwLock;
13use tokio_util::sync::CancellationToken;
14use tracing::{debug, info, warn};
15
16use aranet_types::{CurrentReading, DeviceInfo, DeviceType};
17
18use crate::device::Device;
19use crate::error::{Error, Result};
20use crate::events::{DeviceEvent, DeviceId, DisconnectReason, EventDispatcher};
21use crate::passive::{PassiveMonitor, PassiveMonitorOptions, PassiveReading};
22use crate::reconnect::ReconnectOptions;
23use crate::scan::{DiscoveredDevice, ScanOptions, scan_with_options};
24
/// Relative importance of a managed device.
///
/// Variants are declared from least to most important, so the derived `Ord`
/// ranks `Low < Normal < High < Critical`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default)]
pub enum DevicePriority {
    /// Least important.
    Low,
    /// The default priority.
    #[default]
    Normal,
    /// More important than `Normal`.
    High,
    /// Most important.
    Critical,
}
41
42#[derive(Debug, Clone)]
47pub struct AdaptiveInterval {
48 pub base: Duration,
50 current: Duration,
52 pub min: Duration,
54 pub max: Duration,
56 consecutive_successes: u32,
58 consecutive_failures: u32,
60 success_threshold: u32,
62 failure_threshold: u32,
64}
65
66impl Default for AdaptiveInterval {
67 fn default() -> Self {
68 Self {
69 base: Duration::from_secs(30),
70 current: Duration::from_secs(30),
71 min: Duration::from_secs(5),
72 max: Duration::from_secs(120),
73 consecutive_successes: 0,
74 consecutive_failures: 0,
75 success_threshold: 3,
76 failure_threshold: 1,
77 }
78 }
79}
80
81impl AdaptiveInterval {
82 pub fn new(base: Duration, min: Duration, max: Duration) -> Self {
84 Self {
85 base,
86 current: base,
87 min,
88 max,
89 ..Default::default()
90 }
91 }
92
93 pub fn current(&self) -> Duration {
95 self.current
96 }
97
98 pub fn on_success(&mut self) {
103 self.consecutive_failures = 0;
104 self.consecutive_successes += 1;
105
106 if self.consecutive_successes >= self.success_threshold {
107 let new_interval = self.current.saturating_mul(2);
109 self.current = new_interval.min(self.max);
110 self.consecutive_successes = 0;
111 debug!(
112 "Health check stable, increasing interval to {:?}",
113 self.current
114 );
115 }
116 }
117
118 pub fn on_failure(&mut self) {
123 self.consecutive_successes = 0;
124 self.consecutive_failures += 1;
125
126 if self.consecutive_failures >= self.failure_threshold {
127 let new_interval = self.current / 2;
129 self.current = new_interval.max(self.min);
130 self.consecutive_failures = 0;
131 debug!(
132 "Health check unstable, decreasing interval to {:?}",
133 self.current
134 );
135 }
136 }
137
138 pub fn reset(&mut self) {
140 self.current = self.base;
141 self.consecutive_successes = 0;
142 self.consecutive_failures = 0;
143 }
144}
145
/// Per-device record kept by the manager.
#[derive(Debug)]
pub struct ManagedDevice {
    /// Identifier the device was registered under (also its key in the manager map).
    pub id: String,
    /// Device name, filled in after a successful connect.
    pub name: Option<String>,
    /// Device type, filled in after a successful connect.
    pub device_type: Option<DeviceType>,
    /// Connection handle; `None` while no connection handle is held.
    device: Option<Arc<Device>>,
    /// Set while a connect attempt for this device is in flight.
    connecting: AtomicBool,
    /// Whether the health monitor may try to (re)connect this device.
    pub auto_reconnect: bool,
    /// Most recent reading stored for this device.
    pub last_reading: Option<CurrentReading>,
    /// Device information read right after connecting, if that read succeeded.
    pub info: Option<DeviceInfo>,
    /// Reconnect settings associated with this device.
    pub reconnect_options: ReconnectOptions,
    /// Priority of this device relative to other managed devices.
    pub priority: DevicePriority,
    /// Failures recorded since the last recorded success.
    pub consecutive_failures: u32,
    /// Time of the last recorded success, in milliseconds since the Unix epoch.
    pub last_success: Option<u64>,
}
176
177impl ManagedDevice {
178 pub fn new(id: &str) -> Self {
180 Self {
181 id: id.to_string(),
182 name: None,
183 device_type: None,
184 device: None,
185 connecting: AtomicBool::new(false),
186 auto_reconnect: true,
187 last_reading: None,
188 info: None,
189 reconnect_options: ReconnectOptions::default(),
190 priority: DevicePriority::default(),
191 consecutive_failures: 0,
192 last_success: None,
193 }
194 }
195
196 pub fn with_reconnect_options(id: &str, options: ReconnectOptions) -> Self {
198 Self {
199 reconnect_options: options,
200 ..Self::new(id)
201 }
202 }
203
204 pub fn with_priority(id: &str, priority: DevicePriority) -> Self {
206 Self {
207 priority,
208 ..Self::new(id)
209 }
210 }
211
212 pub fn with_options(id: &str, options: ReconnectOptions, priority: DevicePriority) -> Self {
214 Self {
215 reconnect_options: options,
216 priority,
217 ..Self::new(id)
218 }
219 }
220
221 pub fn record_success(&mut self) {
223 self.consecutive_failures = 0;
224 self.last_success = Some(
225 std::time::SystemTime::now()
226 .duration_since(std::time::UNIX_EPOCH)
227 .unwrap_or_default()
228 .as_millis() as u64,
229 );
230 }
231
232 pub fn record_failure(&mut self) {
234 self.consecutive_failures += 1;
235 }
236
237 pub fn has_device(&self) -> bool {
239 self.device.is_some()
240 }
241
242 pub async fn is_connected(&self) -> bool {
244 if let Some(device) = &self.device {
245 device.is_connected().await
246 } else {
247 false
248 }
249 }
250
251 pub fn device(&self) -> Option<&Arc<Device>> {
253 self.device.as_ref()
254 }
255
256 pub fn device_arc(&self) -> Option<Arc<Device>> {
258 self.device.clone()
259 }
260}
261
/// Settings for a `DeviceManager`.
#[derive(Debug, Clone)]
pub struct ManagerConfig {
    /// Options used by `DeviceManager::scan`.
    pub scan_options: ScanOptions,
    /// Reconnect options given to devices added without explicit options.
    pub default_reconnect_options: ReconnectOptions,
    /// Capacity passed to the event dispatcher.
    pub event_capacity: usize,
    /// Health-check interval; the starting point when adaptive intervals are enabled.
    pub health_check_interval: Duration,
    /// Maximum number of held connection handles; `0` disables the limit.
    pub max_concurrent_connections: usize,
    /// Whether the health monitor adapts its interval to observed stability.
    pub use_adaptive_interval: bool,
    /// Lower bound for the adaptive health-check interval.
    pub min_health_check_interval: Duration,
    /// Upper bound for the adaptive health-check interval.
    pub max_health_check_interval: Duration,
    /// Priority for devices added without an explicit one.
    // NOTE(review): confirm which add paths are expected to honor this.
    pub default_priority: DevicePriority,
    /// When `true` the health monitor uses `is_connection_alive` instead of
    /// `is_connected` to decide whether a device needs reconnecting.
    pub use_connection_validation: bool,
}
298
299impl Default for ManagerConfig {
300 fn default() -> Self {
301 let platform_config = crate::platform::PlatformConfig::for_current_platform();
303
304 Self {
305 scan_options: ScanOptions::default(),
306 default_reconnect_options: ReconnectOptions::default(),
307 event_capacity: 100,
308 health_check_interval: Duration::from_secs(30),
309 max_concurrent_connections: platform_config.max_concurrent_connections,
310 use_adaptive_interval: true,
311 min_health_check_interval: Duration::from_secs(5),
312 max_health_check_interval: Duration::from_secs(120),
313 default_priority: DevicePriority::Normal,
314 use_connection_validation: true,
315 }
316 }
317}
318
319impl ManagerConfig {
320 pub fn with_max_connections(mut self, max: usize) -> Self {
322 self.max_concurrent_connections = max;
323 self
324 }
325
326 pub fn unlimited_connections(mut self) -> Self {
328 self.max_concurrent_connections = 0;
329 self
330 }
331
332 pub fn adaptive_interval(mut self, enabled: bool) -> Self {
334 self.use_adaptive_interval = enabled;
335 self
336 }
337
338 pub fn health_check_interval(mut self, interval: Duration) -> Self {
340 self.health_check_interval = interval;
341 self
342 }
343
344 pub fn default_priority(mut self, priority: DevicePriority) -> Self {
346 self.default_priority = priority;
347 self
348 }
349
350 pub fn connection_validation(mut self, enabled: bool) -> Self {
352 self.use_connection_validation = enabled;
353 self
354 }
355}
356
/// Tracks a set of devices, their connection handles and last readings,
/// and publishes device events.
pub struct DeviceManager {
    /// Managed devices keyed by identifier, behind an async read/write lock.
    devices: RwLock<HashMap<String, ManagedDevice>>,
    /// Dispatcher used to publish device events.
    events: EventDispatcher,
    /// Configuration the manager was created with.
    config: ManagerConfig,
}
366
367impl DeviceManager {
368 pub fn new() -> Self {
370 Self::with_config(ManagerConfig::default())
371 }
372
373 pub fn with_event_capacity(capacity: usize) -> Self {
375 Self::with_config(ManagerConfig {
376 event_capacity: capacity,
377 ..Default::default()
378 })
379 }
380
381 pub fn with_config(config: ManagerConfig) -> Self {
383 Self {
384 devices: RwLock::new(HashMap::new()),
385 events: EventDispatcher::new(config.event_capacity),
386 config,
387 }
388 }
389
390 pub fn events(&self) -> &EventDispatcher {
392 &self.events
393 }
394
395 pub fn config(&self) -> &ManagerConfig {
397 &self.config
398 }
399
400 pub async fn scan(&self) -> Result<Vec<DiscoveredDevice>> {
402 scan_with_options(self.config.scan_options.clone()).await
403 }
404
405 pub async fn scan_with_options(&self, options: ScanOptions) -> Result<Vec<DiscoveredDevice>> {
407 let devices = scan_with_options(options).await?;
408
409 for device in &devices {
411 self.events.send(DeviceEvent::Discovered {
412 device: DeviceId {
413 id: device.identifier.clone(),
414 name: device.name.clone(),
415 device_type: device.device_type,
416 },
417 rssi: device.rssi,
418 });
419 }
420
421 Ok(devices)
422 }
423
424 pub async fn add_device(&self, identifier: &str) -> Result<()> {
426 self.add_device_with_options(identifier, self.config.default_reconnect_options.clone())
427 .await
428 }
429
430 pub async fn add_device_with_options(
432 &self,
433 identifier: &str,
434 reconnect_options: ReconnectOptions,
435 ) -> Result<()> {
436 let mut devices = self.devices.write().await;
437
438 if devices.contains_key(identifier) {
439 return Ok(()); }
441
442 let managed = ManagedDevice::with_reconnect_options(identifier, reconnect_options);
443 devices.insert(identifier.to_string(), managed);
444
445 info!("Added device to manager: {}", identifier);
446 Ok(())
447 }
448
449 pub async fn connect(&self, identifier: &str) -> Result<()> {
465 let reconnect_options = {
467 let mut devices = self.devices.write().await;
468
469 if self.config.max_concurrent_connections > 0 {
471 let already_connected = devices
473 .get(identifier)
474 .map(|m| m.has_device())
475 .unwrap_or(false);
476
477 if !already_connected {
478 let current_connections = devices.values().filter(|m| m.has_device()).count();
479 if current_connections >= self.config.max_concurrent_connections {
480 warn!(
481 "Connection limit reached ({}/{}), cannot connect to {}",
482 current_connections, self.config.max_concurrent_connections, identifier
483 );
484 return Err(Error::connection_failed(
485 Some(identifier.to_string()),
486 crate::error::ConnectionFailureReason::Other(format!(
487 "Connection limit reached ({}/{})",
488 current_connections, self.config.max_concurrent_connections
489 )),
490 ));
491 }
492 }
493 }
494
495 let managed = devices.entry(identifier.to_string()).or_insert_with(|| {
497 info!("Adding device to manager: {}", identifier);
498 ManagedDevice::with_reconnect_options(
499 identifier,
500 self.config.default_reconnect_options.clone(),
501 )
502 });
503
504 if managed.device.is_some() {
506 debug!("Device {} already has a connection handle", identifier);
507 return Ok(());
508 }
509
510 if managed
513 .connecting
514 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
515 .is_err()
516 {
517 debug!(
518 "Another task is already connecting to device {}",
519 identifier
520 );
521 return Ok(());
522 }
523
524 managed.reconnect_options.clone()
526 };
527 let _ = reconnect_options;
532 let connect_result = Device::connect(identifier).await;
533
534 let device = match connect_result {
536 Ok(d) => Arc::new(d),
537 Err(e) => {
538 let devices = self.devices.read().await;
540 if let Some(managed) = devices.get(identifier) {
541 managed.connecting.store(false, Ordering::SeqCst);
542 }
543 return Err(e);
544 }
545 };
546
547 let info = device.read_device_info().await.ok();
548 let device_type = device.device_type();
549 let name = device.name().map(|s| s.to_string());
550
551 {
553 let mut devices = self.devices.write().await;
554 if let Some(managed) = devices.get_mut(identifier) {
555 managed.connecting.store(false, Ordering::SeqCst);
557
558 if managed.device.is_some() {
561 debug!(
563 "Another task connected {} while we were connecting, discarding our connection",
564 identifier
565 );
566 drop(devices); let _ = device.disconnect().await;
568 return Ok(());
569 }
570
571 managed.device = Some(device);
572 managed.info = info.clone();
573 managed.device_type = device_type;
574 managed.name = name.clone();
575 } else {
576 let mut managed = ManagedDevice::new(identifier);
578 managed.device = Some(device);
579 managed.info = info.clone();
580 managed.device_type = device_type;
581 managed.name = name.clone();
582 devices.insert(identifier.to_string(), managed);
583 }
584 }
585
586 self.events.send(DeviceEvent::Connected {
588 device: DeviceId {
589 id: identifier.to_string(),
590 name,
591 device_type,
592 },
593 info,
594 });
595
596 info!("Connected to device: {}", identifier);
597 Ok(())
598 }
599
600 pub async fn disconnect(&self, identifier: &str) -> Result<()> {
602 let device_arc = {
603 let mut devices = self.devices.write().await;
604 if let Some(managed) = devices.get_mut(identifier) {
605 managed.device.take()
606 } else {
607 None
608 }
609 };
610
611 if let Some(device) = device_arc {
613 device.disconnect().await?;
614 self.events.send(DeviceEvent::Disconnected {
615 device: DeviceId::new(identifier),
616 reason: DisconnectReason::UserRequested,
617 });
618 }
619
620 Ok(())
621 }
622
623 pub async fn remove_device(&self, identifier: &str) -> Result<()> {
625 self.disconnect(identifier).await?;
626 self.devices.write().await.remove(identifier);
627 info!("Removed device from manager: {}", identifier);
628 Ok(())
629 }
630
631 pub async fn device_ids(&self) -> Vec<String> {
633 self.devices.read().await.keys().cloned().collect()
634 }
635
636 pub async fn device_count(&self) -> usize {
638 self.devices.read().await.len()
639 }
640
641 pub async fn connected_count(&self) -> usize {
647 let devices = self.devices.read().await;
648 devices.values().filter(|m| m.has_device()).count()
649 }
650
651 pub async fn can_connect(&self) -> bool {
656 if self.config.max_concurrent_connections == 0 {
657 return true;
658 }
659 self.connected_count().await < self.config.max_concurrent_connections
660 }
661
662 pub async fn connection_status(&self) -> (usize, usize) {
666 (
667 self.connected_count().await,
668 self.config.max_concurrent_connections,
669 )
670 }
671
672 pub async fn available_connections(&self) -> Option<usize> {
676 if self.config.max_concurrent_connections == 0 {
677 return None;
678 }
679 let current = self.connected_count().await;
680 Some(
681 self.config
682 .max_concurrent_connections
683 .saturating_sub(current),
684 )
685 }
686
687 pub async fn connected_count_verified(&self) -> usize {
692 let device_arcs: Vec<Arc<Device>> = {
694 let devices = self.devices.read().await;
695 devices.values().filter_map(|m| m.device_arc()).collect()
696 };
697 let futures = device_arcs.iter().map(|d| d.is_connected());
701 let results = join_all(futures).await;
702
703 results.into_iter().filter(|&connected| connected).count()
704 }
705
706 pub async fn read_current(&self, identifier: &str) -> Result<CurrentReading> {
708 let device = {
710 let devices = self.devices.read().await;
711 let managed = devices
712 .get(identifier)
713 .ok_or_else(|| Error::device_not_found(identifier))?;
714 managed.device_arc().ok_or(Error::NotConnected)?
715 };
716 let reading = device.read_current().await?;
719
720 self.events.send(DeviceEvent::Reading {
722 device: DeviceId::new(identifier),
723 reading,
724 });
725
726 {
728 let mut devices = self.devices.write().await;
729 if let Some(managed) = devices.get_mut(identifier) {
730 managed.last_reading = Some(reading);
731 }
732 }
733
734 Ok(reading)
735 }
736
737 pub async fn read_all(&self) -> HashMap<String, Result<CurrentReading>> {
743 let devices_to_read: Vec<(String, Arc<Device>)> = {
745 let devices = self.devices.read().await;
746 devices
747 .iter()
748 .filter_map(|(id, managed)| managed.device_arc().map(|d| (id.clone(), d)))
749 .collect()
750 };
751 let read_futures = devices_to_read.iter().map(|(id, device)| {
755 let id = id.clone();
756 let device = Arc::clone(device);
757 async move {
758 let result = device.read_current().await;
759 (id, result)
760 }
761 });
762
763 let read_results: Vec<(String, Result<CurrentReading>)> = join_all(read_futures).await;
764
765 for (id, result) in &read_results {
767 if let Ok(reading) = result {
768 self.events.send(DeviceEvent::Reading {
769 device: DeviceId::new(id),
770 reading: *reading,
771 });
772 }
773 }
774
775 {
777 let mut devices = self.devices.write().await;
778 for (id, result) in &read_results {
779 if let Ok(reading) = result
780 && let Some(managed) = devices.get_mut(id)
781 {
782 managed.last_reading = Some(*reading);
783 }
784 }
785 }
786
787 read_results.into_iter().collect()
788 }
789
790 pub async fn connect_all(&self) -> HashMap<String, Result<()>> {
794 let ids: Vec<_> = self.devices.read().await.keys().cloned().collect();
795
796 let connect_futures = ids.iter().map(|id| {
799 let id = id.clone();
800 async move {
801 let result = self.connect(&id).await;
802 (id, result)
803 }
804 });
805
806 join_all(connect_futures).await.into_iter().collect()
807 }
808
809 pub async fn disconnect_all(&self) -> HashMap<String, Result<()>> {
813 let devices_to_disconnect: Vec<(String, Arc<Device>)> = {
815 let mut devices = self.devices.write().await;
816 devices
817 .iter_mut()
818 .filter_map(|(id, managed)| managed.device.take().map(|d| (id.clone(), d)))
819 .collect()
820 };
821
822 let disconnect_futures = devices_to_disconnect.iter().map(|(id, device)| {
824 let id = id.clone();
825 let device = Arc::clone(device);
826 async move {
827 let result = device.disconnect().await;
828 (id, result)
829 }
830 });
831
832 let results: Vec<(String, Result<()>)> = join_all(disconnect_futures).await;
833
834 for (id, result) in &results {
836 if result.is_ok() {
837 self.events.send(DeviceEvent::Disconnected {
838 device: DeviceId::new(id),
839 reason: DisconnectReason::UserRequested,
840 });
841 }
842 }
843
844 results.into_iter().collect()
845 }
846
847 pub fn try_is_connected(&self, identifier: &str) -> Option<bool> {
857 match self.devices.try_read() {
859 Ok(devices) => Some(
860 devices
861 .get(identifier)
862 .map(|m| m.has_device())
863 .unwrap_or(false),
864 ),
865 Err(_) => None, }
867 }
868
869 pub async fn is_connected(&self, identifier: &str) -> bool {
873 let device = {
874 let devices = self.devices.read().await;
875 devices.get(identifier).and_then(|m| m.device_arc())
876 };
877
878 if let Some(device) = device {
879 device.is_connected().await
880 } else {
881 false
882 }
883 }
884
885 pub async fn get_device_info(&self, identifier: &str) -> Option<DeviceInfo> {
887 let devices = self.devices.read().await;
888 devices.get(identifier).and_then(|m| m.info.clone())
889 }
890
891 pub async fn get_last_reading(&self, identifier: &str) -> Option<CurrentReading> {
893 let devices = self.devices.read().await;
894 devices.get(identifier).and_then(|m| m.last_reading)
895 }
896
897 pub fn start_health_monitor(
931 self: &Arc<Self>,
932 cancel_token: CancellationToken,
933 ) -> tokio::task::JoinHandle<()> {
934 let manager = Arc::clone(self);
935
936 tokio::spawn(async move {
937 let mut adaptive = if manager.config.use_adaptive_interval {
939 Some(AdaptiveInterval::new(
940 manager.config.health_check_interval,
941 manager.config.min_health_check_interval,
942 manager.config.max_health_check_interval,
943 ))
944 } else {
945 None
946 };
947
948 loop {
949 let current_interval = adaptive
951 .as_ref()
952 .map(|a| a.current())
953 .unwrap_or(manager.config.health_check_interval);
954
955 tokio::select! {
956 _ = cancel_token.cancelled() => {
957 info!("Health monitor cancelled, shutting down");
958 break;
959 }
960 _ = tokio::time::sleep(current_interval) => {
961 let mut any_failures = false;
962 let mut any_successes = false;
963
964 let devices_to_check: Vec<(String, Option<Arc<Device>>, bool, DevicePriority)> = {
966 let devices = manager.devices.read().await;
967 devices
968 .iter()
969 .map(|(id, m)| {
970 (
971 id.clone(),
972 m.device_arc(),
973 m.auto_reconnect,
974 m.priority,
975 )
976 })
977 .collect()
978 };
979
980 let mut sorted_devices = devices_to_check;
982 sorted_devices.sort_by(|a, b| b.3.cmp(&a.3));
983
984 for (id, device_opt, auto_reconnect, _priority) in sorted_devices {
985 let should_reconnect = match device_opt {
986 Some(device) => {
987 if manager.config.use_connection_validation {
989 !device.is_connection_alive().await
990 } else {
991 !device.is_connected().await
992 }
993 }
994 None => true,
995 };
996
997 if should_reconnect && auto_reconnect {
998 debug!("Health monitor: attempting reconnect for {}", id);
999 any_failures = true;
1000
1001 match manager.connect(&id).await {
1002 Ok(()) => {
1003 any_successes = true;
1004 if let Some(m) = manager.devices.write().await.get_mut(&id) {
1006 m.record_success();
1007 }
1008 }
1009 Err(e) => {
1010 warn!("Health monitor: reconnect failed for {}: {}", id, e);
1011 if let Some(m) = manager.devices.write().await.get_mut(&id) {
1013 m.record_failure();
1014 }
1015 }
1016 }
1017 } else if !should_reconnect {
1018 any_successes = true;
1019 }
1020 }
1021
1022 if let Some(ref mut adaptive) = adaptive {
1024 if any_failures && !any_successes {
1025 adaptive.on_failure();
1026 } else if any_successes && !any_failures {
1027 adaptive.on_success();
1028 }
1029 }
1031 }
1032 }
1033 }
1034 })
1035 }
1036
1037 pub async fn add_device_with_priority(
1039 &self,
1040 identifier: &str,
1041 priority: DevicePriority,
1042 ) -> Result<()> {
1043 let mut devices = self.devices.write().await;
1044
1045 if devices.contains_key(identifier) {
1046 if let Some(m) = devices.get_mut(identifier) {
1048 m.priority = priority;
1049 }
1050 return Ok(());
1051 }
1052
1053 let mut managed = ManagedDevice::new(identifier);
1054 managed.priority = priority;
1055 managed.reconnect_options = self.config.default_reconnect_options.clone();
1056 devices.insert(identifier.to_string(), managed);
1057
1058 info!(
1059 "Added device to manager with priority {:?}: {}",
1060 priority, identifier
1061 );
1062 Ok(())
1063 }
1064
1065 pub async fn lowest_priority_connected(&self) -> Option<String> {
1069 let devices = self.devices.read().await;
1070 devices
1071 .iter()
1072 .filter(|(_, m)| m.has_device() && m.priority != DevicePriority::Critical)
1073 .min_by_key(|(_, m)| m.priority)
1074 .map(|(id, _)| id.clone())
1075 }
1076
1077 pub async fn evict_lowest_priority(&self) -> Result<bool> {
1081 if let Some(id) = self.lowest_priority_connected().await {
1082 info!("Evicting lowest priority device: {}", id);
1083 self.disconnect(&id).await?;
1084 Ok(true)
1085 } else {
1086 Ok(false)
1087 }
1088 }
1089
1090 pub fn start_hybrid_monitor(
1119 self: &Arc<Self>,
1120 cancel_token: CancellationToken,
1121 passive_options: Option<PassiveMonitorOptions>,
1122 ) -> tokio::task::JoinHandle<()> {
1123 let manager = Arc::clone(self);
1124 let options = passive_options.unwrap_or_default();
1125
1126 tokio::spawn(async move {
1127 info!("Starting hybrid monitor (passive + active)");
1128
1129 let passive_monitor = Arc::new(PassiveMonitor::new(options));
1131 let mut passive_rx = passive_monitor.subscribe();
1132
1133 let passive_cancel = cancel_token.clone();
1135 let _passive_handle = passive_monitor.start(passive_cancel);
1136
1137 loop {
1138 tokio::select! {
1139 _ = cancel_token.cancelled() => {
1140 info!("Hybrid monitor cancelled");
1141 break;
1142 }
1143 result = passive_rx.recv() => {
1144 match result {
1145 Ok(passive_reading) => {
1146 if let Some(reading) = passive_reading_to_current(&passive_reading) {
1148 if let Some(m) = manager.devices.write().await.get_mut(&passive_reading.device_id) {
1150 m.last_reading = Some(reading);
1151 m.record_success();
1152 }
1153
1154 manager.events.send(DeviceEvent::Reading {
1156 device: DeviceId {
1157 id: passive_reading.device_id.clone(),
1158 name: passive_reading.device_name.clone(),
1159 device_type: Some(passive_reading.data.device_type),
1160 },
1161 reading,
1162 });
1163 }
1164 }
1165 Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
1166 warn!("Hybrid monitor lagged {} messages", n);
1167 }
1168 Err(tokio::sync::broadcast::error::RecvError::Closed) => {
1169 info!("Passive monitor channel closed");
1170 break;
1171 }
1172 }
1173 }
1174 }
1175 }
1176 })
1177 }
1178
1179 pub async fn read_hybrid(
1189 &self,
1190 identifier: &str,
1191 max_passive_age: Option<Duration>,
1192 ) -> Result<CurrentReading> {
1193 let max_age = max_passive_age.unwrap_or(Duration::from_secs(60));
1194
1195 {
1197 let devices = self.devices.read().await;
1198 if let Some(managed) = devices.get(identifier)
1199 && let Some(reading) = managed.last_reading
1200 {
1201 if let Some(captured) = reading.captured_at {
1203 let age = time::OffsetDateTime::now_utc() - captured;
1204 if age
1205 < time::Duration::try_from(max_age).unwrap_or(time::Duration::seconds(60))
1206 {
1207 debug!("Using cached passive reading for {}", identifier);
1208 return Ok(reading);
1209 }
1210 }
1211 }
1212 }
1213
1214 debug!(
1216 "No recent passive reading, using active connection for {}",
1217 identifier
1218 );
1219 self.read_current(identifier).await
1220 }
1221
1222 pub async fn supports_passive_monitoring(&self, identifier: &str) -> bool {
1227 let options = PassiveMonitorOptions::default()
1229 .scan_duration(Duration::from_secs(5))
1230 .filter_devices(vec![identifier.to_string()]);
1231
1232 let monitor = Arc::new(PassiveMonitor::new(options));
1233 let mut rx = monitor.subscribe();
1234 let cancel = CancellationToken::new();
1235
1236 let _handle = monitor.start(cancel.clone());
1237
1238 let result = tokio::time::timeout(Duration::from_secs(6), rx.recv()).await;
1240 cancel.cancel();
1241
1242 matches!(result, Ok(Ok(_)))
1243 }
1244}
1245
1246fn passive_reading_to_current(passive: &PassiveReading) -> Option<CurrentReading> {
1248 let data = &passive.data;
1249
1250 if data.co2.is_none()
1252 && data.temperature.is_none()
1253 && data.humidity.is_none()
1254 && data.radon.is_none()
1255 && data.radiation_dose_rate.is_none()
1256 {
1257 return None;
1258 }
1259
1260 Some(CurrentReading {
1261 co2: data.co2.unwrap_or(0),
1262 temperature: data.temperature.unwrap_or(0.0),
1263 pressure: data.pressure.unwrap_or(0.0),
1264 humidity: data.humidity.unwrap_or(0),
1265 battery: data.battery,
1266 status: data.status,
1267 interval: data.interval,
1268 age: data.age,
1269 captured_at: Some(time::OffsetDateTime::now_utc()),
1270 radon: data.radon,
1271 radon_avg_24h: None,
1272 radon_avg_7d: None,
1273 radon_avg_30d: None,
1274 radiation_rate: data.radiation_dose_rate,
1275 radiation_total: None, })
1277}
1278
1279impl Default for DeviceManager {
1280 fn default() -> Self {
1281 Self::new()
1282 }
1283}
1284
#[cfg(test)]
mod tests {
    use super::*;

    /// Identifier shared by all tests; no real device is contacted.
    const DEVICE: &str = "test-device";

    /// Adding a device registers exactly one entry under its identifier.
    #[tokio::test]
    async fn test_manager_add_device() {
        let manager = DeviceManager::new();
        manager.add_device(DEVICE).await.unwrap();

        assert_eq!(manager.device_count().await, 1);
        let ids = manager.device_ids().await;
        assert!(ids.iter().any(|id| id == DEVICE));
    }

    /// Removing a device that was never connected empties the manager.
    #[tokio::test]
    async fn test_manager_remove_device() {
        let manager = DeviceManager::new();
        manager.add_device(DEVICE).await.unwrap();
        manager.remove_device(DEVICE).await.unwrap();

        assert_eq!(manager.device_count().await, 0);
    }

    /// A freshly added device holds no connection.
    #[tokio::test]
    async fn test_manager_not_connected_by_default() {
        let manager = DeviceManager::new();
        manager.add_device(DEVICE).await.unwrap();

        let connected = manager.is_connected(DEVICE).await;
        assert!(!connected);
        assert_eq!(manager.connected_count().await, 0);
    }

    /// A subscription stays registered with the dispatcher while it is alive.
    #[tokio::test]
    async fn test_manager_events() {
        let manager = DeviceManager::new();
        let _rx = manager.events().subscribe();

        manager.add_device(DEVICE).await.unwrap();

        assert_eq!(manager.events().receiver_count(), 1);
    }
}