Skip to main content

aranet_cli/tui/
worker.rs

1//! Background worker for BLE sensor operations.
2//!
3//! This module contains the [`SensorWorker`] which handles all Bluetooth Low Energy
4//! operations in a background task, keeping the UI thread responsive. The worker
5//! communicates with the UI thread via channels:
6//!
7//! - Receives [`Command`]s from the UI to perform operations
8//! - Sends [`SensorEvent`]s back to report results and status updates
9//!
10//! # Architecture
11//!
12//! The worker runs in a separate Tokio task and uses `tokio::select!` to handle:
13//! - Incoming commands from the UI
14//! - Periodic auto-refresh of sensor readings (when enabled)
15//!
16//! All BLE operations are performed here to avoid blocking the UI rendering loop.
17
18use std::path::PathBuf;
19use std::time::Duration;
20
21use aranet_core::service_client::ServiceClient;
22use aranet_core::settings::{DeviceSettings, MeasurementInterval};
23use aranet_core::{BluetoothRange, Device, ScanOptions, scan::scan_with_options};
24use aranet_store::Store;
25use aranet_types::{CurrentReading, DeviceType};
26use tokio::sync::mpsc;
27use tokio::time::timeout;
28use tracing::{debug, error, info, warn};
29
30use super::messages::{CachedDevice, Command, SensorEvent};
31use aranet_core::messages::ServiceDeviceStats;
32
33/// Maximum time to wait for a BLE connect-and-read operation.
34const CONNECT_READ_TIMEOUT: Duration = Duration::from_secs(30);
35
36/// Background worker that handles BLE operations.
37///
38/// The worker receives commands from the UI thread and performs
39/// Bluetooth operations asynchronously, sending events back to
40/// update the UI state.
41///
42/// Note: The Store is not held directly because rusqlite's Connection
43/// is not Send+Sync. Instead, we store the path and open the store
44/// when needed.
45pub struct SensorWorker {
46    /// Receiver for commands from the UI thread.
47    command_rx: mpsc::Receiver<Command>,
48    /// Sender for events back to the UI thread.
49    event_tx: mpsc::Sender<SensorEvent>,
50    /// Path to persistent storage.
51    store_path: PathBuf,
52    /// Service client for aranet-service communication.
53    service_client: Option<ServiceClient>,
54}
55
56/// Default URL for the aranet-service.
57const DEFAULT_SERVICE_URL: &str = "http://localhost:8080";
58
59impl SensorWorker {
60    /// Create a new sensor worker.
61    ///
62    /// # Arguments
63    ///
64    /// * `command_rx` - Channel receiver for commands from the UI
65    /// * `event_tx` - Channel sender for events to the UI
66    /// * `store_path` - Path to persistent storage
67    pub fn new(
68        command_rx: mpsc::Receiver<Command>,
69        event_tx: mpsc::Sender<SensorEvent>,
70        store_path: PathBuf,
71    ) -> Self {
72        // Try to create service client with default URL
73        let service_client = ServiceClient::new(DEFAULT_SERVICE_URL).ok();
74
75        Self {
76            command_rx,
77            event_tx,
78            store_path,
79            service_client,
80        }
81    }
82
83    /// Open the store, logging a warning on failure.
84    ///
85    /// This helper centralizes store access and error handling.
86    fn open_store(&self) -> Option<Store> {
87        match Store::open(&self.store_path) {
88            Ok(store) => Some(store),
89            Err(e) => {
90                warn!(error = %e, "Failed to open store");
91                None
92            }
93        }
94    }
95
96    /// Run the worker's main loop.
97    ///
98    /// This method consumes the worker and runs until a [`Command::Shutdown`]
99    /// is received or the command channel is closed.
100    pub async fn run(mut self) {
101        info!("SensorWorker started");
102
103        loop {
104            tokio::select! {
105                // Handle incoming commands
106                cmd = self.command_rx.recv() => {
107                    match cmd {
108                        Some(Command::Shutdown) => {
109                            info!("SensorWorker received shutdown command");
110                            break;
111                        }
112                        Some(cmd) => {
113                            self.handle_command(cmd).await;
114                        }
115                        None => {
116                            info!("Command channel closed, shutting down worker");
117                            break;
118                        }
119                    }
120                }
121            }
122        }
123
124        info!("SensorWorker stopped");
125    }
126
127    /// Handle a single command from the UI.
128    async fn handle_command(&mut self, cmd: Command) {
129        info!(?cmd, "Handling command");
130
131        match cmd {
132            Command::LoadCachedData => {
133                self.handle_load_cached_data().await;
134            }
135            Command::Scan { duration } => {
136                self.handle_scan(duration).await;
137            }
138            Command::Connect { device_id } => {
139                self.handle_connect(&device_id).await;
140            }
141            Command::Disconnect { device_id } => {
142                self.handle_disconnect(&device_id).await;
143            }
144            Command::RefreshReading { device_id } => {
145                self.handle_refresh_reading(&device_id).await;
146            }
147            Command::RefreshAll => {
148                self.handle_refresh_all().await;
149            }
150            Command::SyncHistory { device_id } => {
151                self.handle_sync_history(&device_id).await;
152            }
153            Command::SetInterval {
154                device_id,
155                interval_secs,
156            } => {
157                self.handle_set_interval(&device_id, interval_secs).await;
158            }
159            Command::SetBluetoothRange {
160                device_id,
161                extended,
162            } => {
163                self.handle_set_bluetooth_range(&device_id, extended).await;
164            }
165            Command::SetSmartHome { device_id, enabled } => {
166                self.handle_set_smart_home(&device_id, enabled).await;
167            }
168            Command::RefreshServiceStatus => {
169                self.handle_refresh_service_status().await;
170            }
171            Command::StartServiceCollector => {
172                self.handle_start_service_collector().await;
173            }
174            Command::StopServiceCollector => {
175                self.handle_stop_service_collector().await;
176            }
177            Command::SetAlias { device_id, alias } => {
178                self.handle_set_alias(&device_id, alias).await;
179            }
180            Command::ForgetDevice { device_id } => {
181                // TUI doesn't fully support forget device yet, but handle the command gracefully
182                info!(
183                    device_id,
184                    "Forget device requested (not implemented in TUI)"
185                );
186                let _ = self
187                    .event_tx
188                    .send(SensorEvent::DeviceForgotten { device_id })
189                    .await;
190            }
191            Command::Shutdown => {
192                // Handled in run() loop
193            }
194        }
195    }
196
197    /// Load cached devices and readings from the store.
198    async fn handle_load_cached_data(&self) {
199        info!("Loading cached data from store");
200
201        let Some(store) = self.open_store() else {
202            // Send empty cached data
203            let _ = self
204                .event_tx
205                .send(SensorEvent::CachedDataLoaded { devices: vec![] })
206                .await;
207            return;
208        };
209
210        // Load all known devices
211        let stored_devices = match store.list_devices() {
212            Ok(devices) => devices,
213            Err(e) => {
214                warn!("Failed to list devices: {}", e);
215                let _ = self
216                    .event_tx
217                    .send(SensorEvent::CachedDataLoaded { devices: vec![] })
218                    .await;
219                return;
220            }
221        };
222
223        // Load latest reading for each device
224        let mut cached_devices = Vec::new();
225        for stored in stored_devices {
226            let reading = match store.get_latest_reading(&stored.id) {
227                Ok(Some(stored_reading)) => Some(CurrentReading {
228                    co2: stored_reading.co2,
229                    temperature: stored_reading.temperature,
230                    pressure: stored_reading.pressure,
231                    humidity: stored_reading.humidity,
232                    battery: stored_reading.battery,
233                    status: stored_reading.status,
234                    interval: 0, // Not stored
235                    age: 0,      // Will be calculated below
236                    captured_at: Some(stored_reading.captured_at),
237                    radon: stored_reading.radon,
238                    radiation_rate: stored_reading.radiation_rate,
239                    radiation_total: stored_reading.radiation_total,
240                    radon_avg_24h: None,
241                    radon_avg_7d: None,
242                    radon_avg_30d: None,
243                }),
244                Ok(None) => None,
245                Err(e) => {
246                    debug!("Failed to get latest reading for {}: {}", stored.id, e);
247                    None
248                }
249            };
250
251            // Get sync state for last sync time
252            let last_sync = match store.get_sync_state(&stored.id) {
253                Ok(Some(state)) => state.last_sync_at,
254                Ok(None) => None,
255                Err(e) => {
256                    debug!("Failed to get sync state for {}: {}", stored.id, e);
257                    None
258                }
259            };
260
261            cached_devices.push(CachedDevice {
262                id: stored.id,
263                name: stored.name,
264                device_type: stored.device_type,
265                reading,
266                last_sync,
267            });
268        }
269
270        info!(count = cached_devices.len(), "Loaded cached devices");
271
272        // Collect device IDs before sending (we need them for history loading)
273        let device_ids: Vec<String> = cached_devices.iter().map(|d| d.id.clone()).collect();
274
275        if let Err(e) = self
276            .event_tx
277            .send(SensorEvent::CachedDataLoaded {
278                devices: cached_devices,
279            })
280            .await
281        {
282            error!("Failed to send CachedDataLoaded event: {}", e);
283        }
284
285        // Load history for each cached device (for sparklines on startup)
286        for device_id in device_ids {
287            self.load_and_send_history(&device_id).await;
288        }
289    }
290
291    /// Handle a scan command.
292    async fn handle_scan(&self, duration: Duration) {
293        info!(?duration, "Starting device scan");
294
295        // Notify UI that scan has started
296        if let Err(e) = self.event_tx.send(SensorEvent::ScanStarted).await {
297            error!("Failed to send ScanStarted event: {}", e);
298            return;
299        }
300
301        // Perform the scan
302        let options = ScanOptions::default().duration(duration);
303        match scan_with_options(options).await {
304            Ok(devices) => {
305                info!(count = devices.len(), "Scan complete");
306
307                // Save discovered devices to store
308                self.save_discovered_devices(&devices);
309
310                if let Err(e) = self
311                    .event_tx
312                    .send(SensorEvent::ScanComplete { devices })
313                    .await
314                {
315                    error!("Failed to send ScanComplete event: {}", e);
316                }
317            }
318            Err(e) => {
319                error!("Scan failed: {}", e);
320                if let Err(send_err) = self
321                    .event_tx
322                    .send(SensorEvent::ScanError {
323                        error: e.to_string(),
324                    })
325                    .await
326                {
327                    error!("Failed to send ScanError event: {}", send_err);
328                }
329            }
330        }
331    }
332
333    /// Handle a connect command.
334    async fn handle_connect(&self, device_id: &str) {
335        info!(device_id, "Connecting to device");
336
337        // Notify UI that we're connecting
338        if let Err(e) = self
339            .event_tx
340            .send(SensorEvent::DeviceConnecting {
341                device_id: device_id.to_string(),
342            })
343            .await
344        {
345            error!("Failed to send DeviceConnecting event: {}", e);
346            return;
347        }
348
349        match timeout(CONNECT_READ_TIMEOUT, self.connect_and_read(device_id)).await {
350            Ok(Ok((name, device_type, reading, settings, rssi))) => {
351                info!(device_id, ?name, ?device_type, ?rssi, "Device connected");
352
353                // Update device metadata in store
354                self.update_device_metadata(device_id, name.as_deref(), device_type);
355
356                // Send connected event
357                if let Err(e) = self
358                    .event_tx
359                    .send(SensorEvent::DeviceConnected {
360                        device_id: device_id.to_string(),
361                        name,
362                        device_type,
363                        rssi,
364                    })
365                    .await
366                {
367                    error!("Failed to send DeviceConnected event: {}", e);
368                }
369
370                // Send settings if we got them
371                if let Some(settings) = settings
372                    && let Err(e) = self
373                        .event_tx
374                        .send(SensorEvent::SettingsLoaded {
375                            device_id: device_id.to_string(),
376                            settings,
377                        })
378                        .await
379                {
380                    error!("Failed to send SettingsLoaded event: {}", e);
381                }
382
383                // Send reading if we got one and save to store
384                if let Some(reading) = reading {
385                    // Save to store
386                    self.save_reading(device_id, &reading);
387
388                    if let Err(e) = self
389                        .event_tx
390                        .send(SensorEvent::ReadingUpdated {
391                            device_id: device_id.to_string(),
392                            reading,
393                        })
394                        .await
395                    {
396                        error!("Failed to send ReadingUpdated event: {}", e);
397                    }
398                }
399
400                // Load history for sparklines
401                self.load_and_send_history(device_id).await;
402            }
403            Ok(Err(e)) => {
404                error!(device_id, error = %e, "Failed to connect to device");
405                if let Err(send_err) = self
406                    .event_tx
407                    .send(SensorEvent::ConnectionError {
408                        device_id: device_id.to_string(),
409                        error: e.to_string(),
410                    })
411                    .await
412                {
413                    error!("Failed to send ConnectionError event: {}", send_err);
414                }
415            }
416            Err(_) => {
417                // Timeout expired
418                error!(device_id, "Connection timed out");
419                if let Err(send_err) = self
420                    .event_tx
421                    .send(SensorEvent::ConnectionError {
422                        device_id: device_id.to_string(),
423                        error: format!(
424                            "Connection timed out after {}s",
425                            CONNECT_READ_TIMEOUT.as_secs()
426                        ),
427                    })
428                    .await
429                {
430                    error!("Failed to send ConnectionError event: {}", send_err);
431                }
432            }
433        }
434    }
435
436    /// Handle a disconnect command.
437    ///
438    /// For TUI purposes, disconnection mostly means updating UI state since
439    /// we don't maintain persistent connections (we connect, read, and disconnect).
440    /// This sends a DeviceDisconnected event to update the UI.
441    async fn handle_disconnect(&self, device_id: &str) {
442        info!(device_id, "Disconnecting device");
443
444        // Send disconnected event to update UI state
445        if let Err(e) = self
446            .event_tx
447            .send(SensorEvent::DeviceDisconnected {
448                device_id: device_id.to_string(),
449            })
450            .await
451        {
452            error!("Failed to send DeviceDisconnected event: {}", e);
453        }
454    }
455
456    /// Handle a refresh reading command.
457    async fn handle_refresh_reading(&self, device_id: &str) {
458        info!(device_id, "Refreshing reading for device");
459
460        match timeout(CONNECT_READ_TIMEOUT, self.connect_and_read(device_id)).await {
461            Ok(Ok((_, _, reading, settings, _rssi))) => {
462                // Send settings if we got them
463                if let Some(settings) = settings
464                    && let Err(e) = self
465                        .event_tx
466                        .send(SensorEvent::SettingsLoaded {
467                            device_id: device_id.to_string(),
468                            settings,
469                        })
470                        .await
471                {
472                    error!("Failed to send SettingsLoaded event: {}", e);
473                }
474
475                if let Some(reading) = reading {
476                    info!(device_id, "Reading refreshed successfully");
477
478                    // Save to store
479                    self.save_reading(device_id, &reading);
480
481                    if let Err(e) = self
482                        .event_tx
483                        .send(SensorEvent::ReadingUpdated {
484                            device_id: device_id.to_string(),
485                            reading,
486                        })
487                        .await
488                    {
489                        error!("Failed to send ReadingUpdated event: {}", e);
490                    }
491                } else {
492                    warn!(device_id, "Connected but failed to read current values");
493                    if let Err(e) = self
494                        .event_tx
495                        .send(SensorEvent::ReadingError {
496                            device_id: device_id.to_string(),
497                            error: "Failed to read current values".to_string(),
498                        })
499                        .await
500                    {
501                        error!("Failed to send ReadingError event: {}", e);
502                    }
503                }
504            }
505            Ok(Err(e)) => {
506                error!(device_id, error = %e, "Failed to refresh reading");
507                if let Err(send_err) = self
508                    .event_tx
509                    .send(SensorEvent::ReadingError {
510                        device_id: device_id.to_string(),
511                        error: e.to_string(),
512                    })
513                    .await
514                {
515                    error!("Failed to send ReadingError event: {}", send_err);
516                }
517            }
518            Err(_) => {
519                // Timeout expired
520                error!(device_id, "Refresh reading timed out");
521                if let Err(send_err) = self
522                    .event_tx
523                    .send(SensorEvent::ReadingError {
524                        device_id: device_id.to_string(),
525                        error: format!(
526                            "Refresh timed out after {}s",
527                            CONNECT_READ_TIMEOUT.as_secs()
528                        ),
529                    })
530                    .await
531                {
532                    error!("Failed to send ReadingError event: {}", send_err);
533                }
534            }
535        }
536    }
537
538    /// Handle a refresh all command.
539    ///
540    /// Refreshes readings from all known devices by iterating through
541    /// and calling handle_refresh_reading for each device.
542    async fn handle_refresh_all(&self) {
543        info!("Refreshing all devices");
544
545        // Open store to get list of known devices
546        let Some(store) = self.open_store() else {
547            return;
548        };
549
550        let devices = match store.list_devices() {
551            Ok(devices) => devices,
552            Err(e) => {
553                warn!("Failed to list devices for refresh all: {}", e);
554                return;
555            }
556        };
557
558        // Refresh each device
559        for device in devices {
560            self.handle_refresh_reading(&device.id).await;
561        }
562
563        info!("Completed refreshing all devices");
564    }
565
566    /// Handle a set interval command.
567    ///
568    /// Connects to the device, sets the measurement interval, and sends
569    /// the appropriate event back to the UI.
570    async fn handle_set_interval(&self, device_id: &str, interval_secs: u16) {
571        info!(device_id, interval_secs, "Setting measurement interval");
572
573        // Validate and convert seconds to MeasurementInterval
574        let interval = match MeasurementInterval::from_seconds(interval_secs) {
575            Some(i) => i,
576            None => {
577                let error = format!(
578                    "Invalid interval: {} seconds. Must be 60, 120, 300, or 600.",
579                    interval_secs
580                );
581                error!(device_id, %error, "Invalid interval value");
582                let _ = self
583                    .event_tx
584                    .send(SensorEvent::IntervalError {
585                        device_id: device_id.to_string(),
586                        error,
587                    })
588                    .await;
589                return;
590            }
591        };
592
593        // Connect to the device
594        let device = match Device::connect(device_id).await {
595            Ok(d) => d,
596            Err(e) => {
597                error!(device_id, error = %e, "Failed to connect for set interval");
598                let _ = self
599                    .event_tx
600                    .send(SensorEvent::IntervalError {
601                        device_id: device_id.to_string(),
602                        error: e.to_string(),
603                    })
604                    .await;
605                return;
606            }
607        };
608
609        // Set the interval
610        if let Err(e) = device.set_interval(interval).await {
611            error!(device_id, error = %e, "Failed to set interval");
612            let _ = device.disconnect().await;
613            let _ = self
614                .event_tx
615                .send(SensorEvent::IntervalError {
616                    device_id: device_id.to_string(),
617                    error: e.to_string(),
618                })
619                .await;
620            return;
621        }
622
623        // Disconnect from device
624        if let Err(e) = device.disconnect().await {
625            warn!(device_id, error = %e, "Failed to disconnect after setting interval");
626        }
627
628        info!(
629            device_id,
630            interval_secs, "Measurement interval set successfully"
631        );
632
633        // Send success event
634        if let Err(e) = self
635            .event_tx
636            .send(SensorEvent::IntervalChanged {
637                device_id: device_id.to_string(),
638                interval_secs,
639            })
640            .await
641        {
642            error!("Failed to send IntervalChanged event: {}", e);
643        }
644    }
645
646    /// Handle a set bluetooth range command.
647    async fn handle_set_bluetooth_range(&self, device_id: &str, extended: bool) {
648        let range_name = if extended { "Extended" } else { "Standard" };
649        info!(device_id, range_name, "Setting Bluetooth range");
650
651        // Connect to the device
652        let device = match Device::connect(device_id).await {
653            Ok(d) => d,
654            Err(e) => {
655                error!(device_id, error = %e, "Failed to connect for set Bluetooth range");
656                let _ = self
657                    .event_tx
658                    .send(SensorEvent::BluetoothRangeError {
659                        device_id: device_id.to_string(),
660                        error: e.to_string(),
661                    })
662                    .await;
663                return;
664            }
665        };
666
667        // Set the Bluetooth range
668        let range = if extended {
669            BluetoothRange::Extended
670        } else {
671            BluetoothRange::Standard
672        };
673
674        if let Err(e) = device.set_bluetooth_range(range).await {
675            error!(device_id, error = %e, "Failed to set Bluetooth range");
676            let _ = device.disconnect().await;
677            let _ = self
678                .event_tx
679                .send(SensorEvent::BluetoothRangeError {
680                    device_id: device_id.to_string(),
681                    error: e.to_string(),
682                })
683                .await;
684            return;
685        }
686
687        // Disconnect from device
688        if let Err(e) = device.disconnect().await {
689            warn!(device_id, error = %e, "Failed to disconnect after setting Bluetooth range");
690        }
691
692        info!(device_id, range_name, "Bluetooth range set successfully");
693
694        // Send success event
695        if let Err(e) = self
696            .event_tx
697            .send(SensorEvent::BluetoothRangeChanged {
698                device_id: device_id.to_string(),
699                extended,
700            })
701            .await
702        {
703            error!("Failed to send BluetoothRangeChanged event: {}", e);
704        }
705    }
706
707    /// Handle a set smart home command.
708    async fn handle_set_smart_home(&self, device_id: &str, enabled: bool) {
709        let mode = if enabled { "enabled" } else { "disabled" };
710        info!(device_id, mode, "Setting Smart Home");
711
712        // Connect to the device
713        let device = match Device::connect(device_id).await {
714            Ok(d) => d,
715            Err(e) => {
716                error!(device_id, error = %e, "Failed to connect for set Smart Home");
717                let _ = self
718                    .event_tx
719                    .send(SensorEvent::SmartHomeError {
720                        device_id: device_id.to_string(),
721                        error: e.to_string(),
722                    })
723                    .await;
724                return;
725            }
726        };
727
728        // Set Smart Home mode
729        if let Err(e) = device.set_smart_home(enabled).await {
730            error!(device_id, error = %e, "Failed to set Smart Home");
731            let _ = device.disconnect().await;
732            let _ = self
733                .event_tx
734                .send(SensorEvent::SmartHomeError {
735                    device_id: device_id.to_string(),
736                    error: e.to_string(),
737                })
738                .await;
739            return;
740        }
741
742        // Disconnect from device
743        if let Err(e) = device.disconnect().await {
744            warn!(device_id, error = %e, "Failed to disconnect after setting Smart Home");
745        }
746
747        info!(device_id, mode, "Smart Home set successfully");
748
749        // Send success event
750        if let Err(e) = self
751            .event_tx
752            .send(SensorEvent::SmartHomeChanged {
753                device_id: device_id.to_string(),
754                enabled,
755            })
756            .await
757        {
758            error!("Failed to send SmartHomeChanged event: {}", e);
759        }
760    }
761
762    /// Connect to a device and read its current values.
763    ///
764    /// Returns the device name, type, current reading, settings, and RSSI if successful.
765    /// The device is disconnected after reading.
766    async fn connect_and_read(
767        &self,
768        device_id: &str,
769    ) -> Result<
770        (
771            Option<String>,
772            Option<DeviceType>,
773            Option<CurrentReading>,
774            Option<DeviceSettings>,
775            Option<i16>,
776        ),
777        aranet_core::Error,
778    > {
779        let device = Device::connect(device_id).await?;
780
781        let name = device.name().map(String::from);
782        let device_type = device.device_type();
783
784        // Try to read current values
785        let reading = match device.read_current().await {
786            Ok(reading) => {
787                info!(device_id, "Read current values successfully");
788                Some(reading)
789            }
790            Err(e) => {
791                warn!(device_id, error = %e, "Failed to read current values");
792                None
793            }
794        };
795
796        // Try to read device settings
797        let settings = match device.get_settings().await {
798            Ok(settings) => {
799                info!(device_id, ?settings, "Read device settings successfully");
800                Some(settings)
801            }
802            Err(e) => {
803                warn!(device_id, error = %e, "Failed to read device settings");
804                None
805            }
806        };
807
808        // Try to read RSSI signal strength
809        let rssi = device.read_rssi().await.ok();
810
811        // Disconnect from the device
812        if let Err(e) = device.disconnect().await {
813            warn!(device_id, error = %e, "Failed to disconnect from device");
814        }
815
816        Ok((name, device_type, reading, settings, rssi))
817    }
818
819    /// Save a reading to the store.
820    fn save_reading(&self, device_id: &str, reading: &CurrentReading) {
821        let Some(store) = self.open_store() else {
822            return;
823        };
824
825        if let Err(e) = store.insert_reading(device_id, reading) {
826            warn!(device_id, error = %e, "Failed to save reading to store");
827        } else {
828            debug!(device_id, "Reading saved to store");
829        }
830    }
831
832    /// Save discovered devices to the store.
833    fn save_discovered_devices(&self, devices: &[aranet_core::DiscoveredDevice]) {
834        let Some(store) = self.open_store() else {
835            return;
836        };
837
838        for device in devices {
839            let device_id = device.id.to_string();
840            // Upsert the device with name
841            if let Err(e) = store.upsert_device(&device_id, device.name.as_deref()) {
842                warn!(device_id, error = %e, "Failed to upsert device");
843                continue;
844            }
845            // Update device type if known
846            if let Some(device_type) = device.device_type
847                && let Err(e) = store.update_device_metadata(&device_id, None, Some(device_type))
848            {
849                warn!(device_id, error = %e, "Failed to update device metadata");
850            }
851        }
852
853        debug!(count = devices.len(), "Saved discovered devices to store");
854    }
855
856    /// Update device metadata in the store.
857    fn update_device_metadata(
858        &self,
859        device_id: &str,
860        name: Option<&str>,
861        device_type: Option<DeviceType>,
862    ) {
863        let Some(store) = self.open_store() else {
864            return;
865        };
866
867        // Ensure device exists
868        if let Err(e) = store.upsert_device(device_id, name) {
869            warn!(device_id, error = %e, "Failed to upsert device");
870            return;
871        }
872
873        // Update metadata
874        if let Err(e) = store.update_device_metadata(device_id, name, device_type) {
875            warn!(device_id, error = %e, "Failed to update device metadata");
876        } else {
877            debug!(device_id, ?name, ?device_type, "Device metadata updated");
878        }
879    }
880
881    /// Load history from store and send to UI.
882    async fn load_and_send_history(&self, device_id: &str) {
883        let Some(store) = self.open_store() else {
884            return;
885        };
886
887        // Query all history for the device (no limit)
888        // The UI will filter by time range and resample for sparkline display
889        use aranet_store::HistoryQuery;
890        let query = HistoryQuery::new().device(device_id).oldest_first(); // Chronological order for sparkline (oldest to newest)
891
892        match store.query_history(&query) {
893            Ok(stored_records) => {
894                // Convert StoredHistoryRecord to HistoryRecord
895                let records: Vec<aranet_types::HistoryRecord> = stored_records
896                    .into_iter()
897                    .map(|r| aranet_types::HistoryRecord {
898                        timestamp: r.timestamp,
899                        co2: r.co2,
900                        temperature: r.temperature,
901                        pressure: r.pressure,
902                        humidity: r.humidity,
903                        radon: r.radon,
904                        radiation_rate: r.radiation_rate,
905                        radiation_total: r.radiation_total,
906                    })
907                    .collect();
908
909                info!(
910                    device_id,
911                    count = records.len(),
912                    "Loaded history from store"
913                );
914
915                if let Err(e) = self
916                    .event_tx
917                    .send(SensorEvent::HistoryLoaded {
918                        device_id: device_id.to_string(),
919                        records,
920                    })
921                    .await
922                {
923                    error!("Failed to send HistoryLoaded event: {}", e);
924                }
925            }
926            Err(e) => {
927                warn!(device_id, error = %e, "Failed to query history from store");
928            }
929        }
930    }
931
932    /// Sync history from device (download via BLE and save to store).
933    ///
934    /// Uses incremental sync - only downloads new records since the last sync.
935    async fn handle_sync_history(&self, device_id: &str) {
936        use aranet_core::history::HistoryOptions;
937
938        info!(device_id, "Syncing history from device");
939
940        // Notify UI that sync is starting
941        if let Err(e) = self
942            .event_tx
943            .send(SensorEvent::HistorySyncStarted {
944                device_id: device_id.to_string(),
945            })
946            .await
947        {
948            error!("Failed to send HistorySyncStarted event: {}", e);
949            return;
950        }
951
952        // Open store first to check sync state
953        let Some(store) = self.open_store() else {
954            let _ = self
955                .event_tx
956                .send(SensorEvent::HistorySyncError {
957                    device_id: device_id.to_string(),
958                    error: "Failed to open store".to_string(),
959                })
960                .await;
961            return;
962        };
963
964        // Connect to the device
965        let device = match Device::connect(device_id).await {
966            Ok(d) => d,
967            Err(e) => {
968                error!(device_id, error = %e, "Failed to connect for history sync");
969                let _ = self
970                    .event_tx
971                    .send(SensorEvent::HistorySyncError {
972                        device_id: device_id.to_string(),
973                        error: e.to_string(),
974                    })
975                    .await;
976                return;
977            }
978        };
979
980        // Get history info to know how many records are on the device
981        let history_info = match device.get_history_info().await {
982            Ok(info) => info,
983            Err(e) => {
984                error!(device_id, error = %e, "Failed to get history info");
985                let _ = device.disconnect().await;
986                let _ = self
987                    .event_tx
988                    .send(SensorEvent::HistorySyncError {
989                        device_id: device_id.to_string(),
990                        error: e.to_string(),
991                    })
992                    .await;
993                return;
994            }
995        };
996
997        let total_on_device = history_info.total_readings;
998
999        // Calculate start index for incremental sync
1000        let start_index = match store.calculate_sync_start(device_id, total_on_device) {
1001            Ok(idx) => idx,
1002            Err(e) => {
1003                warn!(device_id, error = %e, "Failed to calculate sync start, doing full sync");
1004                1u16
1005            }
1006        };
1007
1008        // Check if already up to date
1009        if start_index > total_on_device {
1010            info!(device_id, "Already up to date, no new readings to sync");
1011            let _ = device.disconnect().await;
1012            let _ = self
1013                .event_tx
1014                .send(SensorEvent::HistorySynced {
1015                    device_id: device_id.to_string(),
1016                    count: 0,
1017                })
1018                .await;
1019            // Still load history from store to update UI
1020            self.load_and_send_history(device_id).await;
1021            return;
1022        }
1023
1024        let records_to_download = total_on_device.saturating_sub(start_index) + 1;
1025        info!(
1026            device_id,
1027            start_index,
1028            total_on_device,
1029            records_to_download,
1030            "Downloading history (incremental sync)"
1031        );
1032
1033        // Download history with start_index for incremental sync
1034        let history_options = HistoryOptions {
1035            start_index: Some(start_index),
1036            end_index: None, // Download to the end
1037            ..Default::default()
1038        };
1039
1040        let records = match device.download_history_with_options(history_options).await {
1041            Ok(r) => r,
1042            Err(e) => {
1043                error!(device_id, error = %e, "Failed to download history");
1044                let _ = device.disconnect().await;
1045                let _ = self
1046                    .event_tx
1047                    .send(SensorEvent::HistorySyncError {
1048                        device_id: device_id.to_string(),
1049                        error: e.to_string(),
1050                    })
1051                    .await;
1052                return;
1053            }
1054        };
1055
1056        let record_count = records.len();
1057        info!(
1058            device_id,
1059            count = record_count,
1060            "Downloaded history from device"
1061        );
1062
1063        // Disconnect from device
1064        let _ = device.disconnect().await;
1065
1066        // Insert history to store (with deduplication)
1067        if let Err(e) = store.insert_history(device_id, &records) {
1068            warn!(device_id, error = %e, "Failed to save history to store");
1069        } else {
1070            debug!(device_id, count = record_count, "History saved to store");
1071        }
1072
1073        // Update sync state for next incremental sync
1074        if let Err(e) = store.update_sync_state(device_id, total_on_device, total_on_device) {
1075            warn!(device_id, error = %e, "Failed to update sync state");
1076        }
1077
1078        // Notify UI that sync is complete
1079        if let Err(e) = self
1080            .event_tx
1081            .send(SensorEvent::HistorySynced {
1082                device_id: device_id.to_string(),
1083                count: record_count,
1084            })
1085            .await
1086        {
1087            error!("Failed to send HistorySynced event: {}", e);
1088        }
1089
1090        // Send history to UI for sparklines
1091        self.load_and_send_history(device_id).await;
1092    }
1093
1094    /// Handle refreshing the aranet-service status.
1095    async fn handle_refresh_service_status(&self) {
1096        info!("Refreshing service status");
1097
1098        let Some(ref client) = self.service_client else {
1099            let _ = self
1100                .event_tx
1101                .send(SensorEvent::ServiceStatusError {
1102                    error: "Service client not available".to_string(),
1103                })
1104                .await;
1105            return;
1106        };
1107
1108        match client.status().await {
1109            Ok(status) => {
1110                // Convert device stats to our message type
1111                let devices: Vec<ServiceDeviceStats> = status
1112                    .devices
1113                    .into_iter()
1114                    .map(|d| ServiceDeviceStats {
1115                        device_id: d.device_id,
1116                        alias: d.alias,
1117                        poll_interval: d.poll_interval,
1118                        polling: d.polling,
1119                        success_count: d.success_count,
1120                        failure_count: d.failure_count,
1121                        last_poll_at: d.last_poll_at,
1122                        last_error: d.last_error,
1123                    })
1124                    .collect();
1125
1126                let _ = self
1127                    .event_tx
1128                    .send(SensorEvent::ServiceStatusRefreshed {
1129                        reachable: true,
1130                        collector_running: status.collector.running,
1131                        uptime_seconds: status.collector.uptime_seconds,
1132                        devices,
1133                    })
1134                    .await;
1135            }
1136            Err(e) => {
1137                // Check if it's a connection error (service not running)
1138                let (reachable, error_msg) = match &e {
1139                    aranet_core::service_client::ServiceClientError::NotReachable { .. } => {
1140                        (false, "Service not reachable".to_string())
1141                    }
1142                    _ => (false, e.to_string()),
1143                };
1144
1145                if reachable {
1146                    let _ = self
1147                        .event_tx
1148                        .send(SensorEvent::ServiceStatusError { error: error_msg })
1149                        .await;
1150                } else {
1151                    // Send status with reachable=false
1152                    let _ = self
1153                        .event_tx
1154                        .send(SensorEvent::ServiceStatusRefreshed {
1155                            reachable: false,
1156                            collector_running: false,
1157                            uptime_seconds: None,
1158                            devices: vec![],
1159                        })
1160                        .await;
1161                }
1162            }
1163        }
1164    }
1165
1166    /// Handle starting the aranet-service collector.
1167    async fn handle_start_service_collector(&self) {
1168        info!("Starting service collector");
1169
1170        let Some(ref client) = self.service_client else {
1171            let _ = self
1172                .event_tx
1173                .send(SensorEvent::ServiceCollectorError {
1174                    error: "Service client not available".to_string(),
1175                })
1176                .await;
1177            return;
1178        };
1179
1180        match client.start_collector().await {
1181            Ok(_) => {
1182                let _ = self
1183                    .event_tx
1184                    .send(SensorEvent::ServiceCollectorStarted)
1185                    .await;
1186                // Refresh status to get updated state
1187                self.handle_refresh_service_status().await;
1188            }
1189            Err(e) => {
1190                let _ = self
1191                    .event_tx
1192                    .send(SensorEvent::ServiceCollectorError {
1193                        error: e.to_string(),
1194                    })
1195                    .await;
1196            }
1197        }
1198    }
1199
1200    /// Handle stopping the aranet-service collector.
1201    async fn handle_stop_service_collector(&self) {
1202        info!("Stopping service collector");
1203
1204        let Some(ref client) = self.service_client else {
1205            let _ = self
1206                .event_tx
1207                .send(SensorEvent::ServiceCollectorError {
1208                    error: "Service client not available".to_string(),
1209                })
1210                .await;
1211            return;
1212        };
1213
1214        match client.stop_collector().await {
1215            Ok(_) => {
1216                let _ = self
1217                    .event_tx
1218                    .send(SensorEvent::ServiceCollectorStopped)
1219                    .await;
1220                // Refresh status to get updated state
1221                self.handle_refresh_service_status().await;
1222            }
1223            Err(e) => {
1224                let _ = self
1225                    .event_tx
1226                    .send(SensorEvent::ServiceCollectorError {
1227                        error: e.to_string(),
1228                    })
1229                    .await;
1230            }
1231        }
1232    }
1233
1234    async fn handle_set_alias(&self, device_id: &str, alias: Option<String>) {
1235        info!("Setting alias for device {} to {:?}", device_id, alias);
1236
1237        let Some(store) = self.open_store() else {
1238            let _ = self
1239                .event_tx
1240                .send(SensorEvent::AliasError {
1241                    device_id: device_id.to_string(),
1242                    error: "Could not open database".to_string(),
1243                })
1244                .await;
1245            return;
1246        };
1247
1248        match store.update_device_metadata(device_id, alias.as_deref(), None) {
1249            Ok(()) => {
1250                info!("Alias updated successfully for {}", device_id);
1251                let _ = self
1252                    .event_tx
1253                    .send(SensorEvent::AliasChanged {
1254                        device_id: device_id.to_string(),
1255                        alias,
1256                    })
1257                    .await;
1258            }
1259            Err(e) => {
1260                let _ = self
1261                    .event_tx
1262                    .send(SensorEvent::AliasError {
1263                        device_id: device_id.to_string(),
1264                        error: e.to_string(),
1265                    })
1266                    .await;
1267            }
1268        }
1269    }
1270}