Skip to main content

aranet_service/
state.rs

1//! Application state shared across handlers.
2//!
3//! # Broadcast Channel Behavior
4//!
5//! The `readings_tx` broadcast channel is used for real-time updates to WebSocket clients.
6//! Key characteristics:
7//!
8//! - **Buffer size**: Configurable via `server.broadcast_buffer` (default: 100)
//! - **Message loss**: If a subscriber falls behind and the buffer fills, the oldest messages are overwritten and that subscriber misses them
//! - **No blocking**: Sending never blocks or waits for slow receivers; it is the slow receiver that loses messages
11//!
12//! ## Tuning the Buffer Size
13//!
14//! - **Increase** if WebSocket clients frequently miss messages (e.g., slow network)
15//! - **Decrease** to reduce memory usage in resource-constrained environments
16//! - **Monitor** using the `/api/status` endpoint to track message delivery
17//!
18//! ## Example Configuration
19//!
20//! ```toml
21//! [server]
22//! bind = "127.0.0.1:8080"
23//! broadcast_buffer = 200  # Larger buffer for slow clients
24//! ```
25
26use std::path::PathBuf;
27use std::sync::Arc;
28use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
29use std::time::Duration;
30
31use aranet_store::Store;
32use time::OffsetDateTime;
33use tokio::sync::{Mutex, RwLock, broadcast, watch};
34use tokio::task::JoinSet;
35use tracing::warn;
36
37use crate::config::{Config, default_config_path};
38
/// Shared application state.
///
/// Constructed through [`AppState::new`] or [`AppState::with_config_path`],
/// both of which hand it back already wrapped in an `Arc` so it can be shared
/// between handlers.
pub struct AppState {
    /// The data store, guarded by an async `Mutex` so only one task accesses it at a time.
    pub store: Mutex<Store>,
    /// Live configuration; the `RwLock` permits concurrent readers and runtime updates.
    pub config: RwLock<Config>,
    /// Location on disk that `save_config` writes the configuration to.
    pub config_path: PathBuf,
    /// Sending half of the broadcast channel carrying real-time [`ReadingEvent`]s.
    ///
    /// Consumers obtain their own receiver with `readings_tx.subscribe()`.
    pub readings_tx: broadcast::Sender<ReadingEvent>,
    /// Run state, stop/reload signalling and task tracking for the collector.
    pub collector: CollectorState,
}
52
53impl AppState {
54    /// Create new application state.
55    ///
56    /// The broadcast channel buffer size is determined by `config.server.broadcast_buffer`.
57    /// If the buffer fills (slow subscribers), old messages are dropped without blocking.
58    pub fn new(store: Store, config: Config) -> Arc<Self> {
59        Self::with_config_path(store, config, default_config_path())
60    }
61
62    /// Create new application state with a custom config path.
63    pub fn with_config_path(store: Store, config: Config, config_path: PathBuf) -> Arc<Self> {
64        let buffer_size = config.server.broadcast_buffer;
65        let (readings_tx, _) = broadcast::channel(buffer_size);
66        Arc::new(Self {
67            store: Mutex::new(store),
68            config: RwLock::new(config),
69            config_path,
70            readings_tx,
71            collector: CollectorState::new(),
72        })
73    }
74
75    /// Save the current configuration to disk.
76    ///
77    /// This should be called after any configuration changes made via the API.
78    pub async fn save_config(&self) -> Result<(), crate::config::ConfigError> {
79        let config = self.config.read().await;
80        config.save(&self.config_path)
81    }
82
83    /// Save the configuration to disk, logging any errors.
84    ///
85    /// This is a convenience method for fire-and-forget saves.
86    pub async fn save_config_or_log(&self) {
87        if let Err(e) = self.save_config().await {
88            warn!("Failed to save configuration: {}", e);
89        }
90    }
91
92    /// Signal that the device configuration has changed.
93    ///
94    /// This saves the config to disk and signals the collector to reload
95    /// if it is currently running.
96    pub async fn on_devices_changed(&self) {
97        self.save_config_or_log().await;
98        if self.collector.is_running() {
99            self.collector.signal_reload();
100        }
101    }
102}
103
/// State for tracking and controlling the collector.
///
/// Combines a running flag, the start timestamp, two `watch` channels (stop
/// and reload) and the set of spawned device tasks.
pub struct CollectorState {
    /// Whether the collector is currently running.
    running: AtomicBool,
    /// When the collector was started, as Unix seconds.
    ///
    /// `0` means "never started"; `started_at()` maps it to `None`.
    started_at: AtomicU64,
    /// Channel to signal collector tasks to stop (`true` = stop requested).
    stop_tx: watch::Sender<bool>,
    /// Receiver for stop signal (cloned by collector tasks).
    ///
    /// Retaining one receiver here also means the channel always has a receiver.
    stop_rx: watch::Receiver<bool>,
    /// Channel to signal configuration reload; carries a counter that is
    /// bumped on every reload request.
    reload_tx: watch::Sender<u64>,
    /// Receiver for reload signal (cloned by `subscribe_reload`).
    reload_rx: watch::Receiver<u64>,
    /// Per-device collection stats.
    pub device_stats: RwLock<Vec<DeviceCollectionStats>>,
    /// Shared JoinSet for device polling tasks.
    ///
    /// This allows both the initial collector start and the reload watcher
    /// to track spawned tasks, ensuring proper cleanup on stop.
    pub device_tasks: Mutex<JoinSet<()>>,
}
126
127impl CollectorState {
128    /// Create a new collector state.
129    pub fn new() -> Self {
130        let (stop_tx, stop_rx) = watch::channel(false);
131        let (reload_tx, reload_rx) = watch::channel(0u64);
132        Self {
133            running: AtomicBool::new(false),
134            started_at: AtomicU64::new(0),
135            stop_tx,
136            stop_rx,
137            reload_tx,
138            reload_rx,
139            device_stats: RwLock::new(Vec::new()),
140            device_tasks: Mutex::new(JoinSet::new()),
141        }
142    }
143
144    /// Check if the collector is running.
145    pub fn is_running(&self) -> bool {
146        self.running.load(Ordering::SeqCst)
147    }
148
149    /// Mark the collector as started.
150    pub fn set_running(&self, running: bool) {
151        self.running.store(running, Ordering::SeqCst);
152        if running {
153            let now = OffsetDateTime::now_utc().unix_timestamp() as u64;
154            self.started_at.store(now, Ordering::SeqCst);
155        }
156    }
157
158    /// Get the collector start time.
159    pub fn started_at(&self) -> Option<OffsetDateTime> {
160        let ts = self.started_at.load(Ordering::SeqCst);
161        if ts == 0 {
162            None
163        } else {
164            OffsetDateTime::from_unix_timestamp(ts as i64).ok()
165        }
166    }
167
168    /// Get a receiver for the stop signal.
169    pub fn subscribe_stop(&self) -> watch::Receiver<bool> {
170        self.stop_rx.clone()
171    }
172
173    /// Signal all collector tasks to stop.
174    pub fn signal_stop(&self) {
175        let _ = self.stop_tx.send(true);
176        self.running.store(false, Ordering::SeqCst);
177    }
178
179    /// Reset the stop signal (for restarting).
180    pub fn reset_stop(&self) {
181        let _ = self.stop_tx.send(false);
182    }
183
184    /// Get a receiver for the reload signal.
185    pub fn subscribe_reload(&self) -> watch::Receiver<u64> {
186        self.reload_rx.clone()
187    }
188
189    /// Signal the collector to reload its configuration.
190    ///
191    /// This is used when devices are added, removed, or modified via the API.
192    /// The collector will restart its tasks with the new configuration.
193    pub fn signal_reload(&self) {
194        // Increment the counter to trigger the reload
195        let current = *self.reload_rx.borrow();
196        let _ = self.reload_tx.send(current.wrapping_add(1));
197    }
198
199    /// Wait for all device tasks to complete, with a timeout.
200    ///
201    /// Returns `true` if all tasks stopped cleanly within the timeout,
202    /// `false` if the timeout was reached and tasks were aborted.
203    pub async fn wait_for_device_tasks(&self, timeout: Duration) -> bool {
204        let wait_result = tokio::time::timeout(timeout, async {
205            let mut tasks = self.device_tasks.lock().await;
206            while tasks.join_next().await.is_some() {}
207        })
208        .await;
209
210        if wait_result.is_err() {
211            // Timeout - abort remaining tasks
212            let mut tasks = self.device_tasks.lock().await;
213            tasks.abort_all();
214            false
215        } else {
216            true
217        }
218    }
219
220    /// Spawn a device task into the shared JoinSet.
221    pub async fn spawn_device_task<F>(&self, future: F)
222    where
223        F: std::future::Future<Output = ()> + Send + 'static,
224    {
225        let mut tasks = self.device_tasks.lock().await;
226        tasks.spawn(future);
227    }
228}
229
230impl Default for CollectorState {
231    fn default() -> Self {
232        Self::new()
233    }
234}
235
/// Collection statistics for a single device.
///
/// Serializable with serde; the two timestamps are written as RFC 3339
/// strings, or `null` when absent.
#[derive(Debug, Clone, serde::Serialize)]
pub struct DeviceCollectionStats {
    /// Device ID/address.
    pub device_id: String,
    /// Device alias, if one is set.
    pub alias: Option<String>,
    /// Poll interval in seconds.
    pub poll_interval: u64,
    /// Time of last successful poll (`None` if there has been none).
    #[serde(with = "time::serde::rfc3339::option")]
    pub last_poll_at: Option<OffsetDateTime>,
    /// Time of last failed poll (`None` if there has been none).
    #[serde(with = "time::serde::rfc3339::option")]
    pub last_error_at: Option<OffsetDateTime>,
    /// Last error message.
    pub last_error: Option<String>,
    /// Total successful polls.
    pub success_count: u64,
    /// Total failed polls.
    pub failure_count: u64,
    /// Whether the device is currently being polled.
    pub polling: bool,
}
260
/// A reading event for WebSocket broadcast.
///
/// This is the message type carried by `AppState::readings_tx`. It is `Clone`
/// and serializable with serde.
#[derive(Debug, Clone, serde::Serialize)]
pub struct ReadingEvent {
    /// ID of the device the reading belongs to.
    pub device_id: String,
    /// The reading data.
    pub reading: aranet_store::StoredReading,
}
269
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::Config;
    use aranet_types::Status;

    /// Build a `StoredReading` fixture for `device_id` with the given CO2
    /// value; every other field is a fixed placeholder.
    fn create_test_reading(device_id: &str, co2: u16) -> aranet_store::StoredReading {
        aranet_store::StoredReading {
            id: 1,
            device_id: device_id.to_string(),
            co2,
            temperature: 22.5,
            humidity: 45,
            pressure: 1013.0,
            battery: 85,
            status: Status::Green,
            radon: None,
            radiation_rate: None,
            radiation_total: None,
            captured_at: time::OffsetDateTime::now_utc(),
        }
    }

    #[tokio::test]
    async fn test_app_state_new() {
        let store = Store::open_in_memory().unwrap();
        let config = Config::default();
        let state = AppState::new(store, config);

        let config = state.config.read().await;
        assert_eq!(config.server.bind, "127.0.0.1:8080");
    }

    #[test]
    fn test_collector_state() {
        let collector = CollectorState::new();
        assert!(!collector.is_running());
        assert!(collector.started_at().is_none());

        collector.set_running(true);
        assert!(collector.is_running());
        assert!(collector.started_at().is_some());

        collector.signal_stop();
        assert!(!collector.is_running());
    }

    #[tokio::test]
    async fn test_app_state_store_access() {
        let store = Store::open_in_memory().unwrap();
        let config = Config::default();
        let state = AppState::new(store, config);

        let store = state.store.lock().await;
        let devices = store.list_devices().unwrap();
        assert!(devices.is_empty());
    }

    #[tokio::test]
    async fn test_app_state_broadcast_channel() {
        let store = Store::open_in_memory().unwrap();
        let config = Config::default();
        let state = AppState::new(store, config);

        let mut rx = state.readings_tx.subscribe();

        let event = ReadingEvent {
            device_id: "test".to_string(),
            reading: create_test_reading("test", 450),
        };

        // Send should succeed (at least one subscriber)
        state.readings_tx.send(event).unwrap();

        // Receive and verify
        let received = rx.recv().await.unwrap();
        assert_eq!(received.device_id, "test");
        assert_eq!(received.reading.co2, 450);
    }

    #[test]
    fn test_reading_event_serialization() {
        let reading = create_test_reading("AA:BB:CC:DD:EE:FF", 500);

        let event = ReadingEvent {
            device_id: "AA:BB:CC:DD:EE:FF".to_string(),
            reading,
        };

        let json = serde_json::to_string(&event).unwrap();
        assert!(json.contains("AA:BB:CC:DD:EE:FF"));
        assert!(json.contains("500"));
    }

    #[test]
    fn test_reading_event_debug() {
        let reading = create_test_reading("test", 400);

        let event = ReadingEvent {
            device_id: "test".to_string(),
            reading,
        };

        let debug = format!("{:?}", event);
        assert!(debug.contains("ReadingEvent"));
        assert!(debug.contains("test"));
    }

    #[test]
    fn test_collector_state_default() {
        let collector = CollectorState::default();
        assert!(!collector.is_running());
        assert!(collector.started_at().is_none());
    }

    #[test]
    fn test_collector_state_subscribe_stop() {
        let collector = CollectorState::new();

        // Get multiple receivers
        let rx1 = collector.subscribe_stop();
        let rx2 = collector.subscribe_stop();

        // Both should see the initial value (false)
        assert!(!*rx1.borrow());
        assert!(!*rx2.borrow());
    }

    #[test]
    fn test_collector_state_stop_and_reset() {
        let collector = CollectorState::new();
        let rx = collector.subscribe_stop();

        // Initially not stopped
        assert!(!*rx.borrow());

        // Signal stop
        collector.signal_stop();
        assert!(*rx.borrow());

        // Reset
        collector.reset_stop();
        assert!(!*rx.borrow());
    }

    #[test]
    fn test_collector_state_running_toggle() {
        let collector = CollectorState::new();

        assert!(!collector.is_running());
        assert!(collector.started_at().is_none());

        collector.set_running(true);
        assert!(collector.is_running());
        let started = collector.started_at();
        assert!(started.is_some());

        // Set running again - the timestamp is refreshed and must not move
        // backwards. The comparison is `>=`, so no sleep is needed between the
        // two calls.
        collector.set_running(true);
        let started2 = collector.started_at();
        assert!(started2 >= started);

        collector.set_running(false);
        assert!(!collector.is_running());
        // started_at is not reset when set_running(false)
        assert!(collector.started_at().is_some());
    }

    #[tokio::test]
    async fn test_collector_state_device_stats_rw_lock() {
        let collector = CollectorState::new();

        // Write to stats
        {
            let mut stats = collector.device_stats.write().await;
            stats.push(DeviceCollectionStats {
                device_id: "test-1".to_string(),
                alias: Some("Test 1".to_string()),
                poll_interval: 60,
                last_poll_at: None,
                last_error_at: None,
                last_error: None,
                success_count: 0,
                failure_count: 0,
                polling: false,
            });
        }

        // Read from stats
        let stats = collector.device_stats.read().await;
        assert_eq!(stats.len(), 1);
        assert_eq!(stats[0].device_id, "test-1");
    }

    #[test]
    fn test_device_collection_stats_serialization() {
        let stats = DeviceCollectionStats {
            device_id: "AA:BB:CC:DD:EE:FF".to_string(),
            alias: Some("Kitchen Sensor".to_string()),
            poll_interval: 120,
            last_poll_at: Some(time::OffsetDateTime::now_utc()),
            last_error_at: None,
            last_error: None,
            success_count: 42,
            failure_count: 3,
            polling: true,
        };

        let json = serde_json::to_string(&stats).unwrap();

        // Match key and value together: a bare `contains("3")` would also be
        // satisfied by digits inside the serialized timestamp.
        assert!(json.contains("\"device_id\":\"AA:BB:CC:DD:EE:FF\""));
        assert!(json.contains("\"alias\":\"Kitchen Sensor\""));
        assert!(json.contains("\"poll_interval\":120"));
        assert!(json.contains("\"success_count\":42"));
        assert!(json.contains("\"failure_count\":3"));
        assert!(json.contains("\"polling\":true"));
    }

    #[test]
    fn test_device_collection_stats_with_error() {
        let stats = DeviceCollectionStats {
            device_id: "test".to_string(),
            alias: None,
            poll_interval: 60,
            last_poll_at: None,
            last_error_at: Some(time::OffsetDateTime::now_utc()),
            last_error: Some("Connection timeout".to_string()),
            success_count: 10,
            failure_count: 5,
            polling: false,
        };

        let json = serde_json::to_string(&stats).unwrap();
        assert!(json.contains("Connection timeout"));
    }

    #[test]
    fn test_device_collection_stats_clone() {
        let original = DeviceCollectionStats {
            device_id: "clone-test".to_string(),
            alias: Some("Clone".to_string()),
            poll_interval: 90,
            last_poll_at: Some(time::OffsetDateTime::now_utc()),
            last_error_at: None,
            last_error: None,
            success_count: 100,
            failure_count: 2,
            polling: true,
        };

        let cloned = original.clone();

        assert_eq!(cloned.device_id, original.device_id);
        assert_eq!(cloned.alias, original.alias);
        assert_eq!(cloned.poll_interval, original.poll_interval);
        assert_eq!(cloned.success_count, original.success_count);
        assert_eq!(cloned.polling, original.polling);
    }

    #[test]
    fn test_device_collection_stats_debug() {
        let stats = DeviceCollectionStats {
            device_id: "debug-test".to_string(),
            alias: Some("Debug".to_string()),
            poll_interval: 60,
            last_poll_at: None,
            last_error_at: None,
            last_error: None,
            success_count: 5,
            failure_count: 1,
            polling: false,
        };

        let debug = format!("{:?}", stats);
        assert!(debug.contains("DeviceCollectionStats"));
        assert!(debug.contains("debug-test"));
        assert!(debug.contains("Debug"));
    }

    #[test]
    fn test_reading_event_clone() {
        let reading = create_test_reading("original", 750);
        let event = ReadingEvent {
            device_id: "original".to_string(),
            reading,
        };

        let cloned = event.clone();
        assert_eq!(cloned.device_id, event.device_id);
        assert_eq!(cloned.reading.co2, event.reading.co2);
    }

    #[tokio::test]
    async fn test_app_state_config_write() {
        let store = Store::open_in_memory().unwrap();
        let config = Config::default();
        let state = AppState::new(store, config);

        // Modify config
        {
            let mut config = state.config.write().await;
            config.server.bind = "0.0.0.0:9090".to_string();
        }

        // Read and verify
        let config = state.config.read().await;
        assert_eq!(config.server.bind, "0.0.0.0:9090");
    }

    #[tokio::test]
    async fn test_broadcast_channel_multiple_receivers() {
        let store = Store::open_in_memory().unwrap();
        let config = Config::default();
        let state = AppState::new(store, config);

        let mut rx1 = state.readings_tx.subscribe();
        let mut rx2 = state.readings_tx.subscribe();

        let reading = create_test_reading("multi", 888);
        let event = ReadingEvent {
            device_id: "multi".to_string(),
            reading,
        };

        state.readings_tx.send(event).unwrap();

        // Both receivers should get the message
        let received1 = rx1.recv().await.unwrap();
        let received2 = rx2.recv().await.unwrap();

        assert_eq!(received1.reading.co2, 888);
        assert_eq!(received2.reading.co2, 888);
    }

    #[tokio::test]
    async fn test_app_state_store_operations() {
        let store = Store::open_in_memory().unwrap();
        let config = Config::default();
        let state = AppState::new(store, config);

        // Insert a device via store
        {
            let store = state.store.lock().await;
            store.upsert_device("test-device", Some("Test")).unwrap();
        }

        // Query the device
        {
            let store = state.store.lock().await;
            let device = store.get_device("test-device").unwrap().unwrap();
            assert_eq!(device.name, Some("Test".to_string()));
        }
    }

    #[test]
    fn test_collector_state_reload_signal() {
        let collector = CollectorState::new();
        let rx = collector.subscribe_reload();

        // Initial value
        assert_eq!(*rx.borrow(), 0);

        // Signal reload
        collector.signal_reload();
        assert_eq!(*rx.borrow(), 1);

        // Signal reload again
        collector.signal_reload();
        assert_eq!(*rx.borrow(), 2);
    }

    #[tokio::test]
    async fn test_collector_state_reload_with_receiver() {
        let collector = CollectorState::new();
        let mut rx = collector.subscribe_reload();

        // Start a task that waits for reload
        let handle = tokio::spawn(async move {
            rx.changed().await.unwrap();
            *rx.borrow()
        });

        // Give the task time to start waiting, so the "already waiting" path
        // is the one normally exercised.
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;

        // Signal reload
        collector.signal_reload();

        // Task should complete with the new value
        let result = handle.await.unwrap();
        assert_eq!(result, 1);
    }

    #[tokio::test]
    async fn test_wait_for_device_tasks_without_tasks() {
        let collector = CollectorState::new();

        // Nothing to join: reports a clean stop.
        assert!(
            collector
                .wait_for_device_tasks(std::time::Duration::from_secs(5))
                .await
        );
    }

    #[tokio::test]
    async fn test_wait_for_device_tasks_joins_finished_tasks() {
        let collector = CollectorState::new();

        collector.spawn_device_task(async {}).await;
        collector.spawn_device_task(async {}).await;

        // The timeout is only an upper bound; the tasks finish immediately.
        assert!(
            collector
                .wait_for_device_tasks(std::time::Duration::from_secs(5))
                .await
        );
        assert!(collector.device_tasks.lock().await.is_empty());
    }

    #[tokio::test]
    async fn test_wait_for_device_tasks_aborts_on_timeout() {
        let collector = CollectorState::new();

        // A task that never finishes on its own forces the timeout path.
        collector
            .spawn_device_task(std::future::pending::<()>())
            .await;

        assert!(
            !collector
                .wait_for_device_tasks(std::time::Duration::from_millis(20))
                .await
        );

        // The task was aborted, so a second wait reaps it and succeeds.
        assert!(
            collector
                .wait_for_device_tasks(std::time::Duration::from_secs(5))
                .await
        );
    }
}