1use std::path::PathBuf;
19use std::time::Duration;
20
21use aranet_core::settings::{DeviceSettings, MeasurementInterval};
22use aranet_core::{BluetoothRange, Device, ScanOptions, scan::scan_with_options};
23use aranet_store::Store;
24use aranet_types::{CurrentReading, DeviceType};
25use tokio::sync::mpsc;
26use tokio::time::timeout;
27use tracing::{debug, error, info, warn};
28
29use super::messages::{CachedDevice, Command, SensorEvent};
30
/// Maximum time allowed for one connect-and-read round trip with a device (30 seconds).
const CONNECT_READ_TIMEOUT: Duration = Duration::new(30, 0);
33
34pub struct SensorWorker {
44 command_rx: mpsc::Receiver<Command>,
46 event_tx: mpsc::Sender<SensorEvent>,
48 #[allow(dead_code)]
50 store_path: PathBuf,
51 #[allow(dead_code)]
53 poll_interval: Duration,
54}
55
56impl SensorWorker {
57 pub fn new(
67 command_rx: mpsc::Receiver<Command>,
68 event_tx: mpsc::Sender<SensorEvent>,
69 store_path: PathBuf,
70 ) -> Self {
71 Self {
72 command_rx,
73 event_tx,
74 store_path,
75 poll_interval: Duration::from_secs(60),
76 }
77 }
78
79 fn open_store(&self) -> Option<Store> {
83 match Store::open(&self.store_path) {
84 Ok(store) => Some(store),
85 Err(e) => {
86 warn!(error = %e, "Failed to open store");
87 None
88 }
89 }
90 }
91
92 pub async fn run(mut self) {
97 info!("SensorWorker started");
98
99 loop {
100 tokio::select! {
101 cmd = self.command_rx.recv() => {
103 match cmd {
104 Some(Command::Shutdown) => {
105 info!("SensorWorker received shutdown command");
106 break;
107 }
108 Some(cmd) => {
109 self.handle_command(cmd).await;
110 }
111 None => {
112 info!("Command channel closed, shutting down worker");
113 break;
114 }
115 }
116 }
117 }
118 }
119
120 info!("SensorWorker stopped");
121 }
122
123 async fn handle_command(&mut self, cmd: Command) {
125 info!(?cmd, "Handling command");
126
127 match cmd {
128 Command::LoadCachedData => {
129 self.handle_load_cached_data().await;
130 }
131 Command::Scan { duration } => {
132 self.handle_scan(duration).await;
133 }
134 Command::Connect { device_id } => {
135 self.handle_connect(&device_id).await;
136 }
137 Command::Disconnect { device_id } => {
138 self.handle_disconnect(&device_id).await;
139 }
140 Command::RefreshReading { device_id } => {
141 self.handle_refresh_reading(&device_id).await;
142 }
143 Command::RefreshAll => {
144 self.handle_refresh_all().await;
145 }
146 Command::SyncHistory { device_id } => {
147 self.handle_sync_history(&device_id).await;
148 }
149 Command::SetInterval {
150 device_id,
151 interval_secs,
152 } => {
153 self.handle_set_interval(&device_id, interval_secs).await;
154 }
155 Command::SetBluetoothRange {
156 device_id,
157 extended,
158 } => {
159 self.handle_set_bluetooth_range(&device_id, extended).await;
160 }
161 Command::SetSmartHome { device_id, enabled } => {
162 self.handle_set_smart_home(&device_id, enabled).await;
163 }
164 Command::Shutdown => {
165 }
167 }
168 }
169
170 async fn handle_load_cached_data(&self) {
172 info!("Loading cached data from store");
173
174 let Some(store) = self.open_store() else {
175 let _ = self
177 .event_tx
178 .send(SensorEvent::CachedDataLoaded { devices: vec![] })
179 .await;
180 return;
181 };
182
183 let stored_devices = match store.list_devices() {
185 Ok(devices) => devices,
186 Err(e) => {
187 warn!("Failed to list devices: {}", e);
188 let _ = self
189 .event_tx
190 .send(SensorEvent::CachedDataLoaded { devices: vec![] })
191 .await;
192 return;
193 }
194 };
195
196 let mut cached_devices = Vec::new();
198 for stored in stored_devices {
199 let reading = match store.get_latest_reading(&stored.id) {
200 Ok(Some(stored_reading)) => Some(CurrentReading {
201 co2: stored_reading.co2,
202 temperature: stored_reading.temperature,
203 pressure: stored_reading.pressure,
204 humidity: stored_reading.humidity,
205 battery: stored_reading.battery,
206 status: stored_reading.status,
207 interval: 0, age: 0, captured_at: Some(stored_reading.captured_at),
210 radon: stored_reading.radon,
211 radiation_rate: stored_reading.radiation_rate,
212 radiation_total: stored_reading.radiation_total,
213 radon_avg_24h: None,
214 radon_avg_7d: None,
215 radon_avg_30d: None,
216 }),
217 Ok(None) => None,
218 Err(e) => {
219 debug!("Failed to get latest reading for {}: {}", stored.id, e);
220 None
221 }
222 };
223
224 let last_sync = match store.get_sync_state(&stored.id) {
226 Ok(Some(state)) => state.last_sync_at,
227 Ok(None) => None,
228 Err(e) => {
229 debug!("Failed to get sync state for {}: {}", stored.id, e);
230 None
231 }
232 };
233
234 cached_devices.push(CachedDevice {
235 id: stored.id,
236 name: stored.name,
237 device_type: stored.device_type,
238 reading,
239 last_sync,
240 });
241 }
242
243 info!(count = cached_devices.len(), "Loaded cached devices");
244
245 let device_ids: Vec<String> = cached_devices.iter().map(|d| d.id.clone()).collect();
247
248 if let Err(e) = self
249 .event_tx
250 .send(SensorEvent::CachedDataLoaded {
251 devices: cached_devices,
252 })
253 .await
254 {
255 error!("Failed to send CachedDataLoaded event: {}", e);
256 }
257
258 for device_id in device_ids {
260 self.load_and_send_history(&device_id).await;
261 }
262 }
263
264 async fn handle_scan(&self, duration: Duration) {
266 info!(?duration, "Starting device scan");
267
268 if let Err(e) = self.event_tx.send(SensorEvent::ScanStarted).await {
270 error!("Failed to send ScanStarted event: {}", e);
271 return;
272 }
273
274 let options = ScanOptions::default().duration(duration);
276 match scan_with_options(options).await {
277 Ok(devices) => {
278 info!(count = devices.len(), "Scan complete");
279
280 self.save_discovered_devices(&devices);
282
283 if let Err(e) = self
284 .event_tx
285 .send(SensorEvent::ScanComplete { devices })
286 .await
287 {
288 error!("Failed to send ScanComplete event: {}", e);
289 }
290 }
291 Err(e) => {
292 error!("Scan failed: {}", e);
293 if let Err(send_err) = self
294 .event_tx
295 .send(SensorEvent::ScanError {
296 error: e.to_string(),
297 })
298 .await
299 {
300 error!("Failed to send ScanError event: {}", send_err);
301 }
302 }
303 }
304 }
305
306 async fn handle_connect(&self, device_id: &str) {
308 info!(device_id, "Connecting to device");
309
310 if let Err(e) = self
312 .event_tx
313 .send(SensorEvent::DeviceConnecting {
314 device_id: device_id.to_string(),
315 })
316 .await
317 {
318 error!("Failed to send DeviceConnecting event: {}", e);
319 return;
320 }
321
322 match timeout(CONNECT_READ_TIMEOUT, self.connect_and_read(device_id)).await {
323 Ok(Ok((name, device_type, reading, settings))) => {
324 info!(device_id, ?name, ?device_type, "Device connected");
325
326 self.update_device_metadata(device_id, name.as_deref(), device_type);
328
329 if let Err(e) = self
331 .event_tx
332 .send(SensorEvent::DeviceConnected {
333 device_id: device_id.to_string(),
334 name,
335 device_type,
336 rssi: None,
338 })
339 .await
340 {
341 error!("Failed to send DeviceConnected event: {}", e);
342 }
343
344 if let Some(settings) = settings
346 && let Err(e) = self
347 .event_tx
348 .send(SensorEvent::SettingsLoaded {
349 device_id: device_id.to_string(),
350 settings,
351 })
352 .await
353 {
354 error!("Failed to send SettingsLoaded event: {}", e);
355 }
356
357 if let Some(reading) = reading {
359 self.save_reading(device_id, &reading);
361
362 if let Err(e) = self
363 .event_tx
364 .send(SensorEvent::ReadingUpdated {
365 device_id: device_id.to_string(),
366 reading,
367 })
368 .await
369 {
370 error!("Failed to send ReadingUpdated event: {}", e);
371 }
372 }
373
374 self.load_and_send_history(device_id).await;
376 }
377 Ok(Err(e)) => {
378 error!(device_id, error = %e, "Failed to connect to device");
379 if let Err(send_err) = self
380 .event_tx
381 .send(SensorEvent::ConnectionError {
382 device_id: device_id.to_string(),
383 error: e.to_string(),
384 })
385 .await
386 {
387 error!("Failed to send ConnectionError event: {}", send_err);
388 }
389 }
390 Err(_) => {
391 error!(device_id, "Connection timed out");
393 if let Err(send_err) = self
394 .event_tx
395 .send(SensorEvent::ConnectionError {
396 device_id: device_id.to_string(),
397 error: format!(
398 "Connection timed out after {}s",
399 CONNECT_READ_TIMEOUT.as_secs()
400 ),
401 })
402 .await
403 {
404 error!("Failed to send ConnectionError event: {}", send_err);
405 }
406 }
407 }
408 }
409
410 async fn handle_disconnect(&self, device_id: &str) {
416 info!(device_id, "Disconnecting device");
417
418 if let Err(e) = self
420 .event_tx
421 .send(SensorEvent::DeviceDisconnected {
422 device_id: device_id.to_string(),
423 })
424 .await
425 {
426 error!("Failed to send DeviceDisconnected event: {}", e);
427 }
428 }
429
430 async fn handle_refresh_reading(&self, device_id: &str) {
432 info!(device_id, "Refreshing reading for device");
433
434 match timeout(CONNECT_READ_TIMEOUT, self.connect_and_read(device_id)).await {
435 Ok(Ok((_, _, reading, settings))) => {
436 if let Some(settings) = settings
438 && let Err(e) = self
439 .event_tx
440 .send(SensorEvent::SettingsLoaded {
441 device_id: device_id.to_string(),
442 settings,
443 })
444 .await
445 {
446 error!("Failed to send SettingsLoaded event: {}", e);
447 }
448
449 if let Some(reading) = reading {
450 info!(device_id, "Reading refreshed successfully");
451
452 self.save_reading(device_id, &reading);
454
455 if let Err(e) = self
456 .event_tx
457 .send(SensorEvent::ReadingUpdated {
458 device_id: device_id.to_string(),
459 reading,
460 })
461 .await
462 {
463 error!("Failed to send ReadingUpdated event: {}", e);
464 }
465 } else {
466 warn!(device_id, "Connected but failed to read current values");
467 if let Err(e) = self
468 .event_tx
469 .send(SensorEvent::ReadingError {
470 device_id: device_id.to_string(),
471 error: "Failed to read current values".to_string(),
472 })
473 .await
474 {
475 error!("Failed to send ReadingError event: {}", e);
476 }
477 }
478 }
479 Ok(Err(e)) => {
480 error!(device_id, error = %e, "Failed to refresh reading");
481 if let Err(send_err) = self
482 .event_tx
483 .send(SensorEvent::ReadingError {
484 device_id: device_id.to_string(),
485 error: e.to_string(),
486 })
487 .await
488 {
489 error!("Failed to send ReadingError event: {}", send_err);
490 }
491 }
492 Err(_) => {
493 error!(device_id, "Refresh reading timed out");
495 if let Err(send_err) = self
496 .event_tx
497 .send(SensorEvent::ReadingError {
498 device_id: device_id.to_string(),
499 error: format!(
500 "Refresh timed out after {}s",
501 CONNECT_READ_TIMEOUT.as_secs()
502 ),
503 })
504 .await
505 {
506 error!("Failed to send ReadingError event: {}", send_err);
507 }
508 }
509 }
510 }
511
512 async fn handle_refresh_all(&self) {
517 info!("Refreshing all devices");
518
519 let Some(store) = self.open_store() else {
521 return;
522 };
523
524 let devices = match store.list_devices() {
525 Ok(devices) => devices,
526 Err(e) => {
527 warn!("Failed to list devices for refresh all: {}", e);
528 return;
529 }
530 };
531
532 for device in devices {
534 self.handle_refresh_reading(&device.id).await;
535 }
536
537 info!("Completed refreshing all devices");
538 }
539
540 async fn handle_set_interval(&self, device_id: &str, interval_secs: u16) {
545 info!(device_id, interval_secs, "Setting measurement interval");
546
547 let interval = match MeasurementInterval::from_seconds(interval_secs) {
549 Some(i) => i,
550 None => {
551 let error = format!(
552 "Invalid interval: {} seconds. Must be 60, 120, 300, or 600.",
553 interval_secs
554 );
555 error!(device_id, %error, "Invalid interval value");
556 let _ = self
557 .event_tx
558 .send(SensorEvent::IntervalError {
559 device_id: device_id.to_string(),
560 error,
561 })
562 .await;
563 return;
564 }
565 };
566
567 let device = match Device::connect(device_id).await {
569 Ok(d) => d,
570 Err(e) => {
571 error!(device_id, error = %e, "Failed to connect for set interval");
572 let _ = self
573 .event_tx
574 .send(SensorEvent::IntervalError {
575 device_id: device_id.to_string(),
576 error: e.to_string(),
577 })
578 .await;
579 return;
580 }
581 };
582
583 if let Err(e) = device.set_interval(interval).await {
585 error!(device_id, error = %e, "Failed to set interval");
586 let _ = device.disconnect().await;
587 let _ = self
588 .event_tx
589 .send(SensorEvent::IntervalError {
590 device_id: device_id.to_string(),
591 error: e.to_string(),
592 })
593 .await;
594 return;
595 }
596
597 if let Err(e) = device.disconnect().await {
599 warn!(device_id, error = %e, "Failed to disconnect after setting interval");
600 }
601
602 info!(
603 device_id,
604 interval_secs, "Measurement interval set successfully"
605 );
606
607 if let Err(e) = self
609 .event_tx
610 .send(SensorEvent::IntervalChanged {
611 device_id: device_id.to_string(),
612 interval_secs,
613 })
614 .await
615 {
616 error!("Failed to send IntervalChanged event: {}", e);
617 }
618 }
619
620 async fn handle_set_bluetooth_range(&self, device_id: &str, extended: bool) {
622 let range_name = if extended { "Extended" } else { "Standard" };
623 info!(device_id, range_name, "Setting Bluetooth range");
624
625 let device = match Device::connect(device_id).await {
627 Ok(d) => d,
628 Err(e) => {
629 error!(device_id, error = %e, "Failed to connect for set Bluetooth range");
630 let _ = self
631 .event_tx
632 .send(SensorEvent::BluetoothRangeError {
633 device_id: device_id.to_string(),
634 error: e.to_string(),
635 })
636 .await;
637 return;
638 }
639 };
640
641 let range = if extended {
643 BluetoothRange::Extended
644 } else {
645 BluetoothRange::Standard
646 };
647
648 if let Err(e) = device.set_bluetooth_range(range).await {
649 error!(device_id, error = %e, "Failed to set Bluetooth range");
650 let _ = device.disconnect().await;
651 let _ = self
652 .event_tx
653 .send(SensorEvent::BluetoothRangeError {
654 device_id: device_id.to_string(),
655 error: e.to_string(),
656 })
657 .await;
658 return;
659 }
660
661 if let Err(e) = device.disconnect().await {
663 warn!(device_id, error = %e, "Failed to disconnect after setting Bluetooth range");
664 }
665
666 info!(device_id, range_name, "Bluetooth range set successfully");
667
668 if let Err(e) = self
670 .event_tx
671 .send(SensorEvent::BluetoothRangeChanged {
672 device_id: device_id.to_string(),
673 extended,
674 })
675 .await
676 {
677 error!("Failed to send BluetoothRangeChanged event: {}", e);
678 }
679 }
680
681 async fn handle_set_smart_home(&self, device_id: &str, enabled: bool) {
683 let mode = if enabled { "enabled" } else { "disabled" };
684 info!(device_id, mode, "Setting Smart Home");
685
686 let device = match Device::connect(device_id).await {
688 Ok(d) => d,
689 Err(e) => {
690 error!(device_id, error = %e, "Failed to connect for set Smart Home");
691 let _ = self
692 .event_tx
693 .send(SensorEvent::SmartHomeError {
694 device_id: device_id.to_string(),
695 error: e.to_string(),
696 })
697 .await;
698 return;
699 }
700 };
701
702 if let Err(e) = device.set_smart_home(enabled).await {
704 error!(device_id, error = %e, "Failed to set Smart Home");
705 let _ = device.disconnect().await;
706 let _ = self
707 .event_tx
708 .send(SensorEvent::SmartHomeError {
709 device_id: device_id.to_string(),
710 error: e.to_string(),
711 })
712 .await;
713 return;
714 }
715
716 if let Err(e) = device.disconnect().await {
718 warn!(device_id, error = %e, "Failed to disconnect after setting Smart Home");
719 }
720
721 info!(device_id, mode, "Smart Home set successfully");
722
723 if let Err(e) = self
725 .event_tx
726 .send(SensorEvent::SmartHomeChanged {
727 device_id: device_id.to_string(),
728 enabled,
729 })
730 .await
731 {
732 error!("Failed to send SmartHomeChanged event: {}", e);
733 }
734 }
735
736 async fn connect_and_read(
741 &self,
742 device_id: &str,
743 ) -> Result<
744 (
745 Option<String>,
746 Option<DeviceType>,
747 Option<CurrentReading>,
748 Option<DeviceSettings>,
749 ),
750 aranet_core::Error,
751 > {
752 let device = Device::connect(device_id).await?;
753
754 let name = device.name().map(String::from);
755 let device_type = device.device_type();
756
757 let reading = match device.read_current().await {
759 Ok(reading) => {
760 info!(device_id, "Read current values successfully");
761 Some(reading)
762 }
763 Err(e) => {
764 warn!(device_id, error = %e, "Failed to read current values");
765 None
766 }
767 };
768
769 let settings = match device.get_settings().await {
771 Ok(settings) => {
772 info!(device_id, ?settings, "Read device settings successfully");
773 Some(settings)
774 }
775 Err(e) => {
776 warn!(device_id, error = %e, "Failed to read device settings");
777 None
778 }
779 };
780
781 if let Err(e) = device.disconnect().await {
783 warn!(device_id, error = %e, "Failed to disconnect from device");
784 }
785
786 Ok((name, device_type, reading, settings))
787 }
788
789 fn save_reading(&self, device_id: &str, reading: &CurrentReading) {
791 let Some(store) = self.open_store() else {
792 return;
793 };
794
795 if let Err(e) = store.insert_reading(device_id, reading) {
796 warn!(device_id, error = %e, "Failed to save reading to store");
797 } else {
798 debug!(device_id, "Reading saved to store");
799 }
800 }
801
802 fn save_discovered_devices(&self, devices: &[aranet_core::DiscoveredDevice]) {
804 let Some(store) = self.open_store() else {
805 return;
806 };
807
808 for device in devices {
809 let device_id = device.id.to_string();
810 if let Err(e) = store.upsert_device(&device_id, device.name.as_deref()) {
812 warn!(device_id, error = %e, "Failed to upsert device");
813 continue;
814 }
815 if let Some(device_type) = device.device_type
817 && let Err(e) = store.update_device_metadata(&device_id, None, Some(device_type))
818 {
819 warn!(device_id, error = %e, "Failed to update device metadata");
820 }
821 }
822
823 debug!(count = devices.len(), "Saved discovered devices to store");
824 }
825
826 fn update_device_metadata(
828 &self,
829 device_id: &str,
830 name: Option<&str>,
831 device_type: Option<DeviceType>,
832 ) {
833 let Some(store) = self.open_store() else {
834 return;
835 };
836
837 if let Err(e) = store.upsert_device(device_id, name) {
839 warn!(device_id, error = %e, "Failed to upsert device");
840 return;
841 }
842
843 if let Err(e) = store.update_device_metadata(device_id, name, device_type) {
845 warn!(device_id, error = %e, "Failed to update device metadata");
846 } else {
847 debug!(device_id, ?name, ?device_type, "Device metadata updated");
848 }
849 }
850
851 async fn load_and_send_history(&self, device_id: &str) {
853 let Some(store) = self.open_store() else {
854 return;
855 };
856
857 use aranet_store::HistoryQuery;
860 let query = HistoryQuery::new().device(device_id).oldest_first(); match store.query_history(&query) {
863 Ok(stored_records) => {
864 let records: Vec<aranet_types::HistoryRecord> = stored_records
866 .into_iter()
867 .map(|r| aranet_types::HistoryRecord {
868 timestamp: r.timestamp,
869 co2: r.co2,
870 temperature: r.temperature,
871 pressure: r.pressure,
872 humidity: r.humidity,
873 radon: r.radon,
874 radiation_rate: r.radiation_rate,
875 radiation_total: r.radiation_total,
876 })
877 .collect();
878
879 info!(
880 device_id,
881 count = records.len(),
882 "Loaded history from store"
883 );
884
885 if let Err(e) = self
886 .event_tx
887 .send(SensorEvent::HistoryLoaded {
888 device_id: device_id.to_string(),
889 records,
890 })
891 .await
892 {
893 error!("Failed to send HistoryLoaded event: {}", e);
894 }
895 }
896 Err(e) => {
897 warn!(device_id, error = %e, "Failed to query history from store");
898 }
899 }
900 }
901
902 async fn handle_sync_history(&self, device_id: &str) {
906 use aranet_core::history::HistoryOptions;
907
908 info!(device_id, "Syncing history from device");
909
910 if let Err(e) = self
912 .event_tx
913 .send(SensorEvent::HistorySyncStarted {
914 device_id: device_id.to_string(),
915 })
916 .await
917 {
918 error!("Failed to send HistorySyncStarted event: {}", e);
919 return;
920 }
921
922 let Some(store) = self.open_store() else {
924 let _ = self
925 .event_tx
926 .send(SensorEvent::HistorySyncError {
927 device_id: device_id.to_string(),
928 error: "Failed to open store".to_string(),
929 })
930 .await;
931 return;
932 };
933
934 let device = match Device::connect(device_id).await {
936 Ok(d) => d,
937 Err(e) => {
938 error!(device_id, error = %e, "Failed to connect for history sync");
939 let _ = self
940 .event_tx
941 .send(SensorEvent::HistorySyncError {
942 device_id: device_id.to_string(),
943 error: e.to_string(),
944 })
945 .await;
946 return;
947 }
948 };
949
950 let history_info = match device.get_history_info().await {
952 Ok(info) => info,
953 Err(e) => {
954 error!(device_id, error = %e, "Failed to get history info");
955 let _ = device.disconnect().await;
956 let _ = self
957 .event_tx
958 .send(SensorEvent::HistorySyncError {
959 device_id: device_id.to_string(),
960 error: e.to_string(),
961 })
962 .await;
963 return;
964 }
965 };
966
967 let total_on_device = history_info.total_readings;
968
969 let start_index = match store.calculate_sync_start(device_id, total_on_device) {
971 Ok(idx) => idx,
972 Err(e) => {
973 warn!(device_id, error = %e, "Failed to calculate sync start, doing full sync");
974 1u16
975 }
976 };
977
978 if start_index > total_on_device {
980 info!(device_id, "Already up to date, no new readings to sync");
981 let _ = device.disconnect().await;
982 let _ = self
983 .event_tx
984 .send(SensorEvent::HistorySynced {
985 device_id: device_id.to_string(),
986 count: 0,
987 })
988 .await;
989 self.load_and_send_history(device_id).await;
991 return;
992 }
993
994 let records_to_download = total_on_device.saturating_sub(start_index) + 1;
995 info!(
996 device_id,
997 start_index,
998 total_on_device,
999 records_to_download,
1000 "Downloading history (incremental sync)"
1001 );
1002
1003 let history_options = HistoryOptions {
1005 start_index: Some(start_index),
1006 end_index: None, ..Default::default()
1008 };
1009
1010 let records = match device.download_history_with_options(history_options).await {
1011 Ok(r) => r,
1012 Err(e) => {
1013 error!(device_id, error = %e, "Failed to download history");
1014 let _ = device.disconnect().await;
1015 let _ = self
1016 .event_tx
1017 .send(SensorEvent::HistorySyncError {
1018 device_id: device_id.to_string(),
1019 error: e.to_string(),
1020 })
1021 .await;
1022 return;
1023 }
1024 };
1025
1026 let record_count = records.len();
1027 info!(
1028 device_id,
1029 count = record_count,
1030 "Downloaded history from device"
1031 );
1032
1033 let _ = device.disconnect().await;
1035
1036 if let Err(e) = store.insert_history(device_id, &records) {
1038 warn!(device_id, error = %e, "Failed to save history to store");
1039 } else {
1040 debug!(device_id, count = record_count, "History saved to store");
1041 }
1042
1043 if let Err(e) = store.update_sync_state(device_id, total_on_device, total_on_device) {
1045 warn!(device_id, error = %e, "Failed to update sync state");
1046 }
1047
1048 if let Err(e) = self
1050 .event_tx
1051 .send(SensorEvent::HistorySynced {
1052 device_id: device_id.to_string(),
1053 count: record_count,
1054 })
1055 .await
1056 {
1057 error!("Failed to send HistorySynced event: {}", e);
1058 }
1059
1060 self.load_and_send_history(device_id).await;
1062 }
1063}