use std::sync::Arc;
use std::time::Duration;
use time::OffsetDateTime;
use tokio::sync::watch;
use tokio::task::JoinSet;
use tokio::time::interval;
use tracing::{debug, error, info, warn};
use aranet_core::Device;
use aranet_store::StoredReading;
use crate::config::DeviceConfig;
use crate::state::{AppState, DeviceCollectionStats, ReadingEvent};
pub struct Collector {
state: Arc<AppState>,
tasks: JoinSet<()>,
}
impl Collector {
pub fn new(state: Arc<AppState>) -> Self {
Self {
state,
tasks: JoinSet::new(),
}
}
pub async fn start(&mut self) {
self.state.collector.reset_stop();
let config = self.state.config.read().await;
let devices = config.devices.clone();
drop(config);
if devices.is_empty() {
info!("No devices configured for collection");
return;
}
info!("Starting collector for {} device(s)", devices.len());
{
let mut stats = self.state.collector.device_stats.write().await;
stats.clear();
for device in &devices {
stats.push(DeviceCollectionStats {
device_id: device.address.clone(),
alias: device.alias.clone(),
poll_interval: device.poll_interval,
last_poll_at: None,
last_error_at: None,
last_error: None,
success_count: 0,
failure_count: 0,
polling: false,
});
}
}
self.state.collector.set_running(true);
for device_config in devices {
let state = Arc::clone(&self.state);
let stop_rx = self.state.collector.subscribe_stop();
self.state
.collector
.spawn_device_task(async move {
collect_device(state, device_config, stop_rx).await;
})
.await;
}
let state = Arc::clone(&self.state);
self.tasks.spawn(async move {
watch_for_reload(state).await;
});
}
pub async fn stop(&mut self) {
info!("Stopping collector");
self.state.collector.signal_stop();
let stopped_cleanly = self
.state
.collector
.wait_for_device_tasks(Duration::from_secs(10))
.await;
if !stopped_cleanly {
warn!("Device tasks did not stop within timeout, aborted");
}
let timeout = tokio::time::timeout(Duration::from_secs(2), async {
while self.tasks.join_next().await.is_some() {}
});
if timeout.await.is_err() {
warn!("Reload watcher did not stop within timeout, aborting");
self.tasks.abort_all();
}
}
pub fn is_running(&self) -> bool {
self.state.collector.is_running()
}
pub fn task_count(&self) -> usize {
self.tasks.len()
}
}
async fn watch_for_reload(state: Arc<AppState>) {
let mut reload_rx = state.collector.subscribe_reload();
let mut stop_rx = state.collector.subscribe_stop();
loop {
tokio::select! {
result = reload_rx.changed() => {
if result.is_err() {
break;
}
info!("Configuration reload requested, restarting device tasks");
state.collector.signal_stop();
tokio::time::sleep(Duration::from_millis(100)).await;
state.collector.reset_stop();
let config = state.config.read().await;
let devices = config.devices.clone();
drop(config);
{
let mut stats = state.collector.device_stats.write().await;
stats.clear();
for device in &devices {
stats.push(DeviceCollectionStats {
device_id: device.address.clone(),
alias: device.alias.clone(),
poll_interval: device.poll_interval,
last_poll_at: None,
last_error_at: None,
last_error: None,
success_count: 0,
failure_count: 0,
polling: false,
});
}
}
if devices.is_empty() {
info!("No devices configured after reload");
continue;
}
info!("Restarting collector for {} device(s)", devices.len());
for device_config in devices {
let state_clone = Arc::clone(&state);
let stop_rx = state.collector.subscribe_stop();
state
.collector
.spawn_device_task(async move {
collect_device(state_clone, device_config, stop_rx).await;
})
.await;
}
}
_ = stop_rx.changed() => {
if *stop_rx.borrow() {
info!("Reload watcher received stop signal");
break;
}
}
}
}
}
async fn collect_device(
state: Arc<AppState>,
config: DeviceConfig,
mut stop_rx: watch::Receiver<bool>,
) {
let device_id = config.address.clone();
let alias = config.alias.as_deref().unwrap_or(&device_id);
let poll_interval = Duration::from_secs(config.poll_interval);
info!(
"Starting collector for {} (alias: {}, interval: {}s)",
device_id, alias, config.poll_interval
);
let mut interval_timer = interval(poll_interval);
let mut consecutive_failures = 0u32;
loop {
tokio::select! {
_ = interval_timer.tick() => {
update_device_stat(&state, &device_id, |stat| {
stat.polling = true;
}).await;
match poll_device(&state, &device_id).await {
Ok(reading) => {
consecutive_failures = 0;
debug!("Collected reading from {}: CO2={}", device_id, reading.co2);
update_device_stat(&state, &device_id, |stat| {
stat.last_poll_at = Some(OffsetDateTime::now_utc());
stat.success_count += 1;
stat.polling = false;
}).await;
let event = ReadingEvent {
device_id: device_id.clone(),
reading,
};
if state.readings_tx.send(event).is_err() {
debug!("No active WebSocket subscribers for reading broadcast");
}
}
Err(e) => {
consecutive_failures += 1;
update_device_stat(&state, &device_id, |stat| {
stat.last_error_at = Some(OffsetDateTime::now_utc());
stat.last_error = Some(e.to_string());
stat.failure_count += 1;
stat.polling = false;
}).await;
if consecutive_failures <= 3 {
warn!(
"Failed to poll {}: {} (attempt {})",
device_id, e, consecutive_failures
);
} else if consecutive_failures == 4 {
error!(
"Failed to poll {} after {} attempts, reducing log frequency",
device_id, consecutive_failures
);
} else if consecutive_failures.is_multiple_of(100) {
error!(
"Failed to poll {} ({} consecutive failures): {}",
device_id, consecutive_failures, e
);
}
}
}
}
_ = stop_rx.changed() => {
if *stop_rx.borrow() {
info!("Collector for {} received stop signal", device_id);
break;
}
}
}
}
info!("Collector for {} stopped", device_id);
}
async fn update_device_stat<F>(state: &AppState, device_id: &str, update_fn: F)
where
F: FnOnce(&mut DeviceCollectionStats),
{
let mut stats = state.collector.device_stats.write().await;
if let Some(stat) = stats.iter_mut().find(|s| s.device_id == device_id) {
update_fn(stat);
}
}
async fn poll_device(state: &AppState, device_id: &str) -> Result<StoredReading, CollectorError> {
let device = Device::connect(device_id)
.await
.map_err(CollectorError::Connect)?;
let reading = device.read_current().await.map_err(CollectorError::Read)?;
let _ = device.disconnect().await;
{
let store = state.store.lock().await;
store
.insert_reading(device_id, &reading)
.map_err(CollectorError::Store)?;
}
Ok(StoredReading::from_reading(device_id, &reading))
}
#[derive(Debug, thiserror::Error)]
pub enum CollectorError {
#[error("Failed to connect: {0}")]
Connect(aranet_core::Error),
#[error("Failed to read: {0}")]
Read(aranet_core::Error),
#[error("Failed to store: {0}")]
Store(aranet_store::Error),
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::Config;
fn create_test_state() -> Arc<AppState> {
let store = aranet_store::Store::open_in_memory().unwrap();
let config = Config::default();
AppState::new(store, config)
}
#[test]
fn test_collector_new() {
let state = create_test_state();
let collector = Collector::new(Arc::clone(&state));
assert!(!collector.is_running());
}
#[test]
fn test_collector_is_running_initially_false() {
let state = create_test_state();
let collector = Collector::new(state);
assert!(!collector.is_running());
}
#[tokio::test]
async fn test_collector_start_no_devices() {
let state = create_test_state();
let mut collector = Collector::new(Arc::clone(&state));
collector.start().await;
let stats = state.collector.device_stats.read().await;
assert!(stats.is_empty());
}
#[tokio::test]
async fn test_collector_start_with_devices_initializes_stats() {
let state = create_test_state();
{
let mut config = state.config.write().await;
config.devices.push(crate::config::DeviceConfig {
address: "AA:BB:CC:DD:EE:FF".to_string(),
alias: Some("Test Device".to_string()),
poll_interval: 60,
});
}
let mut collector = Collector::new(Arc::clone(&state));
collector.start().await;
tokio::time::sleep(Duration::from_millis(50)).await;
let stats = state.collector.device_stats.read().await;
assert_eq!(stats.len(), 1);
assert_eq!(stats[0].device_id, "AA:BB:CC:DD:EE:FF");
assert_eq!(stats[0].alias, Some("Test Device".to_string()));
assert_eq!(stats[0].poll_interval, 60);
}
#[tokio::test]
async fn test_collector_stop() {
let state = create_test_state();
state.collector.set_running(true);
let mut collector = Collector::new(Arc::clone(&state));
assert!(collector.is_running());
collector.stop().await;
assert!(!collector.is_running());
}
#[test]
fn test_collector_error_display_connect() {
let core_error = aranet_core::Error::NotConnected;
let error = CollectorError::Connect(core_error);
let display = format!("{}", error);
assert!(display.contains("Failed to connect"));
}
#[test]
fn test_collector_error_display_read() {
let core_error = aranet_core::Error::NotConnected;
let error = CollectorError::Read(core_error);
let display = format!("{}", error);
assert!(display.contains("Failed to read"));
}
#[test]
fn test_collector_error_display_store() {
let store_error = aranet_store::Error::DeviceNotFound("test".to_string());
let error = CollectorError::Store(store_error);
let display = format!("{}", error);
assert!(display.contains("Failed to store"));
}
#[test]
fn test_collector_error_debug() {
let core_error = aranet_core::Error::NotConnected;
let error = CollectorError::Connect(core_error);
let debug = format!("{:?}", error);
assert!(debug.contains("Connect"));
}
#[tokio::test]
async fn test_device_collection_stats_initialization() {
let stats = DeviceCollectionStats {
device_id: "test-device".to_string(),
alias: Some("Test Alias".to_string()),
poll_interval: 120,
last_poll_at: None,
last_error_at: None,
last_error: None,
success_count: 0,
failure_count: 0,
polling: false,
};
assert_eq!(stats.device_id, "test-device");
assert_eq!(stats.alias, Some("Test Alias".to_string()));
assert_eq!(stats.poll_interval, 120);
assert!(stats.last_poll_at.is_none());
assert_eq!(stats.success_count, 0);
assert_eq!(stats.failure_count, 0);
assert!(!stats.polling);
}
#[tokio::test]
async fn test_update_device_stat() {
let state = create_test_state();
{
let mut stats = state.collector.device_stats.write().await;
stats.push(DeviceCollectionStats {
device_id: "test-device".to_string(),
alias: None,
poll_interval: 60,
last_poll_at: None,
last_error_at: None,
last_error: None,
success_count: 0,
failure_count: 0,
polling: false,
});
}
update_device_stat(&state, "test-device", |stat| {
stat.success_count = 5;
stat.polling = true;
})
.await;
let stats = state.collector.device_stats.read().await;
assert_eq!(stats[0].success_count, 5);
assert!(stats[0].polling);
}
#[tokio::test]
async fn test_update_device_stat_nonexistent_device() {
let state = create_test_state();
{
let mut stats = state.collector.device_stats.write().await;
stats.push(DeviceCollectionStats {
device_id: "existing-device".to_string(),
alias: None,
poll_interval: 60,
last_poll_at: None,
last_error_at: None,
last_error: None,
success_count: 0,
failure_count: 0,
polling: false,
});
}
update_device_stat(&state, "nonexistent-device", |stat| {
stat.success_count = 10;
})
.await;
let stats = state.collector.device_stats.read().await;
assert_eq!(stats.len(), 1);
assert_eq!(stats[0].success_count, 0);
}
#[tokio::test]
async fn test_collector_multiple_devices() {
let state = create_test_state();
{
let mut config = state.config.write().await;
config.devices.push(crate::config::DeviceConfig {
address: "DEVICE-1".to_string(),
alias: Some("First".to_string()),
poll_interval: 30,
});
config.devices.push(crate::config::DeviceConfig {
address: "DEVICE-2".to_string(),
alias: Some("Second".to_string()),
poll_interval: 60,
});
config.devices.push(crate::config::DeviceConfig {
address: "DEVICE-3".to_string(),
alias: None,
poll_interval: 120,
});
}
let mut collector = Collector::new(Arc::clone(&state));
collector.start().await;
tokio::time::sleep(Duration::from_millis(50)).await;
let stats = state.collector.device_stats.read().await;
assert_eq!(stats.len(), 3);
let device1 = stats.iter().find(|s| s.device_id == "DEVICE-1").unwrap();
assert_eq!(device1.alias, Some("First".to_string()));
assert_eq!(device1.poll_interval, 30);
let device2 = stats.iter().find(|s| s.device_id == "DEVICE-2").unwrap();
assert_eq!(device2.alias, Some("Second".to_string()));
assert_eq!(device2.poll_interval, 60);
let device3 = stats.iter().find(|s| s.device_id == "DEVICE-3").unwrap();
assert!(device3.alias.is_none());
assert_eq!(device3.poll_interval, 120);
}
}