1use std::path::PathBuf;
27use std::sync::Arc;
28use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
29use std::time::Duration;
30
31use aranet_store::Store;
32use time::OffsetDateTime;
33use tokio::sync::{Mutex, RwLock, broadcast, watch};
34use tokio::task::JoinSet;
35use tracing::warn;
36
37use crate::config::{Config, default_config_path};
38
39pub struct AppState {
41 pub store: Mutex<Store>,
43 pub config: RwLock<Config>,
45 pub config_path: PathBuf,
47 pub readings_tx: broadcast::Sender<ReadingEvent>,
49 pub collector: CollectorState,
51}
52
53impl AppState {
54 pub fn new(store: Store, config: Config) -> Arc<Self> {
59 Self::with_config_path(store, config, default_config_path())
60 }
61
62 pub fn with_config_path(store: Store, config: Config, config_path: PathBuf) -> Arc<Self> {
64 let buffer_size = config.server.broadcast_buffer;
65 let (readings_tx, _) = broadcast::channel(buffer_size);
66 Arc::new(Self {
67 store: Mutex::new(store),
68 config: RwLock::new(config),
69 config_path,
70 readings_tx,
71 collector: CollectorState::new(),
72 })
73 }
74
75 pub async fn save_config(&self) -> Result<(), crate::config::ConfigError> {
79 let config = self.config.read().await;
80 config.save(&self.config_path)
81 }
82
83 pub async fn save_config_or_log(&self) {
87 if let Err(e) = self.save_config().await {
88 warn!("Failed to save configuration: {}", e);
89 }
90 }
91
92 pub async fn on_devices_changed(&self) {
97 self.save_config_or_log().await;
98 if self.collector.is_running() {
99 self.collector.signal_reload();
100 }
101 }
102}
103
104pub struct CollectorState {
106 running: AtomicBool,
108 started_at: AtomicU64,
110 stop_tx: watch::Sender<bool>,
112 stop_rx: watch::Receiver<bool>,
114 reload_tx: watch::Sender<u64>,
116 reload_rx: watch::Receiver<u64>,
118 pub device_stats: RwLock<Vec<DeviceCollectionStats>>,
120 pub device_tasks: Mutex<JoinSet<()>>,
125}
126
127impl CollectorState {
128 pub fn new() -> Self {
130 let (stop_tx, stop_rx) = watch::channel(false);
131 let (reload_tx, reload_rx) = watch::channel(0u64);
132 Self {
133 running: AtomicBool::new(false),
134 started_at: AtomicU64::new(0),
135 stop_tx,
136 stop_rx,
137 reload_tx,
138 reload_rx,
139 device_stats: RwLock::new(Vec::new()),
140 device_tasks: Mutex::new(JoinSet::new()),
141 }
142 }
143
144 pub fn is_running(&self) -> bool {
146 self.running.load(Ordering::SeqCst)
147 }
148
149 pub fn set_running(&self, running: bool) {
151 self.running.store(running, Ordering::SeqCst);
152 if running {
153 let now = OffsetDateTime::now_utc().unix_timestamp() as u64;
154 self.started_at.store(now, Ordering::SeqCst);
155 }
156 }
157
158 pub fn started_at(&self) -> Option<OffsetDateTime> {
160 let ts = self.started_at.load(Ordering::SeqCst);
161 if ts == 0 {
162 None
163 } else {
164 OffsetDateTime::from_unix_timestamp(ts as i64).ok()
165 }
166 }
167
168 pub fn subscribe_stop(&self) -> watch::Receiver<bool> {
170 self.stop_rx.clone()
171 }
172
173 pub fn signal_stop(&self) {
175 let _ = self.stop_tx.send(true);
176 self.running.store(false, Ordering::SeqCst);
177 }
178
179 pub fn reset_stop(&self) {
181 let _ = self.stop_tx.send(false);
182 }
183
184 pub fn subscribe_reload(&self) -> watch::Receiver<u64> {
186 self.reload_rx.clone()
187 }
188
189 pub fn signal_reload(&self) {
194 let current = *self.reload_rx.borrow();
196 let _ = self.reload_tx.send(current.wrapping_add(1));
197 }
198
199 pub async fn wait_for_device_tasks(&self, timeout: Duration) -> bool {
204 let wait_result = tokio::time::timeout(timeout, async {
205 let mut tasks = self.device_tasks.lock().await;
206 while tasks.join_next().await.is_some() {}
207 })
208 .await;
209
210 if wait_result.is_err() {
211 let mut tasks = self.device_tasks.lock().await;
213 tasks.abort_all();
214 false
215 } else {
216 true
217 }
218 }
219
220 pub async fn spawn_device_task<F>(&self, future: F)
222 where
223 F: std::future::Future<Output = ()> + Send + 'static,
224 {
225 let mut tasks = self.device_tasks.lock().await;
226 tasks.spawn(future);
227 }
228}
229
230impl Default for CollectorState {
231 fn default() -> Self {
232 Self::new()
233 }
234}
235
236#[derive(Debug, Clone, serde::Serialize)]
238pub struct DeviceCollectionStats {
239 pub device_id: String,
241 pub alias: Option<String>,
243 pub poll_interval: u64,
245 #[serde(with = "time::serde::rfc3339::option")]
247 pub last_poll_at: Option<OffsetDateTime>,
248 #[serde(with = "time::serde::rfc3339::option")]
250 pub last_error_at: Option<OffsetDateTime>,
251 pub last_error: Option<String>,
253 pub success_count: u64,
255 pub failure_count: u64,
257 pub polling: bool,
259}
260
261#[derive(Debug, Clone, serde::Serialize)]
263pub struct ReadingEvent {
264 pub device_id: String,
266 pub reading: aranet_store::StoredReading,
268}
269
270#[cfg(test)]
271mod tests {
272 use super::*;
273 use crate::config::Config;
274 use aranet_types::Status;
275
276 fn create_test_reading(device_id: &str, co2: u16) -> aranet_store::StoredReading {
277 aranet_store::StoredReading {
278 id: 1,
279 device_id: device_id.to_string(),
280 co2,
281 temperature: 22.5,
282 humidity: 45,
283 pressure: 1013.0,
284 battery: 85,
285 status: Status::Green,
286 radon: None,
287 radiation_rate: None,
288 radiation_total: None,
289 captured_at: time::OffsetDateTime::now_utc(),
290 }
291 }
292
293 #[tokio::test]
294 async fn test_app_state_new() {
295 let store = Store::open_in_memory().unwrap();
296 let config = Config::default();
297 let state = AppState::new(store, config);
298
299 let config = state.config.read().await;
300 assert_eq!(config.server.bind, "127.0.0.1:8080");
301 }
302
303 #[test]
304 fn test_collector_state() {
305 let collector = CollectorState::new();
306 assert!(!collector.is_running());
307 assert!(collector.started_at().is_none());
308
309 collector.set_running(true);
310 assert!(collector.is_running());
311 assert!(collector.started_at().is_some());
312
313 collector.signal_stop();
314 assert!(!collector.is_running());
315 }
316
317 #[tokio::test]
318 async fn test_app_state_store_access() {
319 let store = Store::open_in_memory().unwrap();
320 let config = Config::default();
321 let state = AppState::new(store, config);
322
323 let store = state.store.lock().await;
324 let devices = store.list_devices().unwrap();
325 assert!(devices.is_empty());
326 }
327
328 #[tokio::test]
329 async fn test_app_state_broadcast_channel() {
330 let store = Store::open_in_memory().unwrap();
331 let config = Config::default();
332 let state = AppState::new(store, config);
333
334 let mut rx = state.readings_tx.subscribe();
335
336 let reading = create_test_reading("test", 450);
337
338 let event = ReadingEvent {
339 device_id: "test".to_string(),
340 reading: reading.clone(),
341 };
342
343 state.readings_tx.send(event.clone()).unwrap();
345
346 let received = rx.recv().await.unwrap();
348 assert_eq!(received.device_id, "test");
349 assert_eq!(received.reading.co2, 450);
350 }
351
352 #[test]
353 fn test_reading_event_serialization() {
354 let reading = create_test_reading("AA:BB:CC:DD:EE:FF", 500);
355
356 let event = ReadingEvent {
357 device_id: "AA:BB:CC:DD:EE:FF".to_string(),
358 reading,
359 };
360
361 let json = serde_json::to_string(&event).unwrap();
362 assert!(json.contains("AA:BB:CC:DD:EE:FF"));
363 assert!(json.contains("500"));
364 }
365
366 #[test]
367 fn test_reading_event_debug() {
368 let reading = create_test_reading("test", 400);
369
370 let event = ReadingEvent {
371 device_id: "test".to_string(),
372 reading,
373 };
374
375 let debug = format!("{:?}", event);
376 assert!(debug.contains("ReadingEvent"));
377 assert!(debug.contains("test"));
378 }
379
380 #[test]
381 fn test_collector_state_default() {
382 let collector = CollectorState::default();
383 assert!(!collector.is_running());
384 assert!(collector.started_at().is_none());
385 }
386
387 #[test]
388 fn test_collector_state_subscribe_stop() {
389 let collector = CollectorState::new();
390
391 let rx1 = collector.subscribe_stop();
393 let rx2 = collector.subscribe_stop();
394
395 assert!(!*rx1.borrow());
397 assert!(!*rx2.borrow());
398 }
399
400 #[test]
401 fn test_collector_state_stop_and_reset() {
402 let collector = CollectorState::new();
403 let rx = collector.subscribe_stop();
404
405 assert!(!*rx.borrow());
407
408 collector.signal_stop();
410 assert!(*rx.borrow());
411
412 collector.reset_stop();
414 assert!(!*rx.borrow());
415 }
416
417 #[test]
418 fn test_collector_state_running_toggle() {
419 let collector = CollectorState::new();
420
421 assert!(!collector.is_running());
422 assert!(collector.started_at().is_none());
423
424 collector.set_running(true);
425 assert!(collector.is_running());
426 let started = collector.started_at();
427 assert!(started.is_some());
428
429 std::thread::sleep(std::time::Duration::from_secs(1));
431 collector.set_running(true);
432 let started2 = collector.started_at();
433 assert!(started2 >= started);
434
435 collector.set_running(false);
436 assert!(!collector.is_running());
437 }
439
440 #[tokio::test]
441 async fn test_collector_state_device_stats_rw_lock() {
442 let collector = CollectorState::new();
443
444 {
446 let mut stats = collector.device_stats.write().await;
447 stats.push(DeviceCollectionStats {
448 device_id: "test-1".to_string(),
449 alias: Some("Test 1".to_string()),
450 poll_interval: 60,
451 last_poll_at: None,
452 last_error_at: None,
453 last_error: None,
454 success_count: 0,
455 failure_count: 0,
456 polling: false,
457 });
458 }
459
460 let stats = collector.device_stats.read().await;
462 assert_eq!(stats.len(), 1);
463 assert_eq!(stats[0].device_id, "test-1");
464 }
465
466 #[test]
467 fn test_device_collection_stats_serialization() {
468 let stats = DeviceCollectionStats {
469 device_id: "AA:BB:CC:DD:EE:FF".to_string(),
470 alias: Some("Kitchen Sensor".to_string()),
471 poll_interval: 120,
472 last_poll_at: Some(time::OffsetDateTime::now_utc()),
473 last_error_at: None,
474 last_error: None,
475 success_count: 42,
476 failure_count: 3,
477 polling: true,
478 };
479
480 let json = serde_json::to_string(&stats).unwrap();
481
482 assert!(json.contains("AA:BB:CC:DD:EE:FF"));
483 assert!(json.contains("Kitchen Sensor"));
484 assert!(json.contains("120"));
485 assert!(json.contains("42"));
486 assert!(json.contains("3"));
487 assert!(json.contains("true"));
488 }
489
490 #[test]
491 fn test_device_collection_stats_with_error() {
492 let stats = DeviceCollectionStats {
493 device_id: "test".to_string(),
494 alias: None,
495 poll_interval: 60,
496 last_poll_at: None,
497 last_error_at: Some(time::OffsetDateTime::now_utc()),
498 last_error: Some("Connection timeout".to_string()),
499 success_count: 10,
500 failure_count: 5,
501 polling: false,
502 };
503
504 let json = serde_json::to_string(&stats).unwrap();
505 assert!(json.contains("Connection timeout"));
506 }
507
508 #[test]
509 fn test_device_collection_stats_clone() {
510 let original = DeviceCollectionStats {
511 device_id: "clone-test".to_string(),
512 alias: Some("Clone".to_string()),
513 poll_interval: 90,
514 last_poll_at: Some(time::OffsetDateTime::now_utc()),
515 last_error_at: None,
516 last_error: None,
517 success_count: 100,
518 failure_count: 2,
519 polling: true,
520 };
521
522 let cloned = original.clone();
523
524 assert_eq!(cloned.device_id, original.device_id);
525 assert_eq!(cloned.alias, original.alias);
526 assert_eq!(cloned.poll_interval, original.poll_interval);
527 assert_eq!(cloned.success_count, original.success_count);
528 assert_eq!(cloned.polling, original.polling);
529 }
530
531 #[test]
532 fn test_device_collection_stats_debug() {
533 let stats = DeviceCollectionStats {
534 device_id: "debug-test".to_string(),
535 alias: Some("Debug".to_string()),
536 poll_interval: 60,
537 last_poll_at: None,
538 last_error_at: None,
539 last_error: None,
540 success_count: 5,
541 failure_count: 1,
542 polling: false,
543 };
544
545 let debug = format!("{:?}", stats);
546 assert!(debug.contains("DeviceCollectionStats"));
547 assert!(debug.contains("debug-test"));
548 assert!(debug.contains("Debug"));
549 }
550
551 #[test]
552 fn test_reading_event_clone() {
553 let reading = create_test_reading("original", 750);
554 let event = ReadingEvent {
555 device_id: "original".to_string(),
556 reading,
557 };
558
559 let cloned = event.clone();
560 assert_eq!(cloned.device_id, event.device_id);
561 assert_eq!(cloned.reading.co2, event.reading.co2);
562 }
563
564 #[tokio::test]
565 async fn test_app_state_config_write() {
566 let store = Store::open_in_memory().unwrap();
567 let config = Config::default();
568 let state = AppState::new(store, config);
569
570 {
572 let mut config = state.config.write().await;
573 config.server.bind = "0.0.0.0:9090".to_string();
574 }
575
576 let config = state.config.read().await;
578 assert_eq!(config.server.bind, "0.0.0.0:9090");
579 }
580
581 #[tokio::test]
582 async fn test_broadcast_channel_multiple_receivers() {
583 let store = Store::open_in_memory().unwrap();
584 let config = Config::default();
585 let state = AppState::new(store, config);
586
587 let mut rx1 = state.readings_tx.subscribe();
588 let mut rx2 = state.readings_tx.subscribe();
589
590 let reading = create_test_reading("multi", 888);
591 let event = ReadingEvent {
592 device_id: "multi".to_string(),
593 reading,
594 };
595
596 state.readings_tx.send(event).unwrap();
597
598 let received1 = rx1.recv().await.unwrap();
600 let received2 = rx2.recv().await.unwrap();
601
602 assert_eq!(received1.reading.co2, 888);
603 assert_eq!(received2.reading.co2, 888);
604 }
605
606 #[tokio::test]
607 async fn test_app_state_store_operations() {
608 let store = Store::open_in_memory().unwrap();
609 let config = Config::default();
610 let state = AppState::new(store, config);
611
612 {
614 let store = state.store.lock().await;
615 store.upsert_device("test-device", Some("Test")).unwrap();
616 }
617
618 {
620 let store = state.store.lock().await;
621 let device = store.get_device("test-device").unwrap().unwrap();
622 assert_eq!(device.name, Some("Test".to_string()));
623 }
624 }
625
626 #[test]
627 fn test_collector_state_reload_signal() {
628 let collector = CollectorState::new();
629 let rx = collector.subscribe_reload();
630
631 assert_eq!(*rx.borrow(), 0);
633
634 collector.signal_reload();
636 assert_eq!(*rx.borrow(), 1);
637
638 collector.signal_reload();
640 assert_eq!(*rx.borrow(), 2);
641 }
642
643 #[tokio::test]
644 async fn test_collector_state_reload_with_receiver() {
645 let collector = CollectorState::new();
646 let mut rx = collector.subscribe_reload();
647
648 let handle = tokio::spawn(async move {
650 rx.changed().await.unwrap();
651 *rx.borrow()
652 });
653
654 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
656
657 collector.signal_reload();
659
660 let result = handle.await.unwrap();
662 assert_eq!(result, 1);
663 }
664}