1use std::collections::HashMap;
19use std::path::PathBuf;
20use std::sync::Arc;
21use std::time::Duration;
22
23async fn disconnect_quietly(device: &aranet_core::Device) {
25 if let Err(e) = device.disconnect().await {
26 tracing::debug!("Failed to disconnect device: {e}");
27 }
28}
29
30use aranet_core::device::{ConnectionConfig, SignalQuality};
31use aranet_core::messages::{ErrorContext, ServiceDeviceStats};
32use aranet_core::service_client::ServiceClient;
33use aranet_core::settings::{DeviceSettings, MeasurementInterval};
34use aranet_core::{
35 BluetoothRange, Device, RetryConfig, ScanOptions, scan::scan_with_options, with_retry,
36};
37use aranet_store::Store;
38use aranet_types::{CurrentReading, DeviceType};
39use tokio::sync::{RwLock, mpsc};
40use tokio::time::interval;
41use tokio_util::sync::CancellationToken;
42use tracing::{debug, error, info, warn};
43
44use super::messages::{CachedDevice, Command, SensorEvent};
45
46pub struct SensorWorker {
56 command_rx: mpsc::Receiver<Command>,
58 event_tx: mpsc::Sender<SensorEvent>,
60 store_path: PathBuf,
62 service_client: Option<ServiceClient>,
64 connection_config: ConnectionConfig,
66 background_polling: Arc<RwLock<HashMap<String, tokio::sync::watch::Sender<bool>>>>,
69 signal_quality_cache: Arc<RwLock<HashMap<String, SignalQuality>>>,
71 cancel_token: CancellationToken,
74}
75
/// Service base URL used by `SensorWorker::new`, which takes no URL of its own.
const DEFAULT_SERVICE_URL: &str = "http://localhost:8080";
78
79impl SensorWorker {
80 pub fn new(
88 command_rx: mpsc::Receiver<Command>,
89 event_tx: mpsc::Sender<SensorEvent>,
90 store_path: PathBuf,
91 ) -> Self {
92 Self::with_service_config(command_rx, event_tx, store_path, DEFAULT_SERVICE_URL, None)
93 }
94
95 pub fn with_service_config(
97 command_rx: mpsc::Receiver<Command>,
98 event_tx: mpsc::Sender<SensorEvent>,
99 store_path: PathBuf,
100 service_url: &str,
101 service_api_key: Option<String>,
102 ) -> Self {
103 let service_client = ServiceClient::new_with_api_key(service_url, service_api_key).ok();
105
106 let connection_config = ConnectionConfig::for_current_platform();
108 info!(
109 ?connection_config,
110 "Using platform-optimized connection config"
111 );
112
113 Self {
114 command_rx,
115 event_tx,
116 store_path,
117 service_client,
118 connection_config,
119 background_polling: Arc::new(RwLock::new(HashMap::new())),
120 signal_quality_cache: Arc::new(RwLock::new(HashMap::new())),
121 cancel_token: CancellationToken::new(),
122 }
123 }
124
125 fn open_store(&self) -> Option<Store> {
129 match Store::open(&self.store_path) {
130 Ok(store) => Some(store),
131 Err(e) => {
132 warn!(error = %e, "Failed to open store");
133 None
134 }
135 }
136 }
137
138 pub async fn run(mut self) {
143 info!("SensorWorker started");
144
145 loop {
146 tokio::select! {
147 cmd = self.command_rx.recv() => {
149 match cmd {
150 Some(Command::Shutdown) => {
151 info!("SensorWorker received shutdown command");
152 break;
153 }
154 Some(cmd) => {
155 self.handle_command(cmd).await;
156 }
157 None => {
158 info!("Command channel closed, shutting down worker");
159 break;
160 }
161 }
162 }
163 }
164 }
165
166 info!("SensorWorker stopped");
167 }
168
169 async fn handle_command(&mut self, cmd: Command) {
171 info!(?cmd, "Handling command");
172
173 match cmd {
174 Command::LoadCachedData => {
175 self.handle_load_cached_data().await;
176 }
177 Command::Scan { duration } => {
178 self.handle_scan(duration).await;
179 }
180 Command::Connect { device_id } => {
181 self.handle_connect(&device_id).await;
182 }
183 Command::Disconnect { device_id } => {
184 self.handle_disconnect(&device_id).await;
185 }
186 Command::RefreshReading { device_id } => {
187 self.handle_refresh_reading(&device_id).await;
188 }
189 Command::RefreshAll => {
190 self.handle_refresh_all().await;
191 }
192 Command::SyncHistory { device_id } => {
193 self.handle_sync_history(&device_id).await;
194 }
195 Command::SetInterval {
196 device_id,
197 interval_secs,
198 } => {
199 self.handle_set_interval(&device_id, interval_secs).await;
200 }
201 Command::SetBluetoothRange {
202 device_id,
203 extended,
204 } => {
205 self.handle_set_bluetooth_range(&device_id, extended).await;
206 }
207 Command::SetSmartHome { device_id, enabled } => {
208 self.handle_set_smart_home(&device_id, enabled).await;
209 }
210 Command::RefreshServiceStatus => {
211 self.handle_refresh_service_status().await;
212 }
213 Command::StartServiceCollector => {
214 self.handle_start_service_collector().await;
215 }
216 Command::StopServiceCollector => {
217 self.handle_stop_service_collector().await;
218 }
219 Command::SetAlias { device_id, alias } => {
220 self.handle_set_alias(&device_id, alias).await;
221 }
222 Command::ForgetDevice { device_id } => {
223 self.handle_forget_device(&device_id).await;
224 }
225 Command::CancelOperation => {
226 self.handle_cancel_operation().await;
227 }
228 Command::StartBackgroundPolling {
229 device_id,
230 interval_secs,
231 } => {
232 self.handle_start_background_polling(&device_id, interval_secs)
233 .await;
234 }
235 Command::StopBackgroundPolling { device_id } => {
236 self.handle_stop_background_polling(&device_id).await;
237 }
238 Command::Shutdown => {
239 }
241 Command::InstallSystemService { .. }
243 | Command::UninstallSystemService { .. }
244 | Command::StartSystemService { .. }
245 | Command::StopSystemService { .. }
246 | Command::CheckSystemServiceStatus { .. }
247 | Command::FetchServiceConfig
248 | Command::AddServiceDevice { .. }
249 | Command::UpdateServiceDevice { .. }
250 | Command::RemoveServiceDevice { .. } => {
251 info!("System service commands not supported in TUI");
252 }
253 }
254 }
255
256 async fn handle_load_cached_data(&self) {
258 info!("Loading cached data from store");
259
260 let Some(store) = self.open_store() else {
261 let _ = self
263 .event_tx
264 .send(SensorEvent::CachedDataLoaded { devices: vec![] })
265 .await;
266 return;
267 };
268
269 let stored_devices = match store.list_devices() {
271 Ok(devices) => devices,
272 Err(e) => {
273 warn!("Failed to list devices: {}", e);
274 let _ = self
275 .event_tx
276 .send(SensorEvent::CachedDataLoaded { devices: vec![] })
277 .await;
278 return;
279 }
280 };
281
282 let mut cached_devices = Vec::new();
284 for stored in stored_devices {
285 let reading = match store.get_latest_reading(&stored.id) {
286 Ok(Some(stored_reading)) => Some(stored_reading.to_reading()),
287 Ok(None) => None,
288 Err(e) => {
289 debug!("Failed to get latest reading for {}: {}", stored.id, e);
290 None
291 }
292 };
293
294 let last_sync = match store.get_sync_state(&stored.id) {
296 Ok(Some(state)) => state.last_sync_at,
297 Ok(None) => None,
298 Err(e) => {
299 debug!("Failed to get sync state for {}: {}", stored.id, e);
300 None
301 }
302 };
303
304 cached_devices.push(CachedDevice {
305 id: stored.id,
306 name: stored.name,
307 device_type: stored.device_type,
308 reading,
309 last_sync,
310 });
311 }
312
313 info!(count = cached_devices.len(), "Loaded cached devices");
314
315 let device_ids: Vec<String> = cached_devices.iter().map(|d| d.id.clone()).collect();
317
318 if let Err(e) = self
319 .event_tx
320 .send(SensorEvent::CachedDataLoaded {
321 devices: cached_devices,
322 })
323 .await
324 {
325 error!("Failed to send CachedDataLoaded event: {}", e);
326 }
327
328 for device_id in device_ids {
330 self.load_and_send_history(&device_id).await;
331 }
332 }
333
334 async fn handle_scan(&self, duration: Duration) {
336 info!(?duration, "Starting device scan");
337
338 if let Err(e) = self.event_tx.send(SensorEvent::ScanStarted).await {
340 error!("Failed to send ScanStarted event: {}", e);
341 return;
342 }
343
344 let cancel_token = self.cancel_token.clone();
346
347 let options = ScanOptions::default().duration(duration);
349 let scan_result = tokio::select! {
350 result = scan_with_options(options) => result,
351 _ = cancel_token.cancelled() => {
352 info!("Scan cancelled by user");
353 let _ = self
354 .event_tx
355 .send(SensorEvent::OperationCancelled {
356 operation: "Device scan".to_string(),
357 })
358 .await;
359 return;
360 }
361 };
362
363 match scan_result {
364 Ok(devices) => {
365 info!(count = devices.len(), "Scan complete");
366
367 self.save_discovered_devices(&devices);
369
370 if let Err(e) = self
371 .event_tx
372 .send(SensorEvent::ScanComplete { devices })
373 .await
374 {
375 error!("Failed to send ScanComplete event: {}", e);
376 }
377 }
378 Err(e) => {
379 error!("Scan failed: {}", e);
380 if let Err(send_err) = self
381 .event_tx
382 .send(SensorEvent::ScanError {
383 error: e.to_string(),
384 })
385 .await
386 {
387 error!("Failed to send ScanError event: {}", send_err);
388 }
389 }
390 }
391 }
392
393 async fn handle_connect(&self, device_id: &str) {
395 info!(device_id, "Connecting to device");
396
397 if let Err(e) = self
399 .event_tx
400 .send(SensorEvent::DeviceConnecting {
401 device_id: device_id.to_string(),
402 })
403 .await
404 {
405 error!("Failed to send DeviceConnecting event: {}", e);
406 return;
407 }
408
409 let cancel_token = self.cancel_token.clone();
411
412 let retry_config = RetryConfig::for_connect();
414 let device_id_owned = device_id.to_string();
415 let config = self.connection_config.clone();
416
417 let connect_future = with_retry(&retry_config, "connect_and_read", || {
418 let device_id = device_id_owned.clone();
419 let config = config.clone();
420 async move { Self::connect_and_read_with_config(&device_id, config).await }
421 });
422
423 let result = tokio::select! {
425 result = connect_future => result,
426 _ = cancel_token.cancelled() => {
427 info!(device_id, "Connection cancelled by user");
428 let _ = self
429 .event_tx
430 .send(SensorEvent::OperationCancelled {
431 operation: format!("Connect to {}", device_id),
432 })
433 .await;
434 return;
435 }
436 };
437
438 match result {
439 Ok((name, device_type, reading, settings, rssi, signal_quality)) => {
440 info!(
441 device_id,
442 ?name,
443 ?device_type,
444 ?rssi,
445 ?signal_quality,
446 "Device connected"
447 );
448
449 if let Some(quality) = signal_quality {
451 self.signal_quality_cache
452 .write()
453 .await
454 .insert(device_id.to_string(), quality);
455
456 if let Some(rssi_val) = rssi {
458 let _ = self
459 .event_tx
460 .send(SensorEvent::SignalStrengthUpdate {
461 device_id: device_id.to_string(),
462 rssi: rssi_val,
463 quality: aranet_core::messages::SignalQuality::from_rssi(rssi_val),
464 })
465 .await;
466 }
467
468 if quality == SignalQuality::Poor {
470 warn!(
471 device_id,
472 "Poor signal quality - connection may be unstable"
473 );
474 }
475 }
476
477 self.update_device_metadata(device_id, name.as_deref(), device_type);
479
480 if let Err(e) = self
482 .event_tx
483 .send(SensorEvent::DeviceConnected {
484 device_id: device_id.to_string(),
485 name,
486 device_type,
487 rssi,
488 })
489 .await
490 {
491 error!("Failed to send DeviceConnected event: {}", e);
492 }
493
494 if let Some(settings) = settings
496 && let Err(e) = self
497 .event_tx
498 .send(SensorEvent::SettingsLoaded {
499 device_id: device_id.to_string(),
500 settings,
501 })
502 .await
503 {
504 error!("Failed to send SettingsLoaded event: {}", e);
505 }
506
507 if let Some(reading) = reading {
509 self.save_reading(device_id, &reading);
511
512 if let Err(e) = self
513 .event_tx
514 .send(SensorEvent::ReadingUpdated {
515 device_id: device_id.to_string(),
516 reading,
517 })
518 .await
519 {
520 error!("Failed to send ReadingUpdated event: {}", e);
521 }
522 }
523
524 self.load_and_send_history(device_id).await;
526 }
527 Err(e) => {
528 error!(device_id, error = %e, "Failed to connect to device after retries");
529 let context = ErrorContext::from_error(&e);
531 if let Err(send_err) = self
532 .event_tx
533 .send(SensorEvent::ConnectionError {
534 device_id: device_id.to_string(),
535 error: e.to_string(),
536 context: Some(context),
537 })
538 .await
539 {
540 error!("Failed to send ConnectionError event: {}", send_err);
541 }
542 }
543 }
544 }
545
546 async fn handle_disconnect(&self, device_id: &str) {
552 info!(device_id, "Disconnecting device");
553
554 if let Err(e) = self
556 .event_tx
557 .send(SensorEvent::DeviceDisconnected {
558 device_id: device_id.to_string(),
559 })
560 .await
561 {
562 error!("Failed to send DeviceDisconnected event: {}", e);
563 }
564 }
565
566 async fn handle_refresh_reading(&self, device_id: &str) {
568 info!(device_id, "Refreshing reading for device");
569
570 let signal_quality = self
572 .signal_quality_cache
573 .read()
574 .await
575 .get(device_id)
576 .copied();
577
578 let retry_config = match signal_quality {
580 Some(SignalQuality::Poor) | Some(SignalQuality::Fair) => {
581 debug!(
582 device_id,
583 ?signal_quality,
584 "Using aggressive retry config for weak signal"
585 );
586 RetryConfig::aggressive()
587 }
588 _ => RetryConfig::for_read(),
589 };
590
591 let device_id_owned = device_id.to_string();
592 let config = self.connection_config.clone();
593
594 let result = with_retry(&retry_config, "refresh_reading", || {
595 let device_id = device_id_owned.clone();
596 let config = config.clone();
597 async move { Self::connect_and_read_with_config(&device_id, config).await }
598 })
599 .await;
600
601 match result {
602 Ok((_, _, reading, settings, rssi, new_signal_quality)) => {
603 if let Some(quality) = new_signal_quality {
605 self.signal_quality_cache
606 .write()
607 .await
608 .insert(device_id.to_string(), quality);
609 }
610
611 if let Some(rssi_val) = rssi {
613 let _ = self
614 .event_tx
615 .send(SensorEvent::SignalStrengthUpdate {
616 device_id: device_id.to_string(),
617 rssi: rssi_val,
618 quality: aranet_core::messages::SignalQuality::from_rssi(rssi_val),
619 })
620 .await;
621 }
622
623 if let Some(settings) = settings
625 && let Err(e) = self
626 .event_tx
627 .send(SensorEvent::SettingsLoaded {
628 device_id: device_id.to_string(),
629 settings,
630 })
631 .await
632 {
633 error!("Failed to send SettingsLoaded event: {}", e);
634 }
635
636 if let Some(reading) = reading {
637 info!(device_id, "Reading refreshed successfully");
638
639 self.save_reading(device_id, &reading);
641
642 if let Err(e) = self
643 .event_tx
644 .send(SensorEvent::ReadingUpdated {
645 device_id: device_id.to_string(),
646 reading,
647 })
648 .await
649 {
650 error!("Failed to send ReadingUpdated event: {}", e);
651 }
652 } else {
653 warn!(device_id, "Connected but failed to read current values");
654 let context = ErrorContext::transient(
655 "Failed to read current values",
656 "Device connected but returned no data. Try again.",
657 );
658 if let Err(e) = self
659 .event_tx
660 .send(SensorEvent::ReadingError {
661 device_id: device_id.to_string(),
662 error: "Failed to read current values".to_string(),
663 context: Some(context),
664 })
665 .await
666 {
667 error!("Failed to send ReadingError event: {}", e);
668 }
669 }
670 }
671 Err(e) => {
672 error!(device_id, error = %e, "Failed to refresh reading after retries");
673 let context = ErrorContext::from_error(&e);
674 if let Err(send_err) = self
675 .event_tx
676 .send(SensorEvent::ReadingError {
677 device_id: device_id.to_string(),
678 error: e.to_string(),
679 context: Some(context),
680 })
681 .await
682 {
683 error!("Failed to send ReadingError event: {}", send_err);
684 }
685 }
686 }
687 }
688
689 async fn handle_refresh_all(&self) {
694 info!("Refreshing all devices");
695
696 let Some(store) = self.open_store() else {
698 return;
699 };
700
701 let devices = match store.list_devices() {
702 Ok(devices) => devices,
703 Err(e) => {
704 warn!("Failed to list devices for refresh all: {}", e);
705 return;
706 }
707 };
708
709 for device in devices {
711 self.handle_refresh_reading(&device.id).await;
712 }
713
714 info!("Completed refreshing all devices");
715 }
716
717 async fn handle_set_interval(&self, device_id: &str, interval_secs: u16) {
722 info!(device_id, interval_secs, "Setting measurement interval");
723
724 let interval = match MeasurementInterval::from_seconds(interval_secs) {
726 Some(i) => i,
727 None => {
728 let error = format!(
729 "Invalid interval: {} seconds. Must be 60, 120, 300, or 600.",
730 interval_secs
731 );
732 error!(device_id, %error, "Invalid interval value");
733 let context = ErrorContext::permanent(&error);
734 let _ = self
735 .event_tx
736 .send(SensorEvent::IntervalError {
737 device_id: device_id.to_string(),
738 error,
739 context: Some(context),
740 })
741 .await;
742 return;
743 }
744 };
745
746 let retry_config = RetryConfig::for_connect();
748 let device_id_owned = device_id.to_string();
749 let config = self.connection_config.clone();
750
751 let device = match with_retry(&retry_config, "connect_for_interval", || {
752 let device_id = device_id_owned.clone();
753 let config = config.clone();
754 async move { Device::connect_with_config(&device_id, config).await }
755 })
756 .await
757 {
758 Ok(d) => d,
759 Err(e) => {
760 error!(device_id, error = %e, "Failed to connect for set interval");
761 let context = ErrorContext::from_error(&e);
762 let _ = self
763 .event_tx
764 .send(SensorEvent::IntervalError {
765 device_id: device_id.to_string(),
766 error: e.to_string(),
767 context: Some(context),
768 })
769 .await;
770 return;
771 }
772 };
773
774 let retry_config = RetryConfig::for_write();
776 if let Err(e) = with_retry(&retry_config, "set_interval", || async {
777 device.set_interval(interval).await
778 })
779 .await
780 {
781 error!(device_id, error = %e, "Failed to set interval");
782 disconnect_quietly(&device).await;
783 let context = ErrorContext::from_error(&e);
784 let _ = self
785 .event_tx
786 .send(SensorEvent::IntervalError {
787 device_id: device_id.to_string(),
788 error: e.to_string(),
789 context: Some(context),
790 })
791 .await;
792 return;
793 }
794
795 disconnect_quietly(&device).await;
797
798 info!(
799 device_id,
800 interval_secs, "Measurement interval set successfully"
801 );
802
803 if let Err(e) = self
805 .event_tx
806 .send(SensorEvent::IntervalChanged {
807 device_id: device_id.to_string(),
808 interval_secs,
809 })
810 .await
811 {
812 error!("Failed to send IntervalChanged event: {}", e);
813 }
814 }
815
816 async fn handle_set_bluetooth_range(&self, device_id: &str, extended: bool) {
818 let range_name = if extended { "Extended" } else { "Standard" };
819 info!(device_id, range_name, "Setting Bluetooth range");
820
821 let retry_config = RetryConfig::for_connect();
823 let device_id_owned = device_id.to_string();
824 let config = self.connection_config.clone();
825
826 let device = match with_retry(&retry_config, "connect_for_bt_range", || {
827 let device_id = device_id_owned.clone();
828 let config = config.clone();
829 async move { Device::connect_with_config(&device_id, config).await }
830 })
831 .await
832 {
833 Ok(d) => d,
834 Err(e) => {
835 error!(device_id, error = %e, "Failed to connect for set Bluetooth range");
836 let context = ErrorContext::from_error(&e);
837 let _ = self
838 .event_tx
839 .send(SensorEvent::BluetoothRangeError {
840 device_id: device_id.to_string(),
841 error: e.to_string(),
842 context: Some(context),
843 })
844 .await;
845 return;
846 }
847 };
848
849 let range = if extended {
851 BluetoothRange::Extended
852 } else {
853 BluetoothRange::Standard
854 };
855
856 let retry_config = RetryConfig::for_write();
858 if let Err(e) = with_retry(&retry_config, "set_bt_range", || async {
859 device.set_bluetooth_range(range).await
860 })
861 .await
862 {
863 error!(device_id, error = %e, "Failed to set Bluetooth range");
864 disconnect_quietly(&device).await;
865 let context = ErrorContext::from_error(&e);
866 let _ = self
867 .event_tx
868 .send(SensorEvent::BluetoothRangeError {
869 device_id: device_id.to_string(),
870 error: e.to_string(),
871 context: Some(context),
872 })
873 .await;
874 return;
875 }
876
877 disconnect_quietly(&device).await;
879
880 info!(device_id, range_name, "Bluetooth range set successfully");
881
882 if let Err(e) = self
884 .event_tx
885 .send(SensorEvent::BluetoothRangeChanged {
886 device_id: device_id.to_string(),
887 extended,
888 })
889 .await
890 {
891 error!("Failed to send BluetoothRangeChanged event: {}", e);
892 }
893 }
894
895 async fn handle_set_smart_home(&self, device_id: &str, enabled: bool) {
897 let mode = if enabled { "enabled" } else { "disabled" };
898 info!(device_id, mode, "Setting Smart Home");
899
900 let retry_config = RetryConfig::for_connect();
902 let device_id_owned = device_id.to_string();
903 let config = self.connection_config.clone();
904
905 let device = match with_retry(&retry_config, "connect_for_smart_home", || {
906 let device_id = device_id_owned.clone();
907 let config = config.clone();
908 async move { Device::connect_with_config(&device_id, config).await }
909 })
910 .await
911 {
912 Ok(d) => d,
913 Err(e) => {
914 error!(device_id, error = %e, "Failed to connect for set Smart Home");
915 let context = ErrorContext::from_error(&e);
916 let _ = self
917 .event_tx
918 .send(SensorEvent::SmartHomeError {
919 device_id: device_id.to_string(),
920 error: e.to_string(),
921 context: Some(context),
922 })
923 .await;
924 return;
925 }
926 };
927
928 let retry_config = RetryConfig::for_write();
930 if let Err(e) = with_retry(&retry_config, "set_smart_home", || async {
931 device.set_smart_home(enabled).await
932 })
933 .await
934 {
935 error!(device_id, error = %e, "Failed to set Smart Home");
936 disconnect_quietly(&device).await;
937 let context = ErrorContext::from_error(&e);
938 let _ = self
939 .event_tx
940 .send(SensorEvent::SmartHomeError {
941 device_id: device_id.to_string(),
942 error: e.to_string(),
943 context: Some(context),
944 })
945 .await;
946 return;
947 }
948
949 disconnect_quietly(&device).await;
951
952 info!(device_id, mode, "Smart Home set successfully");
953
954 if let Err(e) = self
956 .event_tx
957 .send(SensorEvent::SmartHomeChanged {
958 device_id: device_id.to_string(),
959 enabled,
960 })
961 .await
962 {
963 error!("Failed to send SmartHomeChanged event: {}", e);
964 }
965 }
966
967 async fn connect_and_read_with_config(
975 device_id: &str,
976 config: ConnectionConfig,
977 ) -> Result<
978 (
979 Option<String>,
980 Option<DeviceType>,
981 Option<CurrentReading>,
982 Option<DeviceSettings>,
983 Option<i16>,
984 Option<SignalQuality>,
985 ),
986 aranet_core::Error,
987 > {
988 let device = Device::connect_with_config(device_id, config).await?;
989
990 if !device.validate_connection().await {
992 warn!(
993 device_id,
994 "Connection validation failed - device may be out of range"
995 );
996 disconnect_quietly(&device).await;
997 return Err(aranet_core::Error::NotConnected);
998 }
999 debug!(device_id, "Connection validated successfully");
1000
1001 let name = device.name().map(String::from);
1002 let device_type = device.device_type();
1003
1004 let rssi = device.read_rssi().await.ok();
1006 let signal_quality = rssi.map(SignalQuality::from_rssi);
1007
1008 if let Some(quality) = signal_quality {
1009 debug!(device_id, ?quality, rssi = ?rssi, "Signal quality assessed");
1010 }
1011
1012 if let Some(quality) = signal_quality {
1014 let delay = quality.recommended_read_delay();
1015 if delay > Duration::from_millis(50) {
1016 debug!(device_id, ?delay, "Adding read delay for signal quality");
1017 tokio::time::sleep(delay).await;
1018 }
1019 }
1020
1021 let reading = match device.read_current().await {
1023 Ok(reading) => {
1024 info!(device_id, "Read current values successfully");
1025 Some(reading)
1026 }
1027 Err(e) => {
1028 warn!(device_id, error = %e, "Failed to read current values");
1029 None
1030 }
1031 };
1032
1033 let settings = match device.get_settings().await {
1035 Ok(settings) => {
1036 info!(device_id, ?settings, "Read device settings successfully");
1037 Some(settings)
1038 }
1039 Err(e) => {
1040 warn!(device_id, error = %e, "Failed to read device settings");
1041 None
1042 }
1043 };
1044
1045 disconnect_quietly(&device).await;
1047
1048 Ok((name, device_type, reading, settings, rssi, signal_quality))
1049 }
1050
1051 fn save_reading(&self, device_id: &str, reading: &CurrentReading) {
1053 let Some(store) = self.open_store() else {
1054 return;
1055 };
1056
1057 if let Err(e) = store.insert_reading(device_id, reading) {
1058 warn!(device_id, error = %e, "Failed to save reading to store");
1059 } else {
1060 debug!(device_id, "Reading saved to store");
1061 }
1062 }
1063
1064 fn save_discovered_devices(&self, devices: &[aranet_core::DiscoveredDevice]) {
1066 let Some(store) = self.open_store() else {
1067 return;
1068 };
1069
1070 for device in devices {
1071 let device_id = device.id.to_string();
1072 if let Err(e) = store.upsert_device(&device_id, device.name.as_deref()) {
1074 warn!(device_id, error = %e, "Failed to upsert device");
1075 continue;
1076 }
1077 if let Some(device_type) = device.device_type
1079 && let Err(e) = store.update_device_metadata(&device_id, None, Some(device_type))
1080 {
1081 warn!(device_id, error = %e, "Failed to update device metadata");
1082 }
1083 }
1084
1085 debug!(count = devices.len(), "Saved discovered devices to store");
1086 }
1087
1088 fn update_device_metadata(
1090 &self,
1091 device_id: &str,
1092 name: Option<&str>,
1093 device_type: Option<DeviceType>,
1094 ) {
1095 let Some(store) = self.open_store() else {
1096 return;
1097 };
1098
1099 if let Err(e) = store.upsert_device(device_id, name) {
1101 warn!(device_id, error = %e, "Failed to upsert device");
1102 return;
1103 }
1104
1105 if let Err(e) = store.update_device_metadata(device_id, name, device_type) {
1107 warn!(device_id, error = %e, "Failed to update device metadata");
1108 } else {
1109 debug!(device_id, ?name, ?device_type, "Device metadata updated");
1110 }
1111 }
1112
1113 async fn load_and_send_history(&self, device_id: &str) {
1115 let Some(store) = self.open_store() else {
1116 return;
1117 };
1118
1119 use aranet_store::HistoryQuery;
1122 let query = HistoryQuery::new().device(device_id).oldest_first(); match store.query_history(&query) {
1125 Ok(stored_records) => {
1126 let records: Vec<aranet_types::HistoryRecord> =
1127 stored_records.into_iter().map(|r| r.to_history()).collect();
1128
1129 info!(
1130 device_id,
1131 count = records.len(),
1132 "Loaded history from store"
1133 );
1134
1135 if let Err(e) = self
1136 .event_tx
1137 .send(SensorEvent::HistoryLoaded {
1138 device_id: device_id.to_string(),
1139 records,
1140 })
1141 .await
1142 {
1143 error!("Failed to send HistoryLoaded event: {}", e);
1144 }
1145 }
1146 Err(e) => {
1147 warn!(device_id, error = %e, "Failed to query history from store");
1148 }
1149 }
1150 }
1151
1152 async fn handle_sync_history(&self, device_id: &str) {
1157 use aranet_core::history::HistoryOptions;
1158
1159 info!(device_id, "Syncing history from device");
1160
1161 let Some(store) = self.open_store() else {
1163 let context = ErrorContext::permanent("Failed to open local database");
1164 let _ = self
1165 .event_tx
1166 .send(SensorEvent::HistorySyncError {
1167 device_id: device_id.to_string(),
1168 error: "Failed to open store".to_string(),
1169 context: Some(context),
1170 })
1171 .await;
1172 return;
1173 };
1174
1175 let retry_config = RetryConfig::for_connect();
1177 let device_id_owned = device_id.to_string();
1178 let config = self.connection_config.clone();
1179
1180 let device = match with_retry(&retry_config, "connect_for_history", || {
1181 let device_id = device_id_owned.clone();
1182 let config = config.clone();
1183 async move { Device::connect_with_config(&device_id, config).await }
1184 })
1185 .await
1186 {
1187 Ok(d) => d,
1188 Err(e) => {
1189 error!(device_id, error = %e, "Failed to connect for history sync");
1190 let context = ErrorContext::from_error(&e);
1191 let _ = self
1192 .event_tx
1193 .send(SensorEvent::HistorySyncError {
1194 device_id: device_id.to_string(),
1195 error: e.to_string(),
1196 context: Some(context),
1197 })
1198 .await;
1199 return;
1200 }
1201 };
1202
1203 if !device.validate_connection().await {
1205 warn!(device_id, "Connection validation failed for history sync");
1206 disconnect_quietly(&device).await;
1207 let context = ErrorContext::transient(
1208 "Connection validation failed",
1209 "Device connected but is not responding. Try moving closer.",
1210 );
1211 let _ = self
1212 .event_tx
1213 .send(SensorEvent::HistorySyncError {
1214 device_id: device_id.to_string(),
1215 error: "Connection validation failed".to_string(),
1216 context: Some(context),
1217 })
1218 .await;
1219 return;
1220 }
1221
1222 let history_info = match device.get_history_info().await {
1224 Ok(info) => info,
1225 Err(e) => {
1226 error!(device_id, error = %e, "Failed to get history info");
1227 disconnect_quietly(&device).await;
1228 let context = ErrorContext::from_error(&e);
1229 let _ = self
1230 .event_tx
1231 .send(SensorEvent::HistorySyncError {
1232 device_id: device_id.to_string(),
1233 error: e.to_string(),
1234 context: Some(context),
1235 })
1236 .await;
1237 return;
1238 }
1239 };
1240
1241 let total_on_device = history_info.total_readings;
1242
1243 let start_index = match store.calculate_sync_start(device_id, total_on_device) {
1245 Ok(idx) => idx,
1246 Err(e) => {
1247 warn!(device_id, error = %e, "Failed to calculate sync start, doing full sync");
1248 1u16
1249 }
1250 };
1251
1252 if start_index > total_on_device {
1254 info!(device_id, "Already up to date, no new readings to sync");
1255 disconnect_quietly(&device).await;
1256 let _ = self
1257 .event_tx
1258 .send(SensorEvent::HistorySynced {
1259 device_id: device_id.to_string(),
1260 count: 0,
1261 })
1262 .await;
1263 self.load_and_send_history(device_id).await;
1265 return;
1266 }
1267
1268 let records_to_download = total_on_device.saturating_sub(start_index) + 1;
1269 info!(
1270 device_id,
1271 start_index,
1272 total_on_device,
1273 records_to_download,
1274 "Downloading history (incremental sync)"
1275 );
1276
1277 if let Err(e) = self
1279 .event_tx
1280 .send(SensorEvent::HistorySyncStarted {
1281 device_id: device_id.to_string(),
1282 total_records: Some(records_to_download),
1283 })
1284 .await
1285 {
1286 error!("Failed to send HistorySyncStarted event: {}", e);
1287 }
1288
1289 let signal_quality = self
1292 .signal_quality_cache
1293 .read()
1294 .await
1295 .get(device_id)
1296 .copied();
1297 let read_delay = signal_quality
1298 .map(|q| q.recommended_read_delay())
1299 .unwrap_or(Duration::from_millis(50));
1300
1301 let history_options = HistoryOptions {
1302 start_index: Some(start_index),
1303 end_index: None, read_delay,
1305 use_adaptive_delay: true, ..Default::default()
1307 };
1308
1309 let event_tx = self.event_tx.clone();
1311 let device_id_for_progress = device_id.to_string();
1312 let total = records_to_download as usize;
1313
1314 let last_progress_update = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
1316 let last_progress_clone = last_progress_update.clone();
1317
1318 let progress_task = {
1320 let event_tx = event_tx.clone();
1321 let device_id = device_id_for_progress.clone();
1322 tokio::spawn(async move {
1323 let mut interval = interval(Duration::from_millis(500));
1324 loop {
1325 interval.tick().await;
1326 let downloaded = last_progress_clone.load(std::sync::atomic::Ordering::Relaxed);
1327 if downloaded > 0 && downloaded < total {
1328 let _ = event_tx
1329 .send(SensorEvent::HistorySyncProgress {
1330 device_id: device_id.clone(),
1331 downloaded,
1332 total,
1333 })
1334 .await;
1335 }
1336 if downloaded >= total {
1337 break;
1338 }
1339 }
1340 })
1341 };
1342
1343 let cancel_token = self.cancel_token.clone();
1345
1346 let retry_config = RetryConfig::for_history();
1348 let download_future = with_retry(&retry_config, "download_history", || {
1349 let options = history_options.clone();
1350 let progress = last_progress_update.clone();
1351 let device = &device;
1352 async move {
1353 let records = device.download_history_with_options(options).await?;
1354 progress.store(records.len(), std::sync::atomic::Ordering::Relaxed);
1355 Ok(records)
1356 }
1357 });
1358
1359 let download_result = tokio::select! {
1361 result = download_future => result,
1362 _ = cancel_token.cancelled() => {
1363 progress_task.abort();
1364 info!(device_id, "History sync cancelled by user");
1365 disconnect_quietly(&device).await;
1366 let _ = self
1367 .event_tx
1368 .send(SensorEvent::OperationCancelled {
1369 operation: format!("History sync for {}", device_id),
1370 })
1371 .await;
1372 return;
1373 }
1374 };
1375
1376 let records = match download_result {
1377 Ok(r) => {
1378 progress_task.abort();
1379 r
1380 }
1381 Err(e) => {
1382 progress_task.abort();
1383 error!(device_id, error = %e, "Failed to download history");
1384 disconnect_quietly(&device).await;
1385 let context = ErrorContext::from_error(&e);
1386 let _ = self
1387 .event_tx
1388 .send(SensorEvent::HistorySyncError {
1389 device_id: device_id.to_string(),
1390 error: e.to_string(),
1391 context: Some(context),
1392 })
1393 .await;
1394 return;
1395 }
1396 };
1397
1398 let record_count = records.len();
1399 info!(
1400 device_id,
1401 count = record_count,
1402 "Downloaded history from device"
1403 );
1404
1405 let _ = self
1407 .event_tx
1408 .send(SensorEvent::HistorySyncProgress {
1409 device_id: device_id.to_string(),
1410 downloaded: record_count,
1411 total,
1412 })
1413 .await;
1414
1415 disconnect_quietly(&device).await;
1417
1418 match store.insert_history(device_id, &records) {
1421 Ok(inserted) => {
1422 debug!(
1423 device_id,
1424 downloaded = record_count,
1425 inserted,
1426 "History saved to store"
1427 );
1428
1429 if let Err(e) = store.update_sync_state(device_id, total_on_device, total_on_device)
1431 {
1432 warn!(device_id, error = %e, "Failed to update sync state");
1433 }
1434 }
1435 Err(e) => {
1436 warn!(device_id, error = %e, "Failed to save history to store - sync state not updated");
1437 }
1438 }
1439
1440 if let Err(e) = self
1442 .event_tx
1443 .send(SensorEvent::HistorySynced {
1444 device_id: device_id.to_string(),
1445 count: record_count,
1446 })
1447 .await
1448 {
1449 error!("Failed to send HistorySynced event: {}", e);
1450 }
1451
1452 self.load_and_send_history(device_id).await;
1454 }
1455
1456 async fn handle_refresh_service_status(&self) {
1458 info!("Refreshing service status");
1459
1460 let Some(ref client) = self.service_client else {
1461 let _ = self
1462 .event_tx
1463 .send(SensorEvent::ServiceStatusError {
1464 error: "Service client not available".to_string(),
1465 })
1466 .await;
1467 return;
1468 };
1469
1470 match client.status().await {
1471 Ok(status) => {
1472 let devices: Vec<ServiceDeviceStats> = status
1474 .devices
1475 .into_iter()
1476 .map(|d| ServiceDeviceStats {
1477 device_id: d.device_id,
1478 alias: d.alias,
1479 poll_interval: d.poll_interval,
1480 polling: d.polling,
1481 success_count: d.success_count,
1482 failure_count: d.failure_count,
1483 last_poll_at: d.last_poll_at,
1484 last_error: d.last_error,
1485 })
1486 .collect();
1487
1488 let _ = self
1489 .event_tx
1490 .send(SensorEvent::ServiceStatusRefreshed {
1491 reachable: true,
1492 collector_running: status.collector.running,
1493 uptime_seconds: status.collector.uptime_seconds,
1494 devices,
1495 })
1496 .await;
1497 }
1498 Err(e) => match &e {
1499 aranet_core::service_client::ServiceClientError::NotReachable { .. } => {
1500 let _ = self
1501 .event_tx
1502 .send(SensorEvent::ServiceStatusRefreshed {
1503 reachable: false,
1504 collector_running: false,
1505 uptime_seconds: None,
1506 devices: vec![],
1507 })
1508 .await;
1509 }
1510 _ => {
1511 let _ = self
1512 .event_tx
1513 .send(SensorEvent::ServiceStatusError {
1514 error: Self::format_service_error(&e),
1515 })
1516 .await;
1517 }
1518 },
1519 }
1520 }
1521
1522 async fn handle_start_service_collector(&self) {
1524 info!("Starting service collector");
1525
1526 let Some(ref client) = self.service_client else {
1527 let _ = self
1528 .event_tx
1529 .send(SensorEvent::ServiceCollectorError {
1530 error: "Service client not available".to_string(),
1531 })
1532 .await;
1533 return;
1534 };
1535
1536 match client.start_collector().await {
1537 Ok(_) => {
1538 let _ = self
1539 .event_tx
1540 .send(SensorEvent::ServiceCollectorStarted)
1541 .await;
1542 self.handle_refresh_service_status().await;
1544 }
1545 Err(e) => {
1546 let _ = self
1547 .event_tx
1548 .send(SensorEvent::ServiceCollectorError {
1549 error: Self::format_service_error(&e),
1550 })
1551 .await;
1552 }
1553 }
1554 }
1555
1556 async fn handle_stop_service_collector(&self) {
1558 info!("Stopping service collector");
1559
1560 let Some(ref client) = self.service_client else {
1561 let _ = self
1562 .event_tx
1563 .send(SensorEvent::ServiceCollectorError {
1564 error: "Service client not available".to_string(),
1565 })
1566 .await;
1567 return;
1568 };
1569
1570 match client.stop_collector().await {
1571 Ok(_) => {
1572 let _ = self
1573 .event_tx
1574 .send(SensorEvent::ServiceCollectorStopped)
1575 .await;
1576 self.handle_refresh_service_status().await;
1578 }
1579 Err(e) => {
1580 let _ = self
1581 .event_tx
1582 .send(SensorEvent::ServiceCollectorError {
1583 error: Self::format_service_error(&e),
1584 })
1585 .await;
1586 }
1587 }
1588 }
1589
1590 fn format_service_error(e: &aranet_core::service_client::ServiceClientError) -> String {
1591 use aranet_core::service_client::ServiceClientError;
1592
1593 match e {
1594 ServiceClientError::NotReachable { url, .. } => {
1595 format!(
1596 "Service not reachable at {}. Run 'aranet-service run' to start it.",
1597 url
1598 )
1599 }
1600 ServiceClientError::InvalidUrl(url) => {
1601 format!("Invalid service URL: '{}'. Check your configuration.", url)
1602 }
1603 ServiceClientError::ApiError { status, message } => match *status {
1604 401 => "Authentication required. Check your API key.".to_string(),
1605 403 => "Access denied. Check your API key permissions.".to_string(),
1606 404 => "Endpoint not found. The service may be an older version.".to_string(),
1607 409 => message.clone(),
1608 500..=599 => format!("Service error ({}): {}", status, message),
1609 _ => format!("API error ({}): {}", status, message),
1610 },
1611 ServiceClientError::Request(req_err) => {
1612 if req_err.is_timeout() {
1613 "Request timed out. The service may be overloaded.".to_string()
1614 } else if req_err.is_connect() {
1615 "Connection failed. The service may not be running.".to_string()
1616 } else {
1617 format!("Request failed: {}", req_err)
1618 }
1619 }
1620 }
1621 }
1622
1623 async fn handle_set_alias(&self, device_id: &str, alias: Option<String>) {
1624 info!("Setting alias for device {} to {:?}", device_id, alias);
1625
1626 let Some(store) = self.open_store() else {
1627 let _ = self
1628 .event_tx
1629 .send(SensorEvent::AliasError {
1630 device_id: device_id.to_string(),
1631 error: "Could not open database".to_string(),
1632 })
1633 .await;
1634 return;
1635 };
1636
1637 match store.update_device_metadata(device_id, alias.as_deref(), None) {
1638 Ok(()) => {
1639 info!("Alias updated successfully for {}", device_id);
1640 let _ = self
1641 .event_tx
1642 .send(SensorEvent::AliasChanged {
1643 device_id: device_id.to_string(),
1644 alias,
1645 })
1646 .await;
1647 }
1648 Err(e) => {
1649 let _ = self
1650 .event_tx
1651 .send(SensorEvent::AliasError {
1652 device_id: device_id.to_string(),
1653 error: e.to_string(),
1654 })
1655 .await;
1656 }
1657 }
1658 }
1659
1660 async fn handle_start_background_polling(&self, device_id: &str, interval_secs: u64) {
1666 info!(device_id, interval_secs, "Starting background polling");
1667
1668 {
1670 let polling = self.background_polling.read().await;
1671 if polling.contains_key(device_id) {
1672 warn!(device_id, "Background polling already active for device");
1673 return;
1674 }
1675 }
1676
1677 let (cancel_tx, mut cancel_rx) = tokio::sync::watch::channel(false);
1679
1680 {
1682 let mut polling = self.background_polling.write().await;
1683 polling.insert(device_id.to_string(), cancel_tx);
1684 }
1685
1686 let device_id_owned = device_id.to_string();
1688 let event_tx = self.event_tx.clone();
1689 let config = self.connection_config.clone();
1690 let signal_quality_cache = self.signal_quality_cache.clone();
1691 let store_path = self.store_path.clone();
1692 let polling_interval = Duration::from_secs(interval_secs);
1693
1694 let _ = event_tx
1696 .send(SensorEvent::BackgroundPollingStarted {
1697 device_id: device_id.to_string(),
1698 interval_secs,
1699 })
1700 .await;
1701
1702 tokio::spawn(async move {
1704 let mut interval_timer = interval(polling_interval);
1705 interval_timer.tick().await;
1707
1708 loop {
1709 tokio::select! {
1710 _ = cancel_rx.changed() => {
1711 if *cancel_rx.borrow() {
1712 info!(device_id = %device_id_owned, "Background polling cancelled");
1713 break;
1714 }
1715 }
1716 _ = interval_timer.tick() => {
1717 debug!(device_id = %device_id_owned, "Background poll tick");
1718
1719 let signal_quality = signal_quality_cache.read().await.get(&device_id_owned).copied();
1721
1722 let retry_config = match signal_quality {
1724 Some(SignalQuality::Poor) | Some(SignalQuality::Fair) => {
1725 RetryConfig::aggressive()
1726 }
1727 _ => RetryConfig::for_read(),
1728 };
1729
1730 match with_retry(&retry_config, "background_poll", || {
1732 let device_id = device_id_owned.clone();
1733 let config = config.clone();
1734 async move {
1735 Self::connect_and_read_with_config(&device_id, config).await
1736 }
1737 })
1738 .await
1739 {
1740 Ok((_, _, reading, _, rssi, new_signal_quality)) => {
1741 if let Some(quality) = new_signal_quality {
1743 signal_quality_cache
1744 .write()
1745 .await
1746 .insert(device_id_owned.clone(), quality);
1747 }
1748
1749 if let Some(rssi_val) = rssi {
1751 let _ = event_tx
1752 .send(SensorEvent::SignalStrengthUpdate {
1753 device_id: device_id_owned.clone(),
1754 rssi: rssi_val,
1755 quality: aranet_core::messages::SignalQuality::from_rssi(rssi_val),
1756 })
1757 .await;
1758 }
1759
1760 if let Some(reading) = reading {
1761 debug!(device_id = %device_id_owned, "Background poll successful");
1762
1763 if let Ok(store) = Store::open(&store_path)
1765 && let Err(e) = store.insert_reading(&device_id_owned, &reading)
1766 {
1767 warn!(device_id = %device_id_owned, error = %e, "Failed to save background reading to store");
1768 }
1769
1770 let _ = event_tx
1772 .send(SensorEvent::ReadingUpdated {
1773 device_id: device_id_owned.clone(),
1774 reading,
1775 })
1776 .await;
1777 }
1778 }
1779 Err(e) => {
1780 warn!(device_id = %device_id_owned, error = %e, "Background poll failed");
1781 let context = ErrorContext::from_error(&e);
1782 let _ = event_tx
1783 .send(SensorEvent::ReadingError {
1784 device_id: device_id_owned.clone(),
1785 error: e.to_string(),
1786 context: Some(context),
1787 })
1788 .await;
1789 }
1790 }
1791 }
1792 }
1793 }
1794
1795 let _ = event_tx
1797 .send(SensorEvent::BackgroundPollingStopped {
1798 device_id: device_id_owned,
1799 })
1800 .await;
1801 });
1802
1803 info!(device_id, "Background polling task spawned");
1804 }
1805
1806 async fn handle_cancel_operation(&mut self) {
1811 info!("Cancelling current operation");
1812
1813 self.cancel_token.cancel();
1815
1816 self.cancel_token = CancellationToken::new();
1818
1819 let _ = self
1821 .event_tx
1822 .send(SensorEvent::OperationCancelled {
1823 operation: "Current operation".to_string(),
1824 })
1825 .await;
1826 }
1827
1828 async fn handle_forget_device(&self, device_id: &str) {
1830 info!(device_id, "Forgetting device");
1831
1832 {
1834 let mut polling = self.background_polling.write().await;
1835 if let Some(cancel_tx) = polling.remove(device_id) {
1836 let _ = cancel_tx.send(true);
1837 info!(device_id, "Stopped background polling for forgotten device");
1838 }
1839 }
1840
1841 {
1843 let mut cache = self.signal_quality_cache.write().await;
1844 cache.remove(device_id);
1845 }
1846
1847 let Some(store) = self.open_store() else {
1849 let _ = self
1850 .event_tx
1851 .send(SensorEvent::ForgetDeviceError {
1852 device_id: device_id.to_string(),
1853 error: "Could not open database".to_string(),
1854 })
1855 .await;
1856 return;
1857 };
1858
1859 match store.delete_device(device_id) {
1860 Ok(deleted) => {
1861 if deleted {
1862 info!(device_id, "Device forgotten (deleted from store)");
1863 } else {
1864 info!(
1865 device_id,
1866 "Device not found in store (removing from UI only)"
1867 );
1868 }
1869 let _ = self
1870 .event_tx
1871 .send(SensorEvent::DeviceForgotten {
1872 device_id: device_id.to_string(),
1873 })
1874 .await;
1875 }
1876 Err(e) => {
1877 error!(device_id, error = %e, "Failed to forget device");
1878 let _ = self
1879 .event_tx
1880 .send(SensorEvent::ForgetDeviceError {
1881 device_id: device_id.to_string(),
1882 error: e.to_string(),
1883 })
1884 .await;
1885 }
1886 }
1887 }
1888
1889 async fn handle_stop_background_polling(&self, device_id: &str) {
1891 info!(device_id, "Stopping background polling");
1892
1893 let mut polling = self.background_polling.write().await;
1894 if let Some(cancel_tx) = polling.remove(device_id) {
1895 let _ = cancel_tx.send(true);
1897 info!(device_id, "Background polling stop signal sent");
1898 } else {
1899 warn!(device_id, "No active background polling found for device");
1900 }
1901 }
1902}
1903
#[cfg(test)]
mod tests {
    use std::time::{SystemTime, UNIX_EPOCH};

    use super::*;

    /// Builds a database path in the system temp directory whose name combines the
    /// given label, the process id, and the current time in nanoseconds.
    fn test_store_path(label: &str) -> PathBuf {
        let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
        let unique = since_epoch.as_nanos();
        let file_name = format!(
            "aranet-tui-worker-{label}-{}-{unique}.db",
            std::process::id()
        );
        std::env::temp_dir().join(file_name)
    }

    /// Cancelling must fire the old token, install a fresh one, and emit the event.
    #[tokio::test]
    async fn handle_cancel_operation_resets_token_and_emits_event() {
        let (_command_tx, command_rx) = mpsc::channel(1);
        let (event_tx, mut event_rx) = mpsc::channel(1);
        let mut worker = SensorWorker::new(command_rx, event_tx, test_store_path("cancel"));
        let token_before = worker.cancel_token.clone();

        worker.handle_cancel_operation().await;

        assert!(token_before.is_cancelled());
        assert!(!worker.cancel_token.is_cancelled());

        match event_rx.recv().await.unwrap() {
            SensorEvent::OperationCancelled { operation } => {
                assert_eq!(operation, "Current operation");
            }
            other => panic!("unexpected event: {other:?}"),
        }
    }

    /// Stopping must unregister the device and deliver `true` on its cancel channel.
    #[tokio::test]
    async fn handle_stop_background_polling_removes_cancel_sender() {
        let (_command_tx, command_rx) = mpsc::channel(1);
        let (event_tx, _event_rx) = mpsc::channel(1);
        let worker = SensorWorker::new(command_rx, event_tx, test_store_path("polling"));
        let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
        let device_id = "aranet-test";

        {
            let mut registry = worker.background_polling.write().await;
            registry.insert(device_id.to_string(), cancel_tx);
        }

        worker.handle_stop_background_polling(device_id).await;

        let still_registered = worker
            .background_polling
            .read()
            .await
            .contains_key(device_id);
        assert!(!still_registered);
        assert!(*cancel_rx.borrow());
    }
}