1use std::collections::HashMap;
19use std::path::PathBuf;
20use std::sync::Arc;
21use std::time::Duration;
22
23use aranet_core::device::{ConnectionConfig, SignalQuality};
24use aranet_core::messages::{ErrorContext, ServiceDeviceStats};
25use aranet_core::service_client::ServiceClient;
26use aranet_core::settings::{DeviceSettings, MeasurementInterval};
27use aranet_core::{
28 BluetoothRange, Device, RetryConfig, ScanOptions, scan::scan_with_options, with_retry,
29};
30use aranet_store::Store;
31use aranet_types::{CurrentReading, DeviceType};
32use tokio::sync::{RwLock, mpsc};
33use tokio::time::interval;
34use tokio_util::sync::CancellationToken;
35use tracing::{debug, error, info, warn};
36
37use super::messages::{CachedDevice, Command, SensorEvent};
38
39pub struct SensorWorker {
49 command_rx: mpsc::Receiver<Command>,
51 event_tx: mpsc::Sender<SensorEvent>,
53 store_path: PathBuf,
55 service_client: Option<ServiceClient>,
57 connection_config: ConnectionConfig,
59 background_polling: Arc<RwLock<HashMap<String, tokio::sync::watch::Sender<bool>>>>,
62 signal_quality_cache: Arc<RwLock<HashMap<String, SignalQuality>>>,
64 cancel_token: CancellationToken,
67}
68
/// URL handed to `ServiceClient::new` when the worker is constructed.
const DEFAULT_SERVICE_URL: &str = "http://localhost:8080";
71
72impl SensorWorker {
73 pub fn new(
81 command_rx: mpsc::Receiver<Command>,
82 event_tx: mpsc::Sender<SensorEvent>,
83 store_path: PathBuf,
84 ) -> Self {
85 let service_client = ServiceClient::new(DEFAULT_SERVICE_URL).ok();
87
88 let connection_config = ConnectionConfig::for_current_platform();
90 info!(
91 ?connection_config,
92 "Using platform-optimized connection config"
93 );
94
95 Self {
96 command_rx,
97 event_tx,
98 store_path,
99 service_client,
100 connection_config,
101 background_polling: Arc::new(RwLock::new(HashMap::new())),
102 signal_quality_cache: Arc::new(RwLock::new(HashMap::new())),
103 cancel_token: CancellationToken::new(),
104 }
105 }
106
107 fn open_store(&self) -> Option<Store> {
111 match Store::open(&self.store_path) {
112 Ok(store) => Some(store),
113 Err(e) => {
114 warn!(error = %e, "Failed to open store");
115 None
116 }
117 }
118 }
119
120 pub async fn run(mut self) {
125 info!("SensorWorker started");
126
127 loop {
128 tokio::select! {
129 cmd = self.command_rx.recv() => {
131 match cmd {
132 Some(Command::Shutdown) => {
133 info!("SensorWorker received shutdown command");
134 break;
135 }
136 Some(cmd) => {
137 self.handle_command(cmd).await;
138 }
139 None => {
140 info!("Command channel closed, shutting down worker");
141 break;
142 }
143 }
144 }
145 }
146 }
147
148 info!("SensorWorker stopped");
149 }
150
151 async fn handle_command(&mut self, cmd: Command) {
153 info!(?cmd, "Handling command");
154
155 match cmd {
156 Command::LoadCachedData => {
157 self.handle_load_cached_data().await;
158 }
159 Command::Scan { duration } => {
160 self.handle_scan(duration).await;
161 }
162 Command::Connect { device_id } => {
163 self.handle_connect(&device_id).await;
164 }
165 Command::Disconnect { device_id } => {
166 self.handle_disconnect(&device_id).await;
167 }
168 Command::RefreshReading { device_id } => {
169 self.handle_refresh_reading(&device_id).await;
170 }
171 Command::RefreshAll => {
172 self.handle_refresh_all().await;
173 }
174 Command::SyncHistory { device_id } => {
175 self.handle_sync_history(&device_id).await;
176 }
177 Command::SetInterval {
178 device_id,
179 interval_secs,
180 } => {
181 self.handle_set_interval(&device_id, interval_secs).await;
182 }
183 Command::SetBluetoothRange {
184 device_id,
185 extended,
186 } => {
187 self.handle_set_bluetooth_range(&device_id, extended).await;
188 }
189 Command::SetSmartHome { device_id, enabled } => {
190 self.handle_set_smart_home(&device_id, enabled).await;
191 }
192 Command::RefreshServiceStatus => {
193 self.handle_refresh_service_status().await;
194 }
195 Command::StartServiceCollector => {
196 self.handle_start_service_collector().await;
197 }
198 Command::StopServiceCollector => {
199 self.handle_stop_service_collector().await;
200 }
201 Command::SetAlias { device_id, alias } => {
202 self.handle_set_alias(&device_id, alias).await;
203 }
204 Command::ForgetDevice { device_id } => {
205 self.handle_forget_device(&device_id).await;
206 }
207 Command::CancelOperation => {
208 self.handle_cancel_operation().await;
209 }
210 Command::StartBackgroundPolling {
211 device_id,
212 interval_secs,
213 } => {
214 self.handle_start_background_polling(&device_id, interval_secs)
215 .await;
216 }
217 Command::StopBackgroundPolling { device_id } => {
218 self.handle_stop_background_polling(&device_id).await;
219 }
220 Command::Shutdown => {
221 }
223 Command::InstallSystemService { .. }
225 | Command::UninstallSystemService { .. }
226 | Command::StartSystemService { .. }
227 | Command::StopSystemService { .. }
228 | Command::CheckSystemServiceStatus { .. }
229 | Command::FetchServiceConfig
230 | Command::AddServiceDevice { .. }
231 | Command::UpdateServiceDevice { .. }
232 | Command::RemoveServiceDevice { .. } => {
233 info!("System service commands not supported in TUI");
234 }
235 }
236 }
237
238 async fn handle_load_cached_data(&self) {
240 info!("Loading cached data from store");
241
242 let Some(store) = self.open_store() else {
243 let _ = self
245 .event_tx
246 .send(SensorEvent::CachedDataLoaded { devices: vec![] })
247 .await;
248 return;
249 };
250
251 let stored_devices = match store.list_devices() {
253 Ok(devices) => devices,
254 Err(e) => {
255 warn!("Failed to list devices: {}", e);
256 let _ = self
257 .event_tx
258 .send(SensorEvent::CachedDataLoaded { devices: vec![] })
259 .await;
260 return;
261 }
262 };
263
264 let mut cached_devices = Vec::new();
266 for stored in stored_devices {
267 let reading = match store.get_latest_reading(&stored.id) {
268 Ok(Some(stored_reading)) => Some(CurrentReading {
269 co2: stored_reading.co2,
270 temperature: stored_reading.temperature,
271 pressure: stored_reading.pressure,
272 humidity: stored_reading.humidity,
273 battery: stored_reading.battery,
274 status: stored_reading.status,
275 interval: 0, age: 0, captured_at: Some(stored_reading.captured_at),
278 radon: stored_reading.radon,
279 radiation_rate: stored_reading.radiation_rate,
280 radiation_total: stored_reading.radiation_total,
281 radon_avg_24h: None,
282 radon_avg_7d: None,
283 radon_avg_30d: None,
284 }),
285 Ok(None) => None,
286 Err(e) => {
287 debug!("Failed to get latest reading for {}: {}", stored.id, e);
288 None
289 }
290 };
291
292 let last_sync = match store.get_sync_state(&stored.id) {
294 Ok(Some(state)) => state.last_sync_at,
295 Ok(None) => None,
296 Err(e) => {
297 debug!("Failed to get sync state for {}: {}", stored.id, e);
298 None
299 }
300 };
301
302 cached_devices.push(CachedDevice {
303 id: stored.id,
304 name: stored.name,
305 device_type: stored.device_type,
306 reading,
307 last_sync,
308 });
309 }
310
311 info!(count = cached_devices.len(), "Loaded cached devices");
312
313 let device_ids: Vec<String> = cached_devices.iter().map(|d| d.id.clone()).collect();
315
316 if let Err(e) = self
317 .event_tx
318 .send(SensorEvent::CachedDataLoaded {
319 devices: cached_devices,
320 })
321 .await
322 {
323 error!("Failed to send CachedDataLoaded event: {}", e);
324 }
325
326 for device_id in device_ids {
328 self.load_and_send_history(&device_id).await;
329 }
330 }
331
332 async fn handle_scan(&self, duration: Duration) {
334 info!(?duration, "Starting device scan");
335
336 if let Err(e) = self.event_tx.send(SensorEvent::ScanStarted).await {
338 error!("Failed to send ScanStarted event: {}", e);
339 return;
340 }
341
342 let cancel_token = self.cancel_token.clone();
344
345 let options = ScanOptions::default().duration(duration);
347 let scan_result = tokio::select! {
348 result = scan_with_options(options) => result,
349 _ = cancel_token.cancelled() => {
350 info!("Scan cancelled by user");
351 let _ = self
352 .event_tx
353 .send(SensorEvent::OperationCancelled {
354 operation: "Device scan".to_string(),
355 })
356 .await;
357 return;
358 }
359 };
360
361 match scan_result {
362 Ok(devices) => {
363 info!(count = devices.len(), "Scan complete");
364
365 self.save_discovered_devices(&devices);
367
368 if let Err(e) = self
369 .event_tx
370 .send(SensorEvent::ScanComplete { devices })
371 .await
372 {
373 error!("Failed to send ScanComplete event: {}", e);
374 }
375 }
376 Err(e) => {
377 error!("Scan failed: {}", e);
378 if let Err(send_err) = self
379 .event_tx
380 .send(SensorEvent::ScanError {
381 error: e.to_string(),
382 })
383 .await
384 {
385 error!("Failed to send ScanError event: {}", send_err);
386 }
387 }
388 }
389 }
390
391 async fn handle_connect(&self, device_id: &str) {
393 info!(device_id, "Connecting to device");
394
395 if let Err(e) = self
397 .event_tx
398 .send(SensorEvent::DeviceConnecting {
399 device_id: device_id.to_string(),
400 })
401 .await
402 {
403 error!("Failed to send DeviceConnecting event: {}", e);
404 return;
405 }
406
407 let cancel_token = self.cancel_token.clone();
409
410 let retry_config = RetryConfig::for_connect();
412 let device_id_owned = device_id.to_string();
413 let config = self.connection_config.clone();
414
415 let connect_future = with_retry(&retry_config, "connect_and_read", || {
416 let device_id = device_id_owned.clone();
417 let config = config.clone();
418 async move { Self::connect_and_read_with_config(&device_id, config).await }
419 });
420
421 let result = tokio::select! {
423 result = connect_future => result,
424 _ = cancel_token.cancelled() => {
425 info!(device_id, "Connection cancelled by user");
426 let _ = self
427 .event_tx
428 .send(SensorEvent::OperationCancelled {
429 operation: format!("Connect to {}", device_id),
430 })
431 .await;
432 return;
433 }
434 };
435
436 match result {
437 Ok((name, device_type, reading, settings, rssi, signal_quality)) => {
438 info!(
439 device_id,
440 ?name,
441 ?device_type,
442 ?rssi,
443 ?signal_quality,
444 "Device connected"
445 );
446
447 if let Some(quality) = signal_quality {
449 self.signal_quality_cache
450 .write()
451 .await
452 .insert(device_id.to_string(), quality);
453
454 if let Some(rssi_val) = rssi {
456 let _ = self
457 .event_tx
458 .send(SensorEvent::SignalStrengthUpdate {
459 device_id: device_id.to_string(),
460 rssi: rssi_val,
461 quality: aranet_core::messages::SignalQuality::from_rssi(rssi_val),
462 })
463 .await;
464 }
465
466 if quality == SignalQuality::Poor {
468 warn!(
469 device_id,
470 "Poor signal quality - connection may be unstable"
471 );
472 }
473 }
474
475 self.update_device_metadata(device_id, name.as_deref(), device_type);
477
478 if let Err(e) = self
480 .event_tx
481 .send(SensorEvent::DeviceConnected {
482 device_id: device_id.to_string(),
483 name,
484 device_type,
485 rssi,
486 })
487 .await
488 {
489 error!("Failed to send DeviceConnected event: {}", e);
490 }
491
492 if let Some(settings) = settings
494 && let Err(e) = self
495 .event_tx
496 .send(SensorEvent::SettingsLoaded {
497 device_id: device_id.to_string(),
498 settings,
499 })
500 .await
501 {
502 error!("Failed to send SettingsLoaded event: {}", e);
503 }
504
505 if let Some(reading) = reading {
507 self.save_reading(device_id, &reading);
509
510 if let Err(e) = self
511 .event_tx
512 .send(SensorEvent::ReadingUpdated {
513 device_id: device_id.to_string(),
514 reading,
515 })
516 .await
517 {
518 error!("Failed to send ReadingUpdated event: {}", e);
519 }
520 }
521
522 self.load_and_send_history(device_id).await;
524 }
525 Err(e) => {
526 error!(device_id, error = %e, "Failed to connect to device after retries");
527 let context = ErrorContext::from_error(&e);
529 if let Err(send_err) = self
530 .event_tx
531 .send(SensorEvent::ConnectionError {
532 device_id: device_id.to_string(),
533 error: e.to_string(),
534 context: Some(context),
535 })
536 .await
537 {
538 error!("Failed to send ConnectionError event: {}", send_err);
539 }
540 }
541 }
542 }
543
544 async fn handle_disconnect(&self, device_id: &str) {
550 info!(device_id, "Disconnecting device");
551
552 if let Err(e) = self
554 .event_tx
555 .send(SensorEvent::DeviceDisconnected {
556 device_id: device_id.to_string(),
557 })
558 .await
559 {
560 error!("Failed to send DeviceDisconnected event: {}", e);
561 }
562 }
563
564 async fn handle_refresh_reading(&self, device_id: &str) {
566 info!(device_id, "Refreshing reading for device");
567
568 let signal_quality = self
570 .signal_quality_cache
571 .read()
572 .await
573 .get(device_id)
574 .copied();
575
576 let retry_config = match signal_quality {
578 Some(SignalQuality::Poor) | Some(SignalQuality::Fair) => {
579 debug!(
580 device_id,
581 ?signal_quality,
582 "Using aggressive retry config for weak signal"
583 );
584 RetryConfig::aggressive()
585 }
586 _ => RetryConfig::for_read(),
587 };
588
589 let device_id_owned = device_id.to_string();
590 let config = self.connection_config.clone();
591
592 let result = with_retry(&retry_config, "refresh_reading", || {
593 let device_id = device_id_owned.clone();
594 let config = config.clone();
595 async move { Self::connect_and_read_with_config(&device_id, config).await }
596 })
597 .await;
598
599 match result {
600 Ok((_, _, reading, settings, rssi, new_signal_quality)) => {
601 if let Some(quality) = new_signal_quality {
603 self.signal_quality_cache
604 .write()
605 .await
606 .insert(device_id.to_string(), quality);
607 }
608
609 if let Some(rssi_val) = rssi {
611 let _ = self
612 .event_tx
613 .send(SensorEvent::SignalStrengthUpdate {
614 device_id: device_id.to_string(),
615 rssi: rssi_val,
616 quality: aranet_core::messages::SignalQuality::from_rssi(rssi_val),
617 })
618 .await;
619 }
620
621 if let Some(settings) = settings
623 && let Err(e) = self
624 .event_tx
625 .send(SensorEvent::SettingsLoaded {
626 device_id: device_id.to_string(),
627 settings,
628 })
629 .await
630 {
631 error!("Failed to send SettingsLoaded event: {}", e);
632 }
633
634 if let Some(reading) = reading {
635 info!(device_id, "Reading refreshed successfully");
636
637 self.save_reading(device_id, &reading);
639
640 if let Err(e) = self
641 .event_tx
642 .send(SensorEvent::ReadingUpdated {
643 device_id: device_id.to_string(),
644 reading,
645 })
646 .await
647 {
648 error!("Failed to send ReadingUpdated event: {}", e);
649 }
650 } else {
651 warn!(device_id, "Connected but failed to read current values");
652 let context = ErrorContext::transient(
653 "Failed to read current values",
654 "Device connected but returned no data. Try again.",
655 );
656 if let Err(e) = self
657 .event_tx
658 .send(SensorEvent::ReadingError {
659 device_id: device_id.to_string(),
660 error: "Failed to read current values".to_string(),
661 context: Some(context),
662 })
663 .await
664 {
665 error!("Failed to send ReadingError event: {}", e);
666 }
667 }
668 }
669 Err(e) => {
670 error!(device_id, error = %e, "Failed to refresh reading after retries");
671 let context = ErrorContext::from_error(&e);
672 if let Err(send_err) = self
673 .event_tx
674 .send(SensorEvent::ReadingError {
675 device_id: device_id.to_string(),
676 error: e.to_string(),
677 context: Some(context),
678 })
679 .await
680 {
681 error!("Failed to send ReadingError event: {}", send_err);
682 }
683 }
684 }
685 }
686
687 async fn handle_refresh_all(&self) {
692 info!("Refreshing all devices");
693
694 let Some(store) = self.open_store() else {
696 return;
697 };
698
699 let devices = match store.list_devices() {
700 Ok(devices) => devices,
701 Err(e) => {
702 warn!("Failed to list devices for refresh all: {}", e);
703 return;
704 }
705 };
706
707 for device in devices {
709 self.handle_refresh_reading(&device.id).await;
710 }
711
712 info!("Completed refreshing all devices");
713 }
714
715 async fn handle_set_interval(&self, device_id: &str, interval_secs: u16) {
720 info!(device_id, interval_secs, "Setting measurement interval");
721
722 let interval = match MeasurementInterval::from_seconds(interval_secs) {
724 Some(i) => i,
725 None => {
726 let error = format!(
727 "Invalid interval: {} seconds. Must be 60, 120, 300, or 600.",
728 interval_secs
729 );
730 error!(device_id, %error, "Invalid interval value");
731 let context = ErrorContext::permanent(&error);
732 let _ = self
733 .event_tx
734 .send(SensorEvent::IntervalError {
735 device_id: device_id.to_string(),
736 error,
737 context: Some(context),
738 })
739 .await;
740 return;
741 }
742 };
743
744 let retry_config = RetryConfig::for_connect();
746 let device_id_owned = device_id.to_string();
747 let config = self.connection_config.clone();
748
749 let device = match with_retry(&retry_config, "connect_for_interval", || {
750 let device_id = device_id_owned.clone();
751 let config = config.clone();
752 async move { Device::connect_with_config(&device_id, config).await }
753 })
754 .await
755 {
756 Ok(d) => d,
757 Err(e) => {
758 error!(device_id, error = %e, "Failed to connect for set interval");
759 let context = ErrorContext::from_error(&e);
760 let _ = self
761 .event_tx
762 .send(SensorEvent::IntervalError {
763 device_id: device_id.to_string(),
764 error: e.to_string(),
765 context: Some(context),
766 })
767 .await;
768 return;
769 }
770 };
771
772 let retry_config = RetryConfig::for_write();
774 if let Err(e) = with_retry(&retry_config, "set_interval", || async {
775 device.set_interval(interval).await
776 })
777 .await
778 {
779 error!(device_id, error = %e, "Failed to set interval");
780 let _ = device.disconnect().await;
781 let context = ErrorContext::from_error(&e);
782 let _ = self
783 .event_tx
784 .send(SensorEvent::IntervalError {
785 device_id: device_id.to_string(),
786 error: e.to_string(),
787 context: Some(context),
788 })
789 .await;
790 return;
791 }
792
793 if let Err(e) = device.disconnect().await {
795 warn!(device_id, error = %e, "Failed to disconnect after setting interval");
796 }
797
798 info!(
799 device_id,
800 interval_secs, "Measurement interval set successfully"
801 );
802
803 if let Err(e) = self
805 .event_tx
806 .send(SensorEvent::IntervalChanged {
807 device_id: device_id.to_string(),
808 interval_secs,
809 })
810 .await
811 {
812 error!("Failed to send IntervalChanged event: {}", e);
813 }
814 }
815
816 async fn handle_set_bluetooth_range(&self, device_id: &str, extended: bool) {
818 let range_name = if extended { "Extended" } else { "Standard" };
819 info!(device_id, range_name, "Setting Bluetooth range");
820
821 let retry_config = RetryConfig::for_connect();
823 let device_id_owned = device_id.to_string();
824 let config = self.connection_config.clone();
825
826 let device = match with_retry(&retry_config, "connect_for_bt_range", || {
827 let device_id = device_id_owned.clone();
828 let config = config.clone();
829 async move { Device::connect_with_config(&device_id, config).await }
830 })
831 .await
832 {
833 Ok(d) => d,
834 Err(e) => {
835 error!(device_id, error = %e, "Failed to connect for set Bluetooth range");
836 let context = ErrorContext::from_error(&e);
837 let _ = self
838 .event_tx
839 .send(SensorEvent::BluetoothRangeError {
840 device_id: device_id.to_string(),
841 error: e.to_string(),
842 context: Some(context),
843 })
844 .await;
845 return;
846 }
847 };
848
849 let range = if extended {
851 BluetoothRange::Extended
852 } else {
853 BluetoothRange::Standard
854 };
855
856 let retry_config = RetryConfig::for_write();
858 if let Err(e) = with_retry(&retry_config, "set_bt_range", || async {
859 device.set_bluetooth_range(range).await
860 })
861 .await
862 {
863 error!(device_id, error = %e, "Failed to set Bluetooth range");
864 let _ = device.disconnect().await;
865 let context = ErrorContext::from_error(&e);
866 let _ = self
867 .event_tx
868 .send(SensorEvent::BluetoothRangeError {
869 device_id: device_id.to_string(),
870 error: e.to_string(),
871 context: Some(context),
872 })
873 .await;
874 return;
875 }
876
877 if let Err(e) = device.disconnect().await {
879 warn!(device_id, error = %e, "Failed to disconnect after setting Bluetooth range");
880 }
881
882 info!(device_id, range_name, "Bluetooth range set successfully");
883
884 if let Err(e) = self
886 .event_tx
887 .send(SensorEvent::BluetoothRangeChanged {
888 device_id: device_id.to_string(),
889 extended,
890 })
891 .await
892 {
893 error!("Failed to send BluetoothRangeChanged event: {}", e);
894 }
895 }
896
897 async fn handle_set_smart_home(&self, device_id: &str, enabled: bool) {
899 let mode = if enabled { "enabled" } else { "disabled" };
900 info!(device_id, mode, "Setting Smart Home");
901
902 let retry_config = RetryConfig::for_connect();
904 let device_id_owned = device_id.to_string();
905 let config = self.connection_config.clone();
906
907 let device = match with_retry(&retry_config, "connect_for_smart_home", || {
908 let device_id = device_id_owned.clone();
909 let config = config.clone();
910 async move { Device::connect_with_config(&device_id, config).await }
911 })
912 .await
913 {
914 Ok(d) => d,
915 Err(e) => {
916 error!(device_id, error = %e, "Failed to connect for set Smart Home");
917 let context = ErrorContext::from_error(&e);
918 let _ = self
919 .event_tx
920 .send(SensorEvent::SmartHomeError {
921 device_id: device_id.to_string(),
922 error: e.to_string(),
923 context: Some(context),
924 })
925 .await;
926 return;
927 }
928 };
929
930 let retry_config = RetryConfig::for_write();
932 if let Err(e) = with_retry(&retry_config, "set_smart_home", || async {
933 device.set_smart_home(enabled).await
934 })
935 .await
936 {
937 error!(device_id, error = %e, "Failed to set Smart Home");
938 let _ = device.disconnect().await;
939 let context = ErrorContext::from_error(&e);
940 let _ = self
941 .event_tx
942 .send(SensorEvent::SmartHomeError {
943 device_id: device_id.to_string(),
944 error: e.to_string(),
945 context: Some(context),
946 })
947 .await;
948 return;
949 }
950
951 if let Err(e) = device.disconnect().await {
953 warn!(device_id, error = %e, "Failed to disconnect after setting Smart Home");
954 }
955
956 info!(device_id, mode, "Smart Home set successfully");
957
958 if let Err(e) = self
960 .event_tx
961 .send(SensorEvent::SmartHomeChanged {
962 device_id: device_id.to_string(),
963 enabled,
964 })
965 .await
966 {
967 error!("Failed to send SmartHomeChanged event: {}", e);
968 }
969 }
970
971 async fn connect_and_read_with_config(
979 device_id: &str,
980 config: ConnectionConfig,
981 ) -> Result<
982 (
983 Option<String>,
984 Option<DeviceType>,
985 Option<CurrentReading>,
986 Option<DeviceSettings>,
987 Option<i16>,
988 Option<SignalQuality>,
989 ),
990 aranet_core::Error,
991 > {
992 let device = Device::connect_with_config(device_id, config).await?;
993
994 if !device.validate_connection().await {
996 warn!(
997 device_id,
998 "Connection validation failed - device may be out of range"
999 );
1000 let _ = device.disconnect().await;
1001 return Err(aranet_core::Error::NotConnected);
1002 }
1003 debug!(device_id, "Connection validated successfully");
1004
1005 let name = device.name().map(String::from);
1006 let device_type = device.device_type();
1007
1008 let rssi = device.read_rssi().await.ok();
1010 let signal_quality = rssi.map(SignalQuality::from_rssi);
1011
1012 if let Some(quality) = signal_quality {
1013 debug!(device_id, ?quality, rssi = ?rssi, "Signal quality assessed");
1014 }
1015
1016 if let Some(quality) = signal_quality {
1018 let delay = quality.recommended_read_delay();
1019 if delay > Duration::from_millis(50) {
1020 debug!(device_id, ?delay, "Adding read delay for signal quality");
1021 tokio::time::sleep(delay).await;
1022 }
1023 }
1024
1025 let reading = match device.read_current().await {
1027 Ok(reading) => {
1028 info!(device_id, "Read current values successfully");
1029 Some(reading)
1030 }
1031 Err(e) => {
1032 warn!(device_id, error = %e, "Failed to read current values");
1033 None
1034 }
1035 };
1036
1037 let settings = match device.get_settings().await {
1039 Ok(settings) => {
1040 info!(device_id, ?settings, "Read device settings successfully");
1041 Some(settings)
1042 }
1043 Err(e) => {
1044 warn!(device_id, error = %e, "Failed to read device settings");
1045 None
1046 }
1047 };
1048
1049 if let Err(e) = device.disconnect().await {
1051 warn!(device_id, error = %e, "Failed to disconnect from device");
1052 }
1053
1054 Ok((name, device_type, reading, settings, rssi, signal_quality))
1055 }
1056
1057 #[allow(dead_code)]
1062 async fn connect_and_read(
1063 &self,
1064 device_id: &str,
1065 ) -> Result<
1066 (
1067 Option<String>,
1068 Option<DeviceType>,
1069 Option<CurrentReading>,
1070 Option<DeviceSettings>,
1071 Option<i16>,
1072 ),
1073 aranet_core::Error,
1074 > {
1075 let (name, device_type, reading, settings, rssi, _signal_quality) =
1076 Self::connect_and_read_with_config(device_id, self.connection_config.clone()).await?;
1077 Ok((name, device_type, reading, settings, rssi))
1078 }
1079
1080 fn save_reading(&self, device_id: &str, reading: &CurrentReading) {
1082 let Some(store) = self.open_store() else {
1083 return;
1084 };
1085
1086 if let Err(e) = store.insert_reading(device_id, reading) {
1087 warn!(device_id, error = %e, "Failed to save reading to store");
1088 } else {
1089 debug!(device_id, "Reading saved to store");
1090 }
1091 }
1092
1093 fn save_discovered_devices(&self, devices: &[aranet_core::DiscoveredDevice]) {
1095 let Some(store) = self.open_store() else {
1096 return;
1097 };
1098
1099 for device in devices {
1100 let device_id = device.id.to_string();
1101 if let Err(e) = store.upsert_device(&device_id, device.name.as_deref()) {
1103 warn!(device_id, error = %e, "Failed to upsert device");
1104 continue;
1105 }
1106 if let Some(device_type) = device.device_type
1108 && let Err(e) = store.update_device_metadata(&device_id, None, Some(device_type))
1109 {
1110 warn!(device_id, error = %e, "Failed to update device metadata");
1111 }
1112 }
1113
1114 debug!(count = devices.len(), "Saved discovered devices to store");
1115 }
1116
1117 fn update_device_metadata(
1119 &self,
1120 device_id: &str,
1121 name: Option<&str>,
1122 device_type: Option<DeviceType>,
1123 ) {
1124 let Some(store) = self.open_store() else {
1125 return;
1126 };
1127
1128 if let Err(e) = store.upsert_device(device_id, name) {
1130 warn!(device_id, error = %e, "Failed to upsert device");
1131 return;
1132 }
1133
1134 if let Err(e) = store.update_device_metadata(device_id, name, device_type) {
1136 warn!(device_id, error = %e, "Failed to update device metadata");
1137 } else {
1138 debug!(device_id, ?name, ?device_type, "Device metadata updated");
1139 }
1140 }
1141
1142 async fn load_and_send_history(&self, device_id: &str) {
1144 let Some(store) = self.open_store() else {
1145 return;
1146 };
1147
1148 use aranet_store::HistoryQuery;
1151 let query = HistoryQuery::new().device(device_id).oldest_first(); match store.query_history(&query) {
1154 Ok(stored_records) => {
1155 let records: Vec<aranet_types::HistoryRecord> = stored_records
1157 .into_iter()
1158 .map(|r| aranet_types::HistoryRecord {
1159 timestamp: r.timestamp,
1160 co2: r.co2,
1161 temperature: r.temperature,
1162 pressure: r.pressure,
1163 humidity: r.humidity,
1164 radon: r.radon,
1165 radiation_rate: r.radiation_rate,
1166 radiation_total: r.radiation_total,
1167 })
1168 .collect();
1169
1170 info!(
1171 device_id,
1172 count = records.len(),
1173 "Loaded history from store"
1174 );
1175
1176 if let Err(e) = self
1177 .event_tx
1178 .send(SensorEvent::HistoryLoaded {
1179 device_id: device_id.to_string(),
1180 records,
1181 })
1182 .await
1183 {
1184 error!("Failed to send HistoryLoaded event: {}", e);
1185 }
1186 }
1187 Err(e) => {
1188 warn!(device_id, error = %e, "Failed to query history from store");
1189 }
1190 }
1191 }
1192
1193 async fn handle_sync_history(&self, device_id: &str) {
1198 use aranet_core::history::HistoryOptions;
1199
1200 info!(device_id, "Syncing history from device");
1201
1202 let Some(store) = self.open_store() else {
1204 let context = ErrorContext::permanent("Failed to open local database");
1205 let _ = self
1206 .event_tx
1207 .send(SensorEvent::HistorySyncError {
1208 device_id: device_id.to_string(),
1209 error: "Failed to open store".to_string(),
1210 context: Some(context),
1211 })
1212 .await;
1213 return;
1214 };
1215
1216 let retry_config = RetryConfig::for_connect();
1218 let device_id_owned = device_id.to_string();
1219 let config = self.connection_config.clone();
1220
1221 let device = match with_retry(&retry_config, "connect_for_history", || {
1222 let device_id = device_id_owned.clone();
1223 let config = config.clone();
1224 async move { Device::connect_with_config(&device_id, config).await }
1225 })
1226 .await
1227 {
1228 Ok(d) => d,
1229 Err(e) => {
1230 error!(device_id, error = %e, "Failed to connect for history sync");
1231 let context = ErrorContext::from_error(&e);
1232 let _ = self
1233 .event_tx
1234 .send(SensorEvent::HistorySyncError {
1235 device_id: device_id.to_string(),
1236 error: e.to_string(),
1237 context: Some(context),
1238 })
1239 .await;
1240 return;
1241 }
1242 };
1243
1244 if !device.validate_connection().await {
1246 warn!(device_id, "Connection validation failed for history sync");
1247 let _ = device.disconnect().await;
1248 let context = ErrorContext::transient(
1249 "Connection validation failed",
1250 "Device connected but is not responding. Try moving closer.",
1251 );
1252 let _ = self
1253 .event_tx
1254 .send(SensorEvent::HistorySyncError {
1255 device_id: device_id.to_string(),
1256 error: "Connection validation failed".to_string(),
1257 context: Some(context),
1258 })
1259 .await;
1260 return;
1261 }
1262
1263 let history_info = match device.get_history_info().await {
1265 Ok(info) => info,
1266 Err(e) => {
1267 error!(device_id, error = %e, "Failed to get history info");
1268 let _ = device.disconnect().await;
1269 let context = ErrorContext::from_error(&e);
1270 let _ = self
1271 .event_tx
1272 .send(SensorEvent::HistorySyncError {
1273 device_id: device_id.to_string(),
1274 error: e.to_string(),
1275 context: Some(context),
1276 })
1277 .await;
1278 return;
1279 }
1280 };
1281
1282 let total_on_device = history_info.total_readings;
1283
1284 let start_index = match store.calculate_sync_start(device_id, total_on_device) {
1286 Ok(idx) => idx,
1287 Err(e) => {
1288 warn!(device_id, error = %e, "Failed to calculate sync start, doing full sync");
1289 1u16
1290 }
1291 };
1292
1293 if start_index > total_on_device {
1295 info!(device_id, "Already up to date, no new readings to sync");
1296 let _ = device.disconnect().await;
1297 let _ = self
1298 .event_tx
1299 .send(SensorEvent::HistorySynced {
1300 device_id: device_id.to_string(),
1301 count: 0,
1302 })
1303 .await;
1304 self.load_and_send_history(device_id).await;
1306 return;
1307 }
1308
1309 let records_to_download = total_on_device.saturating_sub(start_index) + 1;
1310 info!(
1311 device_id,
1312 start_index,
1313 total_on_device,
1314 records_to_download,
1315 "Downloading history (incremental sync)"
1316 );
1317
1318 if let Err(e) = self
1320 .event_tx
1321 .send(SensorEvent::HistorySyncStarted {
1322 device_id: device_id.to_string(),
1323 total_records: Some(records_to_download),
1324 })
1325 .await
1326 {
1327 error!("Failed to send HistorySyncStarted event: {}", e);
1328 }
1329
1330 let signal_quality = self
1333 .signal_quality_cache
1334 .read()
1335 .await
1336 .get(device_id)
1337 .copied();
1338 let read_delay = signal_quality
1339 .map(|q| q.recommended_read_delay())
1340 .unwrap_or(Duration::from_millis(50));
1341
1342 let history_options = HistoryOptions {
1343 start_index: Some(start_index),
1344 end_index: None, read_delay,
1346 use_adaptive_delay: true, ..Default::default()
1348 };
1349
1350 let event_tx = self.event_tx.clone();
1352 let device_id_for_progress = device_id.to_string();
1353 let total = records_to_download as usize;
1354
1355 let last_progress_update = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
1357 let last_progress_clone = last_progress_update.clone();
1358
1359 let progress_task = {
1361 let event_tx = event_tx.clone();
1362 let device_id = device_id_for_progress.clone();
1363 tokio::spawn(async move {
1364 let mut interval = interval(Duration::from_millis(500));
1365 loop {
1366 interval.tick().await;
1367 let downloaded = last_progress_clone.load(std::sync::atomic::Ordering::Relaxed);
1368 if downloaded > 0 && downloaded < total {
1369 let _ = event_tx
1370 .send(SensorEvent::HistorySyncProgress {
1371 device_id: device_id.clone(),
1372 downloaded,
1373 total,
1374 })
1375 .await;
1376 }
1377 if downloaded >= total {
1378 break;
1379 }
1380 }
1381 })
1382 };
1383
1384 let cancel_token = self.cancel_token.clone();
1386
1387 let retry_config = RetryConfig::for_history();
1389 let download_future = with_retry(&retry_config, "download_history", || {
1390 let options = history_options.clone();
1391 let progress = last_progress_update.clone();
1392 let device = &device;
1393 async move {
1394 let records = device.download_history_with_options(options).await?;
1395 progress.store(records.len(), std::sync::atomic::Ordering::Relaxed);
1396 Ok(records)
1397 }
1398 });
1399
1400 let download_result = tokio::select! {
1402 result = download_future => result,
1403 _ = cancel_token.cancelled() => {
1404 progress_task.abort();
1405 info!(device_id, "History sync cancelled by user");
1406 let _ = device.disconnect().await;
1407 let _ = self
1408 .event_tx
1409 .send(SensorEvent::OperationCancelled {
1410 operation: format!("History sync for {}", device_id),
1411 })
1412 .await;
1413 return;
1414 }
1415 };
1416
1417 let records = match download_result {
1418 Ok(r) => {
1419 progress_task.abort();
1420 r
1421 }
1422 Err(e) => {
1423 progress_task.abort();
1424 error!(device_id, error = %e, "Failed to download history");
1425 let _ = device.disconnect().await;
1426 let context = ErrorContext::from_error(&e);
1427 let _ = self
1428 .event_tx
1429 .send(SensorEvent::HistorySyncError {
1430 device_id: device_id.to_string(),
1431 error: e.to_string(),
1432 context: Some(context),
1433 })
1434 .await;
1435 return;
1436 }
1437 };
1438
1439 let record_count = records.len();
1440 info!(
1441 device_id,
1442 count = record_count,
1443 "Downloaded history from device"
1444 );
1445
1446 let _ = self
1448 .event_tx
1449 .send(SensorEvent::HistorySyncProgress {
1450 device_id: device_id.to_string(),
1451 downloaded: record_count,
1452 total,
1453 })
1454 .await;
1455
1456 let _ = device.disconnect().await;
1458
1459 match store.insert_history(device_id, &records) {
1462 Ok(inserted) => {
1463 debug!(
1464 device_id,
1465 downloaded = record_count,
1466 inserted,
1467 "History saved to store"
1468 );
1469
1470 if let Err(e) = store.update_sync_state(device_id, total_on_device, total_on_device)
1472 {
1473 warn!(device_id, error = %e, "Failed to update sync state");
1474 }
1475 }
1476 Err(e) => {
1477 warn!(device_id, error = %e, "Failed to save history to store - sync state not updated");
1478 }
1479 }
1480
1481 if let Err(e) = self
1483 .event_tx
1484 .send(SensorEvent::HistorySynced {
1485 device_id: device_id.to_string(),
1486 count: record_count,
1487 })
1488 .await
1489 {
1490 error!("Failed to send HistorySynced event: {}", e);
1491 }
1492
1493 self.load_and_send_history(device_id).await;
1495 }
1496
1497 async fn handle_refresh_service_status(&self) {
1499 info!("Refreshing service status");
1500
1501 let Some(ref client) = self.service_client else {
1502 let _ = self
1503 .event_tx
1504 .send(SensorEvent::ServiceStatusError {
1505 error: "Service client not available".to_string(),
1506 })
1507 .await;
1508 return;
1509 };
1510
1511 match client.status().await {
1512 Ok(status) => {
1513 let devices: Vec<ServiceDeviceStats> = status
1515 .devices
1516 .into_iter()
1517 .map(|d| ServiceDeviceStats {
1518 device_id: d.device_id,
1519 alias: d.alias,
1520 poll_interval: d.poll_interval,
1521 polling: d.polling,
1522 success_count: d.success_count,
1523 failure_count: d.failure_count,
1524 last_poll_at: d.last_poll_at,
1525 last_error: d.last_error,
1526 })
1527 .collect();
1528
1529 let _ = self
1530 .event_tx
1531 .send(SensorEvent::ServiceStatusRefreshed {
1532 reachable: true,
1533 collector_running: status.collector.running,
1534 uptime_seconds: status.collector.uptime_seconds,
1535 devices,
1536 })
1537 .await;
1538 }
1539 Err(e) => {
1540 let (reachable, error_msg) = match &e {
1542 aranet_core::service_client::ServiceClientError::NotReachable { .. } => {
1543 (false, "Service not reachable".to_string())
1544 }
1545 _ => (false, e.to_string()),
1546 };
1547
1548 if reachable {
1549 let _ = self
1550 .event_tx
1551 .send(SensorEvent::ServiceStatusError { error: error_msg })
1552 .await;
1553 } else {
1554 let _ = self
1556 .event_tx
1557 .send(SensorEvent::ServiceStatusRefreshed {
1558 reachable: false,
1559 collector_running: false,
1560 uptime_seconds: None,
1561 devices: vec![],
1562 })
1563 .await;
1564 }
1565 }
1566 }
1567 }
1568
1569 async fn handle_start_service_collector(&self) {
1571 info!("Starting service collector");
1572
1573 let Some(ref client) = self.service_client else {
1574 let _ = self
1575 .event_tx
1576 .send(SensorEvent::ServiceCollectorError {
1577 error: "Service client not available".to_string(),
1578 })
1579 .await;
1580 return;
1581 };
1582
1583 match client.start_collector().await {
1584 Ok(_) => {
1585 let _ = self
1586 .event_tx
1587 .send(SensorEvent::ServiceCollectorStarted)
1588 .await;
1589 self.handle_refresh_service_status().await;
1591 }
1592 Err(e) => {
1593 let _ = self
1594 .event_tx
1595 .send(SensorEvent::ServiceCollectorError {
1596 error: e.to_string(),
1597 })
1598 .await;
1599 }
1600 }
1601 }
1602
1603 async fn handle_stop_service_collector(&self) {
1605 info!("Stopping service collector");
1606
1607 let Some(ref client) = self.service_client else {
1608 let _ = self
1609 .event_tx
1610 .send(SensorEvent::ServiceCollectorError {
1611 error: "Service client not available".to_string(),
1612 })
1613 .await;
1614 return;
1615 };
1616
1617 match client.stop_collector().await {
1618 Ok(_) => {
1619 let _ = self
1620 .event_tx
1621 .send(SensorEvent::ServiceCollectorStopped)
1622 .await;
1623 self.handle_refresh_service_status().await;
1625 }
1626 Err(e) => {
1627 let _ = self
1628 .event_tx
1629 .send(SensorEvent::ServiceCollectorError {
1630 error: e.to_string(),
1631 })
1632 .await;
1633 }
1634 }
1635 }
1636
1637 async fn handle_set_alias(&self, device_id: &str, alias: Option<String>) {
1638 info!("Setting alias for device {} to {:?}", device_id, alias);
1639
1640 let Some(store) = self.open_store() else {
1641 let _ = self
1642 .event_tx
1643 .send(SensorEvent::AliasError {
1644 device_id: device_id.to_string(),
1645 error: "Could not open database".to_string(),
1646 })
1647 .await;
1648 return;
1649 };
1650
1651 match store.update_device_metadata(device_id, alias.as_deref(), None) {
1652 Ok(()) => {
1653 info!("Alias updated successfully for {}", device_id);
1654 let _ = self
1655 .event_tx
1656 .send(SensorEvent::AliasChanged {
1657 device_id: device_id.to_string(),
1658 alias,
1659 })
1660 .await;
1661 }
1662 Err(e) => {
1663 let _ = self
1664 .event_tx
1665 .send(SensorEvent::AliasError {
1666 device_id: device_id.to_string(),
1667 error: e.to_string(),
1668 })
1669 .await;
1670 }
1671 }
1672 }
1673
1674 async fn handle_start_background_polling(&self, device_id: &str, interval_secs: u64) {
1680 info!(device_id, interval_secs, "Starting background polling");
1681
1682 {
1684 let polling = self.background_polling.read().await;
1685 if polling.contains_key(device_id) {
1686 warn!(device_id, "Background polling already active for device");
1687 return;
1688 }
1689 }
1690
1691 let (cancel_tx, mut cancel_rx) = tokio::sync::watch::channel(false);
1693
1694 {
1696 let mut polling = self.background_polling.write().await;
1697 polling.insert(device_id.to_string(), cancel_tx);
1698 }
1699
1700 let device_id_owned = device_id.to_string();
1702 let event_tx = self.event_tx.clone();
1703 let config = self.connection_config.clone();
1704 let signal_quality_cache = self.signal_quality_cache.clone();
1705 let store_path = self.store_path.clone();
1706 let polling_interval = Duration::from_secs(interval_secs);
1707
1708 let _ = event_tx
1710 .send(SensorEvent::BackgroundPollingStarted {
1711 device_id: device_id.to_string(),
1712 interval_secs,
1713 })
1714 .await;
1715
1716 tokio::spawn(async move {
1718 let mut interval_timer = interval(polling_interval);
1719 interval_timer.tick().await;
1721
1722 loop {
1723 tokio::select! {
1724 _ = cancel_rx.changed() => {
1725 if *cancel_rx.borrow() {
1726 info!(device_id = %device_id_owned, "Background polling cancelled");
1727 break;
1728 }
1729 }
1730 _ = interval_timer.tick() => {
1731 debug!(device_id = %device_id_owned, "Background poll tick");
1732
1733 let signal_quality = signal_quality_cache.read().await.get(&device_id_owned).copied();
1735
1736 let retry_config = match signal_quality {
1738 Some(SignalQuality::Poor) | Some(SignalQuality::Fair) => {
1739 RetryConfig::aggressive()
1740 }
1741 _ => RetryConfig::for_read(),
1742 };
1743
1744 match with_retry(&retry_config, "background_poll", || {
1746 let device_id = device_id_owned.clone();
1747 let config = config.clone();
1748 async move {
1749 Self::connect_and_read_with_config(&device_id, config).await
1750 }
1751 })
1752 .await
1753 {
1754 Ok((_, _, reading, _, rssi, new_signal_quality)) => {
1755 if let Some(quality) = new_signal_quality {
1757 signal_quality_cache
1758 .write()
1759 .await
1760 .insert(device_id_owned.clone(), quality);
1761 }
1762
1763 if let Some(rssi_val) = rssi {
1765 let _ = event_tx
1766 .send(SensorEvent::SignalStrengthUpdate {
1767 device_id: device_id_owned.clone(),
1768 rssi: rssi_val,
1769 quality: aranet_core::messages::SignalQuality::from_rssi(rssi_val),
1770 })
1771 .await;
1772 }
1773
1774 if let Some(reading) = reading {
1775 debug!(device_id = %device_id_owned, "Background poll successful");
1776
1777 if let Ok(store) = Store::open(&store_path)
1779 && let Err(e) = store.insert_reading(&device_id_owned, &reading)
1780 {
1781 warn!(device_id = %device_id_owned, error = %e, "Failed to save background reading to store");
1782 }
1783
1784 let _ = event_tx
1786 .send(SensorEvent::ReadingUpdated {
1787 device_id: device_id_owned.clone(),
1788 reading,
1789 })
1790 .await;
1791 }
1792 }
1793 Err(e) => {
1794 warn!(device_id = %device_id_owned, error = %e, "Background poll failed");
1795 let context = ErrorContext::from_error(&e);
1796 let _ = event_tx
1797 .send(SensorEvent::ReadingError {
1798 device_id: device_id_owned.clone(),
1799 error: e.to_string(),
1800 context: Some(context),
1801 })
1802 .await;
1803 }
1804 }
1805 }
1806 }
1807 }
1808
1809 let _ = event_tx
1811 .send(SensorEvent::BackgroundPollingStopped {
1812 device_id: device_id_owned,
1813 })
1814 .await;
1815 });
1816
1817 info!(device_id, "Background polling task spawned");
1818 }
1819
1820 async fn handle_cancel_operation(&mut self) {
1825 info!("Cancelling current operation");
1826
1827 self.cancel_token.cancel();
1829
1830 self.cancel_token = CancellationToken::new();
1832
1833 let _ = self
1835 .event_tx
1836 .send(SensorEvent::OperationCancelled {
1837 operation: "Current operation".to_string(),
1838 })
1839 .await;
1840 }
1841
1842 async fn handle_forget_device(&self, device_id: &str) {
1844 info!(device_id, "Forgetting device");
1845
1846 {
1848 let mut polling = self.background_polling.write().await;
1849 if let Some(cancel_tx) = polling.remove(device_id) {
1850 let _ = cancel_tx.send(true);
1851 info!(device_id, "Stopped background polling for forgotten device");
1852 }
1853 }
1854
1855 {
1857 let mut cache = self.signal_quality_cache.write().await;
1858 cache.remove(device_id);
1859 }
1860
1861 let Some(store) = self.open_store() else {
1863 let _ = self
1864 .event_tx
1865 .send(SensorEvent::ForgetDeviceError {
1866 device_id: device_id.to_string(),
1867 error: "Could not open database".to_string(),
1868 })
1869 .await;
1870 return;
1871 };
1872
1873 match store.delete_device(device_id) {
1874 Ok(deleted) => {
1875 if deleted {
1876 info!(device_id, "Device forgotten (deleted from store)");
1877 } else {
1878 info!(
1879 device_id,
1880 "Device not found in store (removing from UI only)"
1881 );
1882 }
1883 let _ = self
1884 .event_tx
1885 .send(SensorEvent::DeviceForgotten {
1886 device_id: device_id.to_string(),
1887 })
1888 .await;
1889 }
1890 Err(e) => {
1891 error!(device_id, error = %e, "Failed to forget device");
1892 let _ = self
1893 .event_tx
1894 .send(SensorEvent::ForgetDeviceError {
1895 device_id: device_id.to_string(),
1896 error: e.to_string(),
1897 })
1898 .await;
1899 }
1900 }
1901 }
1902
1903 async fn handle_stop_background_polling(&self, device_id: &str) {
1905 info!(device_id, "Stopping background polling");
1906
1907 let mut polling = self.background_polling.write().await;
1908 if let Some(cancel_tx) = polling.remove(device_id) {
1909 let _ = cancel_tx.send(true);
1911 info!(device_id, "Background polling stop signal sent");
1912 } else {
1913 warn!(device_id, "No active background polling found for device");
1914 }
1915 }
1916}