use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::Duration;
use aranet_store::Store;
use time::OffsetDateTime;
use tokio::sync::{Mutex, RwLock, Semaphore, broadcast, watch};
use tokio::task::{JoinHandle, JoinSet};
use crate::config::{Config, default_config_path};
/// Shared application state: the store, the live configuration, the event
/// channels, and the collector bookkeeping.
///
/// Built by `AppState::new` / `AppState::with_config_path`, which return it
/// wrapped in an `Arc`.
pub struct AppState {
/// Primary store handle behind an async mutex; `with_store_write` always
/// goes through it.
pub store: Mutex<Store>,
/// Database path reported by the store at construction time, if any. When
/// present, `with_store_read` opens a separate `Store` at this path instead
/// of locking `store`.
store_path: Option<PathBuf>,
/// Live configuration behind an async read/write lock.
pub config: RwLock<Config>,
/// File path that `save_config` writes the configuration to.
pub config_path: PathBuf,
/// Broadcast sender for `ReadingEvent`s; its capacity is taken from
/// `config.server.broadcast_buffer` at construction time.
pub readings_tx: broadcast::Sender<ReadingEvent>,
/// Semaphore created with a single permit.
/// NOTE(review): presumably serializes access to the BLE adapter — confirm
/// against the callers.
pub ble_semaphore: Semaphore,
/// Run state and per-device bookkeeping for the collector.
pub collector: CollectorState,
/// Counter initialised to zero.
/// NOTE(review): presumably counts WebSocket messages dropped for lagging
/// clients — confirm against the callers.
pub ws_messages_dropped: AtomicU64,
/// Sending half of the shutdown flag; `signal_shutdown` sets it to `true`.
shutdown_tx: watch::Sender<bool>,
/// Receiving half of the shutdown flag, kept so `subscribe_shutdown` can
/// hand out clones.
shutdown_rx: watch::Receiver<bool>,
}
impl AppState {
    /// Builds the shared state using the default configuration file path.
    ///
    /// Shorthand for [`AppState::with_config_path`] with the path returned by
    /// `default_config_path()`.
    pub fn new(store: Store, config: Config) -> Arc<Self> {
        Self::with_config_path(store, config, default_config_path())
    }

    /// Builds the shared state and remembers `config_path` as the target of
    /// later [`AppState::save_config`] calls.
    ///
    /// The readings broadcast channel is sized from
    /// `config.server.broadcast_buffer`; a value of `0` is raised to `1`.
    pub fn with_config_path(store: Store, config: Config, config_path: PathBuf) -> Arc<Self> {
        // `broadcast::channel` panics when asked for a zero capacity, so a
        // misconfigured buffer size is clamped to the smallest valid value
        // instead of taking the whole process down at startup.
        let buffer_size = config.server.broadcast_buffer.max(1);
        let (readings_tx, _) = broadcast::channel(buffer_size);
        let (shutdown_tx, shutdown_rx) = watch::channel(false);
        // Captured before the store moves into the mutex; `with_store_read`
        // uses it to open independent read handles.
        let store_path = store.database_path().map(PathBuf::from);

        Arc::new(Self {
            store: Mutex::new(store),
            store_path,
            config: RwLock::new(config),
            config_path,
            readings_tx,
            // One permit: at most one holder at a time.
            ble_semaphore: Semaphore::new(1),
            collector: CollectorState::new(),
            ws_messages_dropped: AtomicU64::new(0),
            shutdown_tx,
            shutdown_rx,
        })
    }

    /// Writes the current configuration to `config_path`.
    ///
    /// The configuration read lock is held for the duration of the save.
    ///
    /// # Errors
    ///
    /// Returns whatever error `Config::save` reports.
    pub async fn save_config(&self) -> Result<(), crate::config::ConfigError> {
        let guard = self.config.read().await;
        guard.save(&self.config_path)
    }

    /// Runs `f` against a store for reading.
    ///
    /// When the store reported a database path at construction time, a
    /// separate `Store` is opened at that path for this call, so the shared
    /// mutex is not taken. Otherwise the shared store is locked and passed
    /// to `f`.
    ///
    /// # Errors
    ///
    /// Returns the error from `Store::open` or from `f`.
    pub async fn with_store_read<T, F>(&self, f: F) -> aranet_store::Result<T>
    where
        F: FnOnce(&Store) -> aranet_store::Result<T>,
    {
        match &self.store_path {
            Some(path) => {
                let reader = Store::open(path)?;
                f(&reader)
            }
            None => {
                let shared = self.store.lock().await;
                f(&shared)
            }
        }
    }

    /// Runs `f` against the shared store while holding its mutex.
    ///
    /// # Errors
    ///
    /// Returns the error from `f`.
    pub async fn with_store_write<T, F>(&self, f: F) -> aranet_store::Result<T>
    where
        F: FnOnce(&Store) -> aranet_store::Result<T>,
    {
        let shared = self.store.lock().await;
        f(&shared)
    }

    /// Notifies the collector that the device list changed by bumping its
    /// reload counter.
    pub async fn on_devices_changed(&self) {
        self.collector.signal_reload();
    }

    /// Returns a receiver for the shutdown flag (`true` once shutdown has
    /// been signalled).
    pub fn subscribe_shutdown(&self) -> watch::Receiver<bool> {
        self.shutdown_rx.clone()
    }

    /// Sets the shutdown flag to `true`.
    pub fn signal_shutdown(&self) {
        // A send error only means nobody is listening; there is nothing
        // useful to do with it here.
        let _ = self.shutdown_tx.send(true);
    }
}
/// Run state, stop/reload signalling, and task bookkeeping for the collector.
pub struct CollectorState {
/// Whether the collector is currently marked as running.
running: AtomicBool,
/// Unix timestamp (seconds) recorded when the collector was marked as
/// running; `0` means "not started" and is reported as `None` by
/// `started_at()`.
started_at: AtomicU64,
/// Sending half of the stop flag; `signal_stop` sets it to `true` and
/// `reset_stop` sets it back to `false`.
stop_tx: watch::Sender<bool>,
/// Receiving half of the stop flag, kept so `subscribe_stop` can hand out
/// clones.
stop_rx: watch::Receiver<bool>,
/// Sending half of the reload counter, bumped by `signal_reload`.
reload_tx: watch::Sender<u64>,
/// Receiving half of the reload counter, kept so `subscribe_reload` can hand
/// out clones.
reload_rx: watch::Receiver<u64>,
/// Per-device collection statistics.
pub device_stats: RwLock<Vec<DeviceCollectionStats>>,
/// Tasks spawned through `spawn_device_task`; joined (or aborted on timeout)
/// by `wait_for_device_tasks`.
pub device_tasks: Mutex<JoinSet<()>>,
/// Handle registered through `set_reload_watcher`, if any; awaited (or
/// aborted on timeout) by `wait_for_reload_watcher`.
pub reload_watcher: Mutex<Option<JoinHandle<()>>>,
}
impl CollectorState {
    /// Creates an idle state: not running, no start time, stop flag `false`,
    /// reload counter `0`, and no stats, tasks, or reload watcher registered.
    pub fn new() -> Self {
        let (stop_tx, stop_rx) = watch::channel(false);
        let (reload_tx, reload_rx) = watch::channel(0u64);
        Self {
            running: AtomicBool::new(false),
            started_at: AtomicU64::new(0),
            stop_tx,
            stop_rx,
            reload_tx,
            reload_rx,
            device_stats: RwLock::new(Vec::new()),
            device_tasks: Mutex::new(JoinSet::new()),
            reload_watcher: Mutex::new(None),
        }
    }

    /// Current wall-clock time as whole seconds since the Unix epoch.
    fn unix_now() -> u64 {
        // The cast would wrap for a clock set before 1970; that is not an
        // expected operating condition.
        OffsetDateTime::now_utc().unix_timestamp() as u64
    }

    /// Returns whether the collector is currently marked as running.
    pub fn is_running(&self) -> bool {
        self.running.load(Ordering::SeqCst)
    }

    /// Atomically claims the running flag.
    ///
    /// Returns `true` and records the start time if the flag was previously
    /// clear. Returns `false`, leaving everything untouched, if the collector
    /// was already marked as running.
    pub fn try_start(&self) -> bool {
        let claimed = self
            .running
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
            .is_ok();
        if claimed {
            self.started_at.store(Self::unix_now(), Ordering::SeqCst);
        }
        claimed
    }

    /// Unconditionally sets the running flag.
    ///
    /// Setting it to `true` records the current time as the start time (also
    /// when it was already `true`); setting it to `false` clears the start
    /// time.
    pub fn set_running(&self, running: bool) {
        self.running.store(running, Ordering::SeqCst);
        let stamp = if running { Self::unix_now() } else { 0 };
        self.started_at.store(stamp, Ordering::SeqCst);
    }

    /// Returns the recorded start time, or `None` when no start time is
    /// recorded (or the stored value is not a representable timestamp).
    pub fn started_at(&self) -> Option<OffsetDateTime> {
        match self.started_at.load(Ordering::SeqCst) {
            // Zero is the "not started" sentinel.
            0 => None,
            secs => OffsetDateTime::from_unix_timestamp(secs as i64).ok(),
        }
    }

    /// Returns a receiver for the stop flag.
    pub fn subscribe_stop(&self) -> watch::Receiver<bool> {
        self.stop_rx.clone()
    }

    /// Raises the stop flag and marks the collector as not running, clearing
    /// the start time.
    pub fn signal_stop(&self) {
        let _ = self.stop_tx.send(true);
        self.running.store(false, Ordering::SeqCst);
        self.started_at.store(0, Ordering::SeqCst);
    }

    /// Lowers the stop flag again. The running flag is not touched.
    pub fn reset_stop(&self) {
        let _ = self.stop_tx.send(false);
    }

    /// Returns a receiver for the reload counter.
    pub fn subscribe_reload(&self) -> watch::Receiver<u64> {
        self.reload_rx.clone()
    }

    /// Bumps the reload counter by one (wrapping) and notifies receivers.
    pub fn signal_reload(&self) {
        // Increment in place under the channel's own lock. Reading the value
        // through the receiver and then sending `value + 1` would let two
        // concurrent callers publish the same number.
        self.reload_tx
            .send_modify(|generation| *generation = generation.wrapping_add(1));
    }

    /// Waits until every task in `device_tasks` has finished.
    ///
    /// Returns `true` if all tasks completed within `timeout`. On timeout the
    /// remaining tasks are aborted and `false` is returned.
    ///
    /// The task-set lock is held while waiting, so `spawn_device_task` calls
    /// made in the meantime wait until this returns.
    pub async fn wait_for_device_tasks(&self, timeout: Duration) -> bool {
        let drain_all = async {
            let mut tasks = self.device_tasks.lock().await;
            while tasks.join_next().await.is_some() {}
        };

        // On timeout the drain future (and with it the lock guard) is dropped
        // before the lock is taken again below.
        if tokio::time::timeout(timeout, drain_all).await.is_ok() {
            return true;
        }

        self.device_tasks.lock().await.abort_all();
        false
    }

    /// Spawns `future` onto the device task set.
    pub async fn spawn_device_task<F>(&self, future: F)
    where
        F: std::future::Future<Output = ()> + Send + 'static,
    {
        let mut tasks = self.device_tasks.lock().await;
        tasks.spawn(future);
    }

    /// Registers `handle` as the reload watcher, aborting any previously
    /// registered one.
    pub async fn set_reload_watcher(&self, handle: JoinHandle<()>) {
        let previous = self.reload_watcher.lock().await.replace(handle);
        if let Some(previous) = previous {
            previous.abort();
        }
    }

    /// Takes the registered reload watcher and waits for it to finish.
    ///
    /// Returns `true` if there was no watcher or it finished within
    /// `timeout`. On timeout the watcher is aborted and `false` is returned.
    /// Either way the watcher slot is left empty.
    pub async fn wait_for_reload_watcher(&self, timeout: Duration) -> bool {
        // The guard is a temporary, so the lock is released before awaiting
        // the handle.
        let taken = self.reload_watcher.lock().await.take();
        let Some(mut handle) = taken else {
            return true;
        };

        match tokio::time::timeout(timeout, &mut handle).await {
            Ok(_) => true,
            Err(_) => {
                handle.abort();
                false
            }
        }
    }
}
impl Default for CollectorState {
fn default() -> Self {
Self::new()
}
}
/// Collection statistics for a single device, serializable with serde.
#[derive(Debug, Clone, serde::Serialize)]
pub struct DeviceCollectionStats {
/// Identifier of the device these statistics belong to.
pub device_id: String,
/// Optional alias for the device.
pub alias: Option<String>,
/// Polling interval for the device.
/// NOTE(review): presumably in seconds — confirm against the collector.
pub poll_interval: u64,
/// Time of the last poll, if any; serialized as an RFC 3339 string or null.
#[serde(with = "time::serde::rfc3339::option")]
pub last_poll_at: Option<OffsetDateTime>,
/// Time of the last error, if any; serialized as an RFC 3339 string or null.
#[serde(with = "time::serde::rfc3339::option")]
pub last_error_at: Option<OffsetDateTime>,
/// Message of the last error, if any.
pub last_error: Option<String>,
/// Duration of the last poll, if known (milliseconds, going by the name).
pub last_poll_duration_ms: Option<u64>,
/// Number of successful polls.
pub success_count: u64,
/// Number of failed polls.
pub failure_count: u64,
/// Polling flag.
/// NOTE(review): presumably true while a poll is in flight — confirm
/// against the collector.
pub polling: bool,
}
/// Event type carried by the `AppState::readings_tx` broadcast channel: one
/// stored reading together with the device it belongs to.
#[derive(Debug, Clone, serde::Serialize)]
pub struct ReadingEvent {
/// Identifier of the device the reading belongs to.
pub device_id: String,
/// The reading itself.
pub reading: aranet_store::StoredReading,
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::Config;
    use aranet_types::Status;

    /// Builds a fixed reading for `device_id` with the given CO2 value.
    fn create_test_reading(device_id: &str, co2: u16) -> aranet_store::StoredReading {
        aranet_store::StoredReading {
            id: 1,
            device_id: device_id.to_string(),
            co2,
            temperature: 22.5,
            humidity: 45,
            pressure: 1013.0,
            battery: 85,
            status: Status::Green,
            radon: None,
            radiation_rate: None,
            radiation_total: None,
            radon_avg_24h: None,
            radon_avg_7d: None,
            radon_avg_30d: None,
            captured_at: time::OffsetDateTime::now_utc(),
        }
    }

    /// Builds an `AppState` over an in-memory store with the default config.
    fn create_test_state() -> Arc<AppState> {
        let store = Store::open_in_memory().unwrap();
        AppState::new(store, Config::default())
    }

    #[tokio::test]
    async fn test_app_state_new() {
        let state = create_test_state();
        let config = state.config.read().await;
        assert_eq!(config.server.bind, "127.0.0.1:8080");
    }

    #[test]
    fn test_collector_state() {
        let collector = CollectorState::new();
        assert!(!collector.is_running());
        assert!(collector.started_at().is_none());

        collector.set_running(true);
        assert!(collector.is_running());
        assert!(collector.started_at().is_some());

        collector.signal_stop();
        assert!(!collector.is_running());
    }

    #[tokio::test]
    async fn test_app_state_store_access() {
        let state = create_test_state();
        let devices = state
            .with_store_read(|store| store.list_devices())
            .await
            .unwrap();
        assert!(devices.is_empty());
    }

    #[tokio::test]
    async fn test_app_state_broadcast_channel() {
        let state = create_test_state();
        let mut rx = state.readings_tx.subscribe();

        let event = ReadingEvent {
            device_id: "test".to_string(),
            reading: create_test_reading("test", 450),
        };
        state.readings_tx.send(event).unwrap();

        let received = rx.recv().await.unwrap();
        assert_eq!(received.device_id, "test");
        assert_eq!(received.reading.co2, 450);
    }

    #[test]
    fn test_reading_event_serialization() {
        let event = ReadingEvent {
            device_id: "AA:BB:CC:DD:EE:FF".to_string(),
            reading: create_test_reading("AA:BB:CC:DD:EE:FF", 500),
        };
        let json = serde_json::to_string(&event).unwrap();
        assert!(json.contains("AA:BB:CC:DD:EE:FF"));
        assert!(json.contains("500"));
    }

    #[test]
    fn test_reading_event_debug() {
        let event = ReadingEvent {
            device_id: "test".to_string(),
            reading: create_test_reading("test", 400),
        };
        let debug = format!("{:?}", event);
        assert!(debug.contains("ReadingEvent"));
        assert!(debug.contains("test"));
    }

    #[test]
    fn test_collector_state_default() {
        let collector = CollectorState::default();
        assert!(!collector.is_running());
        assert!(collector.started_at().is_none());
    }

    #[test]
    fn test_collector_state_subscribe_stop() {
        let collector = CollectorState::new();
        let rx1 = collector.subscribe_stop();
        let rx2 = collector.subscribe_stop();
        assert!(!*rx1.borrow());
        assert!(!*rx2.borrow());
    }

    #[test]
    fn test_collector_state_stop_and_reset() {
        let collector = CollectorState::new();
        let rx = collector.subscribe_stop();
        assert!(!*rx.borrow());

        collector.signal_stop();
        assert!(*rx.borrow());

        collector.reset_stop();
        assert!(!*rx.borrow());
    }

    #[test]
    fn test_collector_state_running_toggle() {
        let collector = CollectorState::new();
        assert!(!collector.is_running());
        assert!(collector.started_at().is_none());

        collector.set_running(true);
        assert!(collector.is_running());
        let started = collector.started_at();
        assert!(started.is_some());

        // Marking an already-running collector as running again must never
        // move the start time backwards. The check is `>=`, so no sleep is
        // needed between the two calls.
        collector.set_running(true);
        let started2 = collector.started_at();
        assert!(started2 >= started);

        collector.set_running(false);
        assert!(!collector.is_running());
        assert!(collector.started_at().is_none());
    }

    #[tokio::test]
    async fn test_collector_state_device_stats_rw_lock() {
        let collector = CollectorState::new();
        {
            let mut stats = collector.device_stats.write().await;
            stats.push(DeviceCollectionStats {
                device_id: "test-1".to_string(),
                alias: Some("Test 1".to_string()),
                poll_interval: 60,
                last_poll_at: None,
                last_error_at: None,
                last_error: None,
                last_poll_duration_ms: None,
                success_count: 0,
                failure_count: 0,
                polling: false,
            });
        }

        let stats = collector.device_stats.read().await;
        assert_eq!(stats.len(), 1);
        assert_eq!(stats[0].device_id, "test-1");
    }

    #[test]
    fn test_device_collection_stats_serialization() {
        let stats = DeviceCollectionStats {
            device_id: "AA:BB:CC:DD:EE:FF".to_string(),
            alias: Some("Kitchen Sensor".to_string()),
            poll_interval: 120,
            last_poll_at: Some(time::OffsetDateTime::now_utc()),
            last_error_at: None,
            last_error: None,
            last_poll_duration_ms: None,
            success_count: 42,
            failure_count: 3,
            polling: true,
        };
        let json = serde_json::to_string(&stats).unwrap();
        assert!(json.contains("AA:BB:CC:DD:EE:FF"));
        assert!(json.contains("Kitchen Sensor"));
        // Pin whole `"field":value` pairs: a bare "3" or "42" would also
        // match digits inside the serialized timestamp.
        assert!(json.contains(r#""poll_interval":120"#));
        assert!(json.contains(r#""success_count":42"#));
        assert!(json.contains(r#""failure_count":3"#));
        assert!(json.contains(r#""polling":true"#));
    }

    #[test]
    fn test_device_collection_stats_with_error() {
        let stats = DeviceCollectionStats {
            device_id: "test".to_string(),
            alias: None,
            poll_interval: 60,
            last_poll_at: None,
            last_error_at: Some(time::OffsetDateTime::now_utc()),
            last_error: Some("Connection timeout".to_string()),
            last_poll_duration_ms: None,
            success_count: 10,
            failure_count: 5,
            polling: false,
        };
        let json = serde_json::to_string(&stats).unwrap();
        assert!(json.contains("Connection timeout"));
    }

    #[test]
    fn test_device_collection_stats_clone() {
        let original = DeviceCollectionStats {
            device_id: "clone-test".to_string(),
            alias: Some("Clone".to_string()),
            poll_interval: 90,
            last_poll_at: Some(time::OffsetDateTime::now_utc()),
            last_error_at: None,
            last_error: None,
            last_poll_duration_ms: None,
            success_count: 100,
            failure_count: 2,
            polling: true,
        };
        let cloned = original.clone();
        assert_eq!(cloned.device_id, original.device_id);
        assert_eq!(cloned.alias, original.alias);
        assert_eq!(cloned.poll_interval, original.poll_interval);
        assert_eq!(cloned.success_count, original.success_count);
        assert_eq!(cloned.polling, original.polling);
    }

    #[test]
    fn test_device_collection_stats_debug() {
        let stats = DeviceCollectionStats {
            device_id: "debug-test".to_string(),
            alias: Some("Debug".to_string()),
            poll_interval: 60,
            last_poll_at: None,
            last_error_at: None,
            last_error: None,
            last_poll_duration_ms: None,
            success_count: 5,
            failure_count: 1,
            polling: false,
        };
        let debug = format!("{:?}", stats);
        assert!(debug.contains("DeviceCollectionStats"));
        assert!(debug.contains("debug-test"));
        assert!(debug.contains("Debug"));
    }

    #[test]
    fn test_reading_event_clone() {
        let event = ReadingEvent {
            device_id: "original".to_string(),
            reading: create_test_reading("original", 750),
        };
        let cloned = event.clone();
        assert_eq!(cloned.device_id, event.device_id);
        assert_eq!(cloned.reading.co2, event.reading.co2);
    }

    #[tokio::test]
    async fn test_app_state_config_write() {
        let state = create_test_state();
        {
            let mut config = state.config.write().await;
            config.server.bind = "0.0.0.0:9090".to_string();
        }
        let config = state.config.read().await;
        assert_eq!(config.server.bind, "0.0.0.0:9090");
    }

    #[tokio::test]
    async fn test_broadcast_channel_multiple_receivers() {
        let state = create_test_state();
        let mut rx1 = state.readings_tx.subscribe();
        let mut rx2 = state.readings_tx.subscribe();

        let event = ReadingEvent {
            device_id: "multi".to_string(),
            reading: create_test_reading("multi", 888),
        };
        state.readings_tx.send(event).unwrap();

        let received1 = rx1.recv().await.unwrap();
        let received2 = rx2.recv().await.unwrap();
        assert_eq!(received1.reading.co2, 888);
        assert_eq!(received2.reading.co2, 888);
    }

    #[tokio::test]
    async fn test_app_state_store_operations() {
        let state = create_test_state();
        state
            .with_store_write(|store| store.upsert_device("test-device", Some("Test")).map(|_| ()))
            .await
            .unwrap();

        let device = state
            .with_store_read(|store| store.get_device("test-device"))
            .await
            .unwrap()
            .unwrap();
        assert_eq!(device.name, Some("Test".to_string()));
    }

    #[test]
    fn test_collector_state_reload_signal() {
        let collector = CollectorState::new();
        let rx = collector.subscribe_reload();
        assert_eq!(*rx.borrow(), 0);

        collector.signal_reload();
        assert_eq!(*rx.borrow(), 1);

        collector.signal_reload();
        assert_eq!(*rx.borrow(), 2);
    }

    #[tokio::test]
    async fn test_collector_state_reload_with_receiver() {
        let collector = CollectorState::new();
        let mut rx = collector.subscribe_reload();
        let handle = tokio::spawn(async move {
            rx.changed().await.unwrap();
            *rx.borrow()
        });

        // Give the spawned task a moment to park on `changed()` so the test
        // exercises the wake-up path.
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        collector.signal_reload();

        let result = handle.await.unwrap();
        assert_eq!(result, 1);
    }
}