1use std::path::PathBuf;
19use std::time::Duration;
20
21use aranet_core::service_client::ServiceClient;
22use aranet_core::settings::{DeviceSettings, MeasurementInterval};
23use aranet_core::{BluetoothRange, Device, ScanOptions, scan::scan_with_options};
24use aranet_store::Store;
25use aranet_types::{CurrentReading, DeviceType};
26use tokio::sync::mpsc;
27use tokio::time::timeout;
28use tracing::{debug, error, info, warn};
29
30use super::messages::{CachedDevice, Command, SensorEvent};
31use aranet_core::messages::ServiceDeviceStats;
32
/// Maximum time allowed for one connect-and-read attempt before the worker
/// abandons it and reports a timeout to the event channel.
const CONNECT_READ_TIMEOUT: Duration = Duration::from_secs(30);
35
36pub struct SensorWorker {
46 command_rx: mpsc::Receiver<Command>,
48 event_tx: mpsc::Sender<SensorEvent>,
50 store_path: PathBuf,
52 service_client: Option<ServiceClient>,
54}
55
/// Base URL used to construct the worker's `ServiceClient`.
const DEFAULT_SERVICE_URL: &str = "http://localhost:8080";
58
59impl SensorWorker {
60 pub fn new(
68 command_rx: mpsc::Receiver<Command>,
69 event_tx: mpsc::Sender<SensorEvent>,
70 store_path: PathBuf,
71 ) -> Self {
72 let service_client = ServiceClient::new(DEFAULT_SERVICE_URL).ok();
74
75 Self {
76 command_rx,
77 event_tx,
78 store_path,
79 service_client,
80 }
81 }
82
83 fn open_store(&self) -> Option<Store> {
87 match Store::open(&self.store_path) {
88 Ok(store) => Some(store),
89 Err(e) => {
90 warn!(error = %e, "Failed to open store");
91 None
92 }
93 }
94 }
95
96 pub async fn run(mut self) {
101 info!("SensorWorker started");
102
103 loop {
104 tokio::select! {
105 cmd = self.command_rx.recv() => {
107 match cmd {
108 Some(Command::Shutdown) => {
109 info!("SensorWorker received shutdown command");
110 break;
111 }
112 Some(cmd) => {
113 self.handle_command(cmd).await;
114 }
115 None => {
116 info!("Command channel closed, shutting down worker");
117 break;
118 }
119 }
120 }
121 }
122 }
123
124 info!("SensorWorker stopped");
125 }
126
127 async fn handle_command(&mut self, cmd: Command) {
129 info!(?cmd, "Handling command");
130
131 match cmd {
132 Command::LoadCachedData => {
133 self.handle_load_cached_data().await;
134 }
135 Command::Scan { duration } => {
136 self.handle_scan(duration).await;
137 }
138 Command::Connect { device_id } => {
139 self.handle_connect(&device_id).await;
140 }
141 Command::Disconnect { device_id } => {
142 self.handle_disconnect(&device_id).await;
143 }
144 Command::RefreshReading { device_id } => {
145 self.handle_refresh_reading(&device_id).await;
146 }
147 Command::RefreshAll => {
148 self.handle_refresh_all().await;
149 }
150 Command::SyncHistory { device_id } => {
151 self.handle_sync_history(&device_id).await;
152 }
153 Command::SetInterval {
154 device_id,
155 interval_secs,
156 } => {
157 self.handle_set_interval(&device_id, interval_secs).await;
158 }
159 Command::SetBluetoothRange {
160 device_id,
161 extended,
162 } => {
163 self.handle_set_bluetooth_range(&device_id, extended).await;
164 }
165 Command::SetSmartHome { device_id, enabled } => {
166 self.handle_set_smart_home(&device_id, enabled).await;
167 }
168 Command::RefreshServiceStatus => {
169 self.handle_refresh_service_status().await;
170 }
171 Command::StartServiceCollector => {
172 self.handle_start_service_collector().await;
173 }
174 Command::StopServiceCollector => {
175 self.handle_stop_service_collector().await;
176 }
177 Command::SetAlias { device_id, alias } => {
178 self.handle_set_alias(&device_id, alias).await;
179 }
180 Command::ForgetDevice { device_id } => {
181 info!(
183 device_id,
184 "Forget device requested (not implemented in TUI)"
185 );
186 let _ = self
187 .event_tx
188 .send(SensorEvent::DeviceForgotten { device_id })
189 .await;
190 }
191 Command::Shutdown => {
192 }
194 }
195 }
196
197 async fn handle_load_cached_data(&self) {
199 info!("Loading cached data from store");
200
201 let Some(store) = self.open_store() else {
202 let _ = self
204 .event_tx
205 .send(SensorEvent::CachedDataLoaded { devices: vec![] })
206 .await;
207 return;
208 };
209
210 let stored_devices = match store.list_devices() {
212 Ok(devices) => devices,
213 Err(e) => {
214 warn!("Failed to list devices: {}", e);
215 let _ = self
216 .event_tx
217 .send(SensorEvent::CachedDataLoaded { devices: vec![] })
218 .await;
219 return;
220 }
221 };
222
223 let mut cached_devices = Vec::new();
225 for stored in stored_devices {
226 let reading = match store.get_latest_reading(&stored.id) {
227 Ok(Some(stored_reading)) => Some(CurrentReading {
228 co2: stored_reading.co2,
229 temperature: stored_reading.temperature,
230 pressure: stored_reading.pressure,
231 humidity: stored_reading.humidity,
232 battery: stored_reading.battery,
233 status: stored_reading.status,
234 interval: 0, age: 0, captured_at: Some(stored_reading.captured_at),
237 radon: stored_reading.radon,
238 radiation_rate: stored_reading.radiation_rate,
239 radiation_total: stored_reading.radiation_total,
240 radon_avg_24h: None,
241 radon_avg_7d: None,
242 radon_avg_30d: None,
243 }),
244 Ok(None) => None,
245 Err(e) => {
246 debug!("Failed to get latest reading for {}: {}", stored.id, e);
247 None
248 }
249 };
250
251 let last_sync = match store.get_sync_state(&stored.id) {
253 Ok(Some(state)) => state.last_sync_at,
254 Ok(None) => None,
255 Err(e) => {
256 debug!("Failed to get sync state for {}: {}", stored.id, e);
257 None
258 }
259 };
260
261 cached_devices.push(CachedDevice {
262 id: stored.id,
263 name: stored.name,
264 device_type: stored.device_type,
265 reading,
266 last_sync,
267 });
268 }
269
270 info!(count = cached_devices.len(), "Loaded cached devices");
271
272 let device_ids: Vec<String> = cached_devices.iter().map(|d| d.id.clone()).collect();
274
275 if let Err(e) = self
276 .event_tx
277 .send(SensorEvent::CachedDataLoaded {
278 devices: cached_devices,
279 })
280 .await
281 {
282 error!("Failed to send CachedDataLoaded event: {}", e);
283 }
284
285 for device_id in device_ids {
287 self.load_and_send_history(&device_id).await;
288 }
289 }
290
291 async fn handle_scan(&self, duration: Duration) {
293 info!(?duration, "Starting device scan");
294
295 if let Err(e) = self.event_tx.send(SensorEvent::ScanStarted).await {
297 error!("Failed to send ScanStarted event: {}", e);
298 return;
299 }
300
301 let options = ScanOptions::default().duration(duration);
303 match scan_with_options(options).await {
304 Ok(devices) => {
305 info!(count = devices.len(), "Scan complete");
306
307 self.save_discovered_devices(&devices);
309
310 if let Err(e) = self
311 .event_tx
312 .send(SensorEvent::ScanComplete { devices })
313 .await
314 {
315 error!("Failed to send ScanComplete event: {}", e);
316 }
317 }
318 Err(e) => {
319 error!("Scan failed: {}", e);
320 if let Err(send_err) = self
321 .event_tx
322 .send(SensorEvent::ScanError {
323 error: e.to_string(),
324 })
325 .await
326 {
327 error!("Failed to send ScanError event: {}", send_err);
328 }
329 }
330 }
331 }
332
333 async fn handle_connect(&self, device_id: &str) {
335 info!(device_id, "Connecting to device");
336
337 if let Err(e) = self
339 .event_tx
340 .send(SensorEvent::DeviceConnecting {
341 device_id: device_id.to_string(),
342 })
343 .await
344 {
345 error!("Failed to send DeviceConnecting event: {}", e);
346 return;
347 }
348
349 match timeout(CONNECT_READ_TIMEOUT, self.connect_and_read(device_id)).await {
350 Ok(Ok((name, device_type, reading, settings, rssi))) => {
351 info!(device_id, ?name, ?device_type, ?rssi, "Device connected");
352
353 self.update_device_metadata(device_id, name.as_deref(), device_type);
355
356 if let Err(e) = self
358 .event_tx
359 .send(SensorEvent::DeviceConnected {
360 device_id: device_id.to_string(),
361 name,
362 device_type,
363 rssi,
364 })
365 .await
366 {
367 error!("Failed to send DeviceConnected event: {}", e);
368 }
369
370 if let Some(settings) = settings
372 && let Err(e) = self
373 .event_tx
374 .send(SensorEvent::SettingsLoaded {
375 device_id: device_id.to_string(),
376 settings,
377 })
378 .await
379 {
380 error!("Failed to send SettingsLoaded event: {}", e);
381 }
382
383 if let Some(reading) = reading {
385 self.save_reading(device_id, &reading);
387
388 if let Err(e) = self
389 .event_tx
390 .send(SensorEvent::ReadingUpdated {
391 device_id: device_id.to_string(),
392 reading,
393 })
394 .await
395 {
396 error!("Failed to send ReadingUpdated event: {}", e);
397 }
398 }
399
400 self.load_and_send_history(device_id).await;
402 }
403 Ok(Err(e)) => {
404 error!(device_id, error = %e, "Failed to connect to device");
405 if let Err(send_err) = self
406 .event_tx
407 .send(SensorEvent::ConnectionError {
408 device_id: device_id.to_string(),
409 error: e.to_string(),
410 })
411 .await
412 {
413 error!("Failed to send ConnectionError event: {}", send_err);
414 }
415 }
416 Err(_) => {
417 error!(device_id, "Connection timed out");
419 if let Err(send_err) = self
420 .event_tx
421 .send(SensorEvent::ConnectionError {
422 device_id: device_id.to_string(),
423 error: format!(
424 "Connection timed out after {}s",
425 CONNECT_READ_TIMEOUT.as_secs()
426 ),
427 })
428 .await
429 {
430 error!("Failed to send ConnectionError event: {}", send_err);
431 }
432 }
433 }
434 }
435
436 async fn handle_disconnect(&self, device_id: &str) {
442 info!(device_id, "Disconnecting device");
443
444 if let Err(e) = self
446 .event_tx
447 .send(SensorEvent::DeviceDisconnected {
448 device_id: device_id.to_string(),
449 })
450 .await
451 {
452 error!("Failed to send DeviceDisconnected event: {}", e);
453 }
454 }
455
456 async fn handle_refresh_reading(&self, device_id: &str) {
458 info!(device_id, "Refreshing reading for device");
459
460 match timeout(CONNECT_READ_TIMEOUT, self.connect_and_read(device_id)).await {
461 Ok(Ok((_, _, reading, settings, _rssi))) => {
462 if let Some(settings) = settings
464 && let Err(e) = self
465 .event_tx
466 .send(SensorEvent::SettingsLoaded {
467 device_id: device_id.to_string(),
468 settings,
469 })
470 .await
471 {
472 error!("Failed to send SettingsLoaded event: {}", e);
473 }
474
475 if let Some(reading) = reading {
476 info!(device_id, "Reading refreshed successfully");
477
478 self.save_reading(device_id, &reading);
480
481 if let Err(e) = self
482 .event_tx
483 .send(SensorEvent::ReadingUpdated {
484 device_id: device_id.to_string(),
485 reading,
486 })
487 .await
488 {
489 error!("Failed to send ReadingUpdated event: {}", e);
490 }
491 } else {
492 warn!(device_id, "Connected but failed to read current values");
493 if let Err(e) = self
494 .event_tx
495 .send(SensorEvent::ReadingError {
496 device_id: device_id.to_string(),
497 error: "Failed to read current values".to_string(),
498 })
499 .await
500 {
501 error!("Failed to send ReadingError event: {}", e);
502 }
503 }
504 }
505 Ok(Err(e)) => {
506 error!(device_id, error = %e, "Failed to refresh reading");
507 if let Err(send_err) = self
508 .event_tx
509 .send(SensorEvent::ReadingError {
510 device_id: device_id.to_string(),
511 error: e.to_string(),
512 })
513 .await
514 {
515 error!("Failed to send ReadingError event: {}", send_err);
516 }
517 }
518 Err(_) => {
519 error!(device_id, "Refresh reading timed out");
521 if let Err(send_err) = self
522 .event_tx
523 .send(SensorEvent::ReadingError {
524 device_id: device_id.to_string(),
525 error: format!(
526 "Refresh timed out after {}s",
527 CONNECT_READ_TIMEOUT.as_secs()
528 ),
529 })
530 .await
531 {
532 error!("Failed to send ReadingError event: {}", send_err);
533 }
534 }
535 }
536 }
537
538 async fn handle_refresh_all(&self) {
543 info!("Refreshing all devices");
544
545 let Some(store) = self.open_store() else {
547 return;
548 };
549
550 let devices = match store.list_devices() {
551 Ok(devices) => devices,
552 Err(e) => {
553 warn!("Failed to list devices for refresh all: {}", e);
554 return;
555 }
556 };
557
558 for device in devices {
560 self.handle_refresh_reading(&device.id).await;
561 }
562
563 info!("Completed refreshing all devices");
564 }
565
566 async fn handle_set_interval(&self, device_id: &str, interval_secs: u16) {
571 info!(device_id, interval_secs, "Setting measurement interval");
572
573 let interval = match MeasurementInterval::from_seconds(interval_secs) {
575 Some(i) => i,
576 None => {
577 let error = format!(
578 "Invalid interval: {} seconds. Must be 60, 120, 300, or 600.",
579 interval_secs
580 );
581 error!(device_id, %error, "Invalid interval value");
582 let _ = self
583 .event_tx
584 .send(SensorEvent::IntervalError {
585 device_id: device_id.to_string(),
586 error,
587 })
588 .await;
589 return;
590 }
591 };
592
593 let device = match Device::connect(device_id).await {
595 Ok(d) => d,
596 Err(e) => {
597 error!(device_id, error = %e, "Failed to connect for set interval");
598 let _ = self
599 .event_tx
600 .send(SensorEvent::IntervalError {
601 device_id: device_id.to_string(),
602 error: e.to_string(),
603 })
604 .await;
605 return;
606 }
607 };
608
609 if let Err(e) = device.set_interval(interval).await {
611 error!(device_id, error = %e, "Failed to set interval");
612 let _ = device.disconnect().await;
613 let _ = self
614 .event_tx
615 .send(SensorEvent::IntervalError {
616 device_id: device_id.to_string(),
617 error: e.to_string(),
618 })
619 .await;
620 return;
621 }
622
623 if let Err(e) = device.disconnect().await {
625 warn!(device_id, error = %e, "Failed to disconnect after setting interval");
626 }
627
628 info!(
629 device_id,
630 interval_secs, "Measurement interval set successfully"
631 );
632
633 if let Err(e) = self
635 .event_tx
636 .send(SensorEvent::IntervalChanged {
637 device_id: device_id.to_string(),
638 interval_secs,
639 })
640 .await
641 {
642 error!("Failed to send IntervalChanged event: {}", e);
643 }
644 }
645
646 async fn handle_set_bluetooth_range(&self, device_id: &str, extended: bool) {
648 let range_name = if extended { "Extended" } else { "Standard" };
649 info!(device_id, range_name, "Setting Bluetooth range");
650
651 let device = match Device::connect(device_id).await {
653 Ok(d) => d,
654 Err(e) => {
655 error!(device_id, error = %e, "Failed to connect for set Bluetooth range");
656 let _ = self
657 .event_tx
658 .send(SensorEvent::BluetoothRangeError {
659 device_id: device_id.to_string(),
660 error: e.to_string(),
661 })
662 .await;
663 return;
664 }
665 };
666
667 let range = if extended {
669 BluetoothRange::Extended
670 } else {
671 BluetoothRange::Standard
672 };
673
674 if let Err(e) = device.set_bluetooth_range(range).await {
675 error!(device_id, error = %e, "Failed to set Bluetooth range");
676 let _ = device.disconnect().await;
677 let _ = self
678 .event_tx
679 .send(SensorEvent::BluetoothRangeError {
680 device_id: device_id.to_string(),
681 error: e.to_string(),
682 })
683 .await;
684 return;
685 }
686
687 if let Err(e) = device.disconnect().await {
689 warn!(device_id, error = %e, "Failed to disconnect after setting Bluetooth range");
690 }
691
692 info!(device_id, range_name, "Bluetooth range set successfully");
693
694 if let Err(e) = self
696 .event_tx
697 .send(SensorEvent::BluetoothRangeChanged {
698 device_id: device_id.to_string(),
699 extended,
700 })
701 .await
702 {
703 error!("Failed to send BluetoothRangeChanged event: {}", e);
704 }
705 }
706
707 async fn handle_set_smart_home(&self, device_id: &str, enabled: bool) {
709 let mode = if enabled { "enabled" } else { "disabled" };
710 info!(device_id, mode, "Setting Smart Home");
711
712 let device = match Device::connect(device_id).await {
714 Ok(d) => d,
715 Err(e) => {
716 error!(device_id, error = %e, "Failed to connect for set Smart Home");
717 let _ = self
718 .event_tx
719 .send(SensorEvent::SmartHomeError {
720 device_id: device_id.to_string(),
721 error: e.to_string(),
722 })
723 .await;
724 return;
725 }
726 };
727
728 if let Err(e) = device.set_smart_home(enabled).await {
730 error!(device_id, error = %e, "Failed to set Smart Home");
731 let _ = device.disconnect().await;
732 let _ = self
733 .event_tx
734 .send(SensorEvent::SmartHomeError {
735 device_id: device_id.to_string(),
736 error: e.to_string(),
737 })
738 .await;
739 return;
740 }
741
742 if let Err(e) = device.disconnect().await {
744 warn!(device_id, error = %e, "Failed to disconnect after setting Smart Home");
745 }
746
747 info!(device_id, mode, "Smart Home set successfully");
748
749 if let Err(e) = self
751 .event_tx
752 .send(SensorEvent::SmartHomeChanged {
753 device_id: device_id.to_string(),
754 enabled,
755 })
756 .await
757 {
758 error!("Failed to send SmartHomeChanged event: {}", e);
759 }
760 }
761
762 async fn connect_and_read(
767 &self,
768 device_id: &str,
769 ) -> Result<
770 (
771 Option<String>,
772 Option<DeviceType>,
773 Option<CurrentReading>,
774 Option<DeviceSettings>,
775 Option<i16>,
776 ),
777 aranet_core::Error,
778 > {
779 let device = Device::connect(device_id).await?;
780
781 let name = device.name().map(String::from);
782 let device_type = device.device_type();
783
784 let reading = match device.read_current().await {
786 Ok(reading) => {
787 info!(device_id, "Read current values successfully");
788 Some(reading)
789 }
790 Err(e) => {
791 warn!(device_id, error = %e, "Failed to read current values");
792 None
793 }
794 };
795
796 let settings = match device.get_settings().await {
798 Ok(settings) => {
799 info!(device_id, ?settings, "Read device settings successfully");
800 Some(settings)
801 }
802 Err(e) => {
803 warn!(device_id, error = %e, "Failed to read device settings");
804 None
805 }
806 };
807
808 let rssi = device.read_rssi().await.ok();
810
811 if let Err(e) = device.disconnect().await {
813 warn!(device_id, error = %e, "Failed to disconnect from device");
814 }
815
816 Ok((name, device_type, reading, settings, rssi))
817 }
818
819 fn save_reading(&self, device_id: &str, reading: &CurrentReading) {
821 let Some(store) = self.open_store() else {
822 return;
823 };
824
825 if let Err(e) = store.insert_reading(device_id, reading) {
826 warn!(device_id, error = %e, "Failed to save reading to store");
827 } else {
828 debug!(device_id, "Reading saved to store");
829 }
830 }
831
832 fn save_discovered_devices(&self, devices: &[aranet_core::DiscoveredDevice]) {
834 let Some(store) = self.open_store() else {
835 return;
836 };
837
838 for device in devices {
839 let device_id = device.id.to_string();
840 if let Err(e) = store.upsert_device(&device_id, device.name.as_deref()) {
842 warn!(device_id, error = %e, "Failed to upsert device");
843 continue;
844 }
845 if let Some(device_type) = device.device_type
847 && let Err(e) = store.update_device_metadata(&device_id, None, Some(device_type))
848 {
849 warn!(device_id, error = %e, "Failed to update device metadata");
850 }
851 }
852
853 debug!(count = devices.len(), "Saved discovered devices to store");
854 }
855
856 fn update_device_metadata(
858 &self,
859 device_id: &str,
860 name: Option<&str>,
861 device_type: Option<DeviceType>,
862 ) {
863 let Some(store) = self.open_store() else {
864 return;
865 };
866
867 if let Err(e) = store.upsert_device(device_id, name) {
869 warn!(device_id, error = %e, "Failed to upsert device");
870 return;
871 }
872
873 if let Err(e) = store.update_device_metadata(device_id, name, device_type) {
875 warn!(device_id, error = %e, "Failed to update device metadata");
876 } else {
877 debug!(device_id, ?name, ?device_type, "Device metadata updated");
878 }
879 }
880
881 async fn load_and_send_history(&self, device_id: &str) {
883 let Some(store) = self.open_store() else {
884 return;
885 };
886
887 use aranet_store::HistoryQuery;
890 let query = HistoryQuery::new().device(device_id).oldest_first(); match store.query_history(&query) {
893 Ok(stored_records) => {
894 let records: Vec<aranet_types::HistoryRecord> = stored_records
896 .into_iter()
897 .map(|r| aranet_types::HistoryRecord {
898 timestamp: r.timestamp,
899 co2: r.co2,
900 temperature: r.temperature,
901 pressure: r.pressure,
902 humidity: r.humidity,
903 radon: r.radon,
904 radiation_rate: r.radiation_rate,
905 radiation_total: r.radiation_total,
906 })
907 .collect();
908
909 info!(
910 device_id,
911 count = records.len(),
912 "Loaded history from store"
913 );
914
915 if let Err(e) = self
916 .event_tx
917 .send(SensorEvent::HistoryLoaded {
918 device_id: device_id.to_string(),
919 records,
920 })
921 .await
922 {
923 error!("Failed to send HistoryLoaded event: {}", e);
924 }
925 }
926 Err(e) => {
927 warn!(device_id, error = %e, "Failed to query history from store");
928 }
929 }
930 }
931
932 async fn handle_sync_history(&self, device_id: &str) {
936 use aranet_core::history::HistoryOptions;
937
938 info!(device_id, "Syncing history from device");
939
940 if let Err(e) = self
942 .event_tx
943 .send(SensorEvent::HistorySyncStarted {
944 device_id: device_id.to_string(),
945 })
946 .await
947 {
948 error!("Failed to send HistorySyncStarted event: {}", e);
949 return;
950 }
951
952 let Some(store) = self.open_store() else {
954 let _ = self
955 .event_tx
956 .send(SensorEvent::HistorySyncError {
957 device_id: device_id.to_string(),
958 error: "Failed to open store".to_string(),
959 })
960 .await;
961 return;
962 };
963
964 let device = match Device::connect(device_id).await {
966 Ok(d) => d,
967 Err(e) => {
968 error!(device_id, error = %e, "Failed to connect for history sync");
969 let _ = self
970 .event_tx
971 .send(SensorEvent::HistorySyncError {
972 device_id: device_id.to_string(),
973 error: e.to_string(),
974 })
975 .await;
976 return;
977 }
978 };
979
980 let history_info = match device.get_history_info().await {
982 Ok(info) => info,
983 Err(e) => {
984 error!(device_id, error = %e, "Failed to get history info");
985 let _ = device.disconnect().await;
986 let _ = self
987 .event_tx
988 .send(SensorEvent::HistorySyncError {
989 device_id: device_id.to_string(),
990 error: e.to_string(),
991 })
992 .await;
993 return;
994 }
995 };
996
997 let total_on_device = history_info.total_readings;
998
999 let start_index = match store.calculate_sync_start(device_id, total_on_device) {
1001 Ok(idx) => idx,
1002 Err(e) => {
1003 warn!(device_id, error = %e, "Failed to calculate sync start, doing full sync");
1004 1u16
1005 }
1006 };
1007
1008 if start_index > total_on_device {
1010 info!(device_id, "Already up to date, no new readings to sync");
1011 let _ = device.disconnect().await;
1012 let _ = self
1013 .event_tx
1014 .send(SensorEvent::HistorySynced {
1015 device_id: device_id.to_string(),
1016 count: 0,
1017 })
1018 .await;
1019 self.load_and_send_history(device_id).await;
1021 return;
1022 }
1023
1024 let records_to_download = total_on_device.saturating_sub(start_index) + 1;
1025 info!(
1026 device_id,
1027 start_index,
1028 total_on_device,
1029 records_to_download,
1030 "Downloading history (incremental sync)"
1031 );
1032
1033 let history_options = HistoryOptions {
1035 start_index: Some(start_index),
1036 end_index: None, ..Default::default()
1038 };
1039
1040 let records = match device.download_history_with_options(history_options).await {
1041 Ok(r) => r,
1042 Err(e) => {
1043 error!(device_id, error = %e, "Failed to download history");
1044 let _ = device.disconnect().await;
1045 let _ = self
1046 .event_tx
1047 .send(SensorEvent::HistorySyncError {
1048 device_id: device_id.to_string(),
1049 error: e.to_string(),
1050 })
1051 .await;
1052 return;
1053 }
1054 };
1055
1056 let record_count = records.len();
1057 info!(
1058 device_id,
1059 count = record_count,
1060 "Downloaded history from device"
1061 );
1062
1063 let _ = device.disconnect().await;
1065
1066 if let Err(e) = store.insert_history(device_id, &records) {
1068 warn!(device_id, error = %e, "Failed to save history to store");
1069 } else {
1070 debug!(device_id, count = record_count, "History saved to store");
1071 }
1072
1073 if let Err(e) = store.update_sync_state(device_id, total_on_device, total_on_device) {
1075 warn!(device_id, error = %e, "Failed to update sync state");
1076 }
1077
1078 if let Err(e) = self
1080 .event_tx
1081 .send(SensorEvent::HistorySynced {
1082 device_id: device_id.to_string(),
1083 count: record_count,
1084 })
1085 .await
1086 {
1087 error!("Failed to send HistorySynced event: {}", e);
1088 }
1089
1090 self.load_and_send_history(device_id).await;
1092 }
1093
1094 async fn handle_refresh_service_status(&self) {
1096 info!("Refreshing service status");
1097
1098 let Some(ref client) = self.service_client else {
1099 let _ = self
1100 .event_tx
1101 .send(SensorEvent::ServiceStatusError {
1102 error: "Service client not available".to_string(),
1103 })
1104 .await;
1105 return;
1106 };
1107
1108 match client.status().await {
1109 Ok(status) => {
1110 let devices: Vec<ServiceDeviceStats> = status
1112 .devices
1113 .into_iter()
1114 .map(|d| ServiceDeviceStats {
1115 device_id: d.device_id,
1116 alias: d.alias,
1117 poll_interval: d.poll_interval,
1118 polling: d.polling,
1119 success_count: d.success_count,
1120 failure_count: d.failure_count,
1121 last_poll_at: d.last_poll_at,
1122 last_error: d.last_error,
1123 })
1124 .collect();
1125
1126 let _ = self
1127 .event_tx
1128 .send(SensorEvent::ServiceStatusRefreshed {
1129 reachable: true,
1130 collector_running: status.collector.running,
1131 uptime_seconds: status.collector.uptime_seconds,
1132 devices,
1133 })
1134 .await;
1135 }
1136 Err(e) => {
1137 let (reachable, error_msg) = match &e {
1139 aranet_core::service_client::ServiceClientError::NotReachable { .. } => {
1140 (false, "Service not reachable".to_string())
1141 }
1142 _ => (false, e.to_string()),
1143 };
1144
1145 if reachable {
1146 let _ = self
1147 .event_tx
1148 .send(SensorEvent::ServiceStatusError { error: error_msg })
1149 .await;
1150 } else {
1151 let _ = self
1153 .event_tx
1154 .send(SensorEvent::ServiceStatusRefreshed {
1155 reachable: false,
1156 collector_running: false,
1157 uptime_seconds: None,
1158 devices: vec![],
1159 })
1160 .await;
1161 }
1162 }
1163 }
1164 }
1165
1166 async fn handle_start_service_collector(&self) {
1168 info!("Starting service collector");
1169
1170 let Some(ref client) = self.service_client else {
1171 let _ = self
1172 .event_tx
1173 .send(SensorEvent::ServiceCollectorError {
1174 error: "Service client not available".to_string(),
1175 })
1176 .await;
1177 return;
1178 };
1179
1180 match client.start_collector().await {
1181 Ok(_) => {
1182 let _ = self
1183 .event_tx
1184 .send(SensorEvent::ServiceCollectorStarted)
1185 .await;
1186 self.handle_refresh_service_status().await;
1188 }
1189 Err(e) => {
1190 let _ = self
1191 .event_tx
1192 .send(SensorEvent::ServiceCollectorError {
1193 error: e.to_string(),
1194 })
1195 .await;
1196 }
1197 }
1198 }
1199
1200 async fn handle_stop_service_collector(&self) {
1202 info!("Stopping service collector");
1203
1204 let Some(ref client) = self.service_client else {
1205 let _ = self
1206 .event_tx
1207 .send(SensorEvent::ServiceCollectorError {
1208 error: "Service client not available".to_string(),
1209 })
1210 .await;
1211 return;
1212 };
1213
1214 match client.stop_collector().await {
1215 Ok(_) => {
1216 let _ = self
1217 .event_tx
1218 .send(SensorEvent::ServiceCollectorStopped)
1219 .await;
1220 self.handle_refresh_service_status().await;
1222 }
1223 Err(e) => {
1224 let _ = self
1225 .event_tx
1226 .send(SensorEvent::ServiceCollectorError {
1227 error: e.to_string(),
1228 })
1229 .await;
1230 }
1231 }
1232 }
1233
1234 async fn handle_set_alias(&self, device_id: &str, alias: Option<String>) {
1235 info!("Setting alias for device {} to {:?}", device_id, alias);
1236
1237 let Some(store) = self.open_store() else {
1238 let _ = self
1239 .event_tx
1240 .send(SensorEvent::AliasError {
1241 device_id: device_id.to_string(),
1242 error: "Could not open database".to_string(),
1243 })
1244 .await;
1245 return;
1246 };
1247
1248 match store.update_device_metadata(device_id, alias.as_deref(), None) {
1249 Ok(()) => {
1250 info!("Alias updated successfully for {}", device_id);
1251 let _ = self
1252 .event_tx
1253 .send(SensorEvent::AliasChanged {
1254 device_id: device_id.to_string(),
1255 alias,
1256 })
1257 .await;
1258 }
1259 Err(e) => {
1260 let _ = self
1261 .event_tx
1262 .send(SensorEvent::AliasError {
1263 device_id: device_id.to_string(),
1264 error: e.to_string(),
1265 })
1266 .await;
1267 }
1268 }
1269 }
1270}