use std::collections::HashMap;
use std::sync::Arc;
use axum::{
Json, Router,
extract::{Path, Query, State},
http::StatusCode,
response::IntoResponse,
routing::{get, post, put},
};
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use crate::collector::{Collector, CollectorStartResult};
use crate::config::DeviceConfig;
use crate::state::CollectorState;
use crate::state::{AppState, DeviceCollectionStats};
pub fn router() -> Router<Arc<AppState>> {
Router::new()
.route("/api/health", get(health))
.route("/api/health/detailed", get(health_detailed))
.route("/api/status", get(get_status))
.route("/metrics", get(prometheus_metrics))
.route("/api/collector/start", post(collector_start))
.route("/api/collector/stop", post(collector_stop))
.route("/api/config", get(get_config).put(update_config))
.route("/api/config/devices", post(add_device))
.route(
"/api/config/devices/{id}",
put(update_device).delete(remove_device),
)
.route("/api/devices", get(list_devices))
.route("/api/devices/current", get(list_current_readings))
.route("/api/devices/{id}", get(get_device))
.route("/api/devices/{id}/current", get(get_current_reading))
.route("/api/devices/{id}/readings", get(get_readings))
.route("/api/devices/{id}/history", get(get_history))
.route("/api/readings", get(get_all_readings))
}
#[derive(Debug, Serialize)]
pub struct HealthResponse {
pub status: &'static str,
pub version: &'static str,
#[serde(with = "time::serde::rfc3339")]
pub timestamp: OffsetDateTime,
}
async fn health() -> Json<HealthResponse> {
Json(HealthResponse {
status: "ok",
version: env!("CARGO_PKG_VERSION"),
timestamp: OffsetDateTime::now_utc(),
})
}
#[derive(Debug, Serialize)]
pub struct DetailedHealthResponse {
pub status: &'static str,
pub version: &'static str,
#[serde(with = "time::serde::rfc3339")]
pub timestamp: OffsetDateTime,
pub database: DatabaseHealth,
pub collector: CollectorHealth,
pub platform: PlatformInfo,
}
#[derive(Debug, Serialize)]
pub struct DatabaseHealth {
pub ok: bool,
pub device_count: usize,
pub reading_count: Option<usize>,
pub error: Option<String>,
}
#[derive(Debug, Serialize)]
pub struct CollectorHealth {
pub running: bool,
pub configured_devices: usize,
pub healthy_devices: usize,
pub failing_devices: usize,
}
#[derive(Debug, Serialize)]
pub struct PlatformInfo {
pub os: &'static str,
pub arch: &'static str,
}
async fn health_detailed(State(state): State<Arc<AppState>>) -> Json<DetailedHealthResponse> {
let database = match state.with_store_read(|store| store.list_devices()).await {
Ok(devices) => DatabaseHealth {
ok: true,
device_count: devices.len(),
reading_count: None, error: None,
},
Err(e) => DatabaseHealth {
ok: false,
device_count: 0,
reading_count: None,
error: Some(e.to_string()),
},
};
let collector = {
let config = state.config.read().await;
let configured_devices = config.devices.len();
drop(config);
let stats = state.collector.device_stats.read().await;
let healthy_devices = stats
.iter()
.filter(|s| {
s.last_poll_at.is_some_and(|t| {
let age = (OffsetDateTime::now_utc() - t).whole_seconds();
age < (s.poll_interval as i64 * 3)
})
})
.count();
let failing_devices = stats
.iter()
.filter(|s| s.failure_count > 0 && s.last_error.is_some())
.count();
CollectorHealth {
running: state.collector.is_running(),
configured_devices,
healthy_devices,
failing_devices,
}
};
let platform = PlatformInfo {
os: std::env::consts::OS,
arch: std::env::consts::ARCH,
};
let status = if database.ok && collector.running {
"ok"
} else if database.ok {
"degraded"
} else {
"unhealthy"
};
Json(DetailedHealthResponse {
status,
version: env!("CARGO_PKG_VERSION"),
timestamp: OffsetDateTime::now_utc(),
database,
collector,
platform,
})
}
const PROMETHEUS_CONTENT_TYPE: &str = "text/plain; version=0.0.4; charset=utf-8";
fn write_metric_family(
output: &mut String,
name: &str,
help: &str,
metric_type: &str,
values: &[String],
) {
if values.is_empty() {
return;
}
output.push_str(&format!("# HELP {} {}\n", name, help));
output.push_str(&format!("# TYPE {} {}\n", name, metric_type));
for value in values {
output.push_str(value);
output.push('\n');
}
output.push('\n');
}
async fn prometheus_metrics(
State(state): State<Arc<AppState>>,
) -> Result<
(
StatusCode,
[(axum::http::header::HeaderName, &'static str); 1],
String,
),
AppError,
> {
let config = state.config.read().await;
if !config.prometheus.enabled {
return Err(AppError::ServiceUnavailable(
"Prometheus metrics endpoint is disabled. Enable it in server.toml with [prometheus] enabled = true".to_string(),
));
}
drop(config);
let mut output = String::with_capacity(4096);
output.push_str("# Aranet sensor metrics\n");
output.push_str(&format!(
"# Generated at {}\n\n",
OffsetDateTime::now_utc()
.format(&time::format_description::well_known::Rfc3339)
.unwrap_or_default()
));
build_collector_metrics(&mut output, &state.collector).await;
let device_readings = state
.with_store_read(|store| store.list_latest_readings())
.await?;
if !device_readings.is_empty() {
let config = state.config.read().await;
let alias_map: std::collections::HashMap<String, String> = config
.devices
.iter()
.filter_map(|d| d.alias.as_ref().map(|a| (d.address.clone(), a.clone())))
.collect();
drop(config);
build_device_metrics(&mut output, &device_readings, &alias_map);
}
Ok((
StatusCode::OK,
[(axum::http::header::CONTENT_TYPE, PROMETHEUS_CONTENT_TYPE)],
output,
))
}
async fn build_collector_metrics(output: &mut String, collector: &CollectorState) {
let running = collector.is_running();
let uptime = collector.started_at().map(|s| {
let now = OffsetDateTime::now_utc();
(now - s).whole_seconds().max(0)
});
output.push_str(
"# HELP aranet_collector_running Whether the collector is running (1=running, 0=stopped)\n",
);
output.push_str("# TYPE aranet_collector_running gauge\n");
output.push_str(&format!(
"aranet_collector_running {}\n\n",
if running { 1 } else { 0 }
));
if let Some(uptime_secs) = uptime {
output.push_str(
"# HELP aranet_collector_uptime_seconds How long the collector has been running\n",
);
output.push_str("# TYPE aranet_collector_uptime_seconds gauge\n");
output.push_str(&format!(
"aranet_collector_uptime_seconds {}\n\n",
uptime_secs
));
}
let device_stats = collector.device_stats.read().await;
if !device_stats.is_empty() {
let mut success_metrics = Vec::new();
let mut failure_metrics = Vec::new();
let mut polling_metrics = Vec::new();
let mut duration_metrics = Vec::new();
for stat in device_stats.iter() {
let alias = stat.alias.as_deref().unwrap_or(&stat.device_id);
let labels = format!(
"device=\"{}\",address=\"{}\"",
escape_label_value(alias),
escape_label_value(&stat.device_id)
);
success_metrics.push(format!(
"aranet_device_poll_success_total{{{}}} {}",
labels, stat.success_count
));
failure_metrics.push(format!(
"aranet_device_poll_failure_total{{{}}} {}",
labels, stat.failure_count
));
polling_metrics.push(format!(
"aranet_device_polling{{{}}} {}",
labels,
if stat.polling { 1 } else { 0 }
));
if let Some(duration_ms) = stat.last_poll_duration_ms {
duration_metrics.push(format!(
"aranet_device_poll_duration_ms{{{}}} {}",
labels, duration_ms
));
}
}
write_metric_family(
&mut *output,
"aranet_device_poll_success_total",
"Total number of successful polls",
"counter",
&success_metrics,
);
write_metric_family(
&mut *output,
"aranet_device_poll_failure_total",
"Total number of failed polls",
"counter",
&failure_metrics,
);
write_metric_family(
&mut *output,
"aranet_device_polling",
"Whether the device is currently being polled (1=yes, 0=no)",
"gauge",
&polling_metrics,
);
write_metric_family(
&mut *output,
"aranet_device_poll_duration_ms",
"Duration of the last poll in milliseconds",
"gauge",
&duration_metrics,
);
}
}
fn build_device_metrics(
output: &mut String,
device_readings: &[(aranet_store::StoredDevice, aranet_store::StoredReading)],
alias_map: &std::collections::HashMap<String, String>,
) {
let mut co2_metrics = Vec::new();
let mut temp_metrics = Vec::new();
let mut humidity_metrics = Vec::new();
let mut pressure_metrics = Vec::new();
let mut battery_metrics = Vec::new();
let mut age_metrics = Vec::new();
let mut radon_metrics = Vec::new();
let mut radiation_rate_metrics = Vec::new();
let mut radiation_total_metrics = Vec::new();
for (device, reading) in device_readings {
let device_label = alias_map
.get(&device.id)
.map(|s| s.as_str())
.unwrap_or_else(|| device.name.as_deref().unwrap_or(&device.id));
let labels = format!(
"device=\"{}\",address=\"{}\"",
escape_label_value(device_label),
escape_label_value(&device.id)
);
let device_type = resolve_device_type(device);
if device_type.is_none_or(|dt| dt.has_co2()) && reading.co2 > 0 {
co2_metrics.push(format!("aranet_co2_ppm{{{}}} {}", labels, reading.co2));
}
if device_type.is_none_or(|dt| dt.has_temperature()) {
temp_metrics.push(format!(
"aranet_temperature_celsius{{{}}} {:.2}",
labels, reading.temperature
));
}
if device_type.is_none_or(|dt| dt.has_humidity()) {
humidity_metrics.push(format!(
"aranet_humidity_percent{{{}}} {}",
labels, reading.humidity
));
}
if device_type.is_none_or(|dt| dt.has_pressure()) {
pressure_metrics.push(format!(
"aranet_pressure_hpa{{{}}} {:.2}",
labels, reading.pressure
));
}
battery_metrics.push(format!(
"aranet_battery_percent{{{}}} {}",
labels, reading.battery
));
let age_secs = (OffsetDateTime::now_utc() - reading.captured_at)
.whole_seconds()
.max(0);
age_metrics.push(format!(
"aranet_reading_age_seconds{{{}}} {}",
labels, age_secs
));
if let Some(radon) = reading.radon {
radon_metrics.push(format!("aranet_radon_bqm3{{{}}} {}", labels, radon));
}
if let Some(rate) = reading.radiation_rate {
radiation_rate_metrics.push(format!(
"aranet_radiation_rate_usvh{{{}}} {:.4}",
labels, rate
));
}
if let Some(total) = reading.radiation_total {
radiation_total_metrics.push(format!(
"aranet_radiation_total_msv{{{}}} {:.6}",
labels, total
));
}
}
let metric_families = [
(
"aranet_co2_ppm",
"CO2 concentration in parts per million",
&co2_metrics,
),
(
"aranet_temperature_celsius",
"Temperature in degrees Celsius",
&temp_metrics,
),
(
"aranet_humidity_percent",
"Relative humidity percentage",
&humidity_metrics,
),
(
"aranet_pressure_hpa",
"Atmospheric pressure in hectopascals",
&pressure_metrics,
),
(
"aranet_battery_percent",
"Battery level percentage",
&battery_metrics,
),
(
"aranet_reading_age_seconds",
"Age of the reading in seconds",
&age_metrics,
),
(
"aranet_radon_bqm3",
"Radon concentration in Bq/m³",
&radon_metrics,
),
(
"aranet_radiation_rate_usvh",
"Radiation rate in µSv/h",
&radiation_rate_metrics,
),
(
"aranet_radiation_total_msv",
"Total radiation dose in mSv",
&radiation_total_metrics,
),
];
for (name, help, metrics) in &metric_families {
write_metric_family(output, name, help, "gauge", metrics);
}
}
fn resolve_device_type(device: &aranet_store::StoredDevice) -> Option<aranet_types::DeviceType> {
device.device_type.or_else(|| {
device
.name
.as_deref()
.and_then(aranet_types::DeviceType::from_name)
.or_else(|| aranet_types::DeviceType::from_name(&device.id))
})
}
fn escape_label_value(s: &str) -> String {
s.replace('\\', "\\\\")
.replace('"', "\\\"")
.replace('\n', "\\n")
}
#[derive(Debug, Serialize)]
pub struct StatusResponse {
pub version: &'static str,
#[serde(with = "time::serde::rfc3339")]
pub timestamp: OffsetDateTime,
pub collector: CollectorStatus,
pub devices: Vec<DeviceCollectionStats>,
}
#[derive(Debug, Serialize)]
pub struct CollectorStatus {
pub running: bool,
#[serde(with = "time::serde::rfc3339::option")]
pub started_at: Option<OffsetDateTime>,
pub uptime_seconds: Option<u64>,
}
async fn get_status(State(state): State<Arc<AppState>>) -> Json<StatusResponse> {
let running = state.collector.is_running();
let started_at = state.collector.started_at();
let uptime_seconds = started_at.map(|s| {
let now = OffsetDateTime::now_utc();
(now - s).whole_seconds().max(0) as u64
});
let devices = state.collector.device_stats.read().await.clone();
Json(StatusResponse {
version: env!("CARGO_PKG_VERSION"),
timestamp: OffsetDateTime::now_utc(),
collector: CollectorStatus {
running,
started_at,
uptime_seconds,
},
devices,
})
}
#[derive(Debug, Serialize)]
pub struct CollectorActionResponse {
pub success: bool,
pub message: String,
pub running: bool,
}
async fn collector_start(State(state): State<Arc<AppState>>) -> Json<CollectorActionResponse> {
let collector = Collector::new(Arc::clone(&state));
match collector.start().await {
CollectorStartResult::Started => Json(CollectorActionResponse {
success: true,
message: "Collector started".to_string(),
running: true,
}),
CollectorStartResult::AlreadyRunning => Json(CollectorActionResponse {
success: false,
message: "Collector is already running".to_string(),
running: true,
}),
CollectorStartResult::NoDevicesConfigured => Json(CollectorActionResponse {
success: false,
message: "No devices configured".to_string(),
running: false,
}),
}
}
async fn collector_stop(State(state): State<Arc<AppState>>) -> Json<CollectorActionResponse> {
if !state.collector.is_running() {
return Json(CollectorActionResponse {
success: false,
message: "Collector is not running".to_string(),
running: false,
});
}
let collector = Collector::new(Arc::clone(&state));
collector.stop().await;
Json(CollectorActionResponse {
success: true,
message: "Collector stopped".to_string(),
running: false,
})
}
#[derive(Debug, Serialize)]
pub struct ConfigResponse {
pub server: ServerConfigResponse,
pub devices: Vec<DeviceConfigResponse>,
}
#[derive(Debug, Serialize)]
pub struct ServerConfigResponse {
pub bind: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeviceConfigResponse {
pub address: String,
#[serde(default)]
pub alias: Option<String>,
#[serde(default = "default_poll_interval")]
pub poll_interval: u64,
}
fn default_poll_interval() -> u64 {
60
}
fn config_save_error(error: crate::config::ConfigError) -> AppError {
AppError::Internal(format!("Failed to save configuration: {}", error))
}
async fn get_config(State(state): State<Arc<AppState>>) -> Json<ConfigResponse> {
let config = state.config.read().await;
Json(ConfigResponse {
server: ServerConfigResponse {
bind: config.server.bind.clone(),
},
devices: config
.devices
.iter()
.map(|d| DeviceConfigResponse {
address: d.address.clone(),
alias: d.alias.clone(),
poll_interval: d.poll_interval,
})
.collect(),
})
}
#[derive(Debug, Deserialize)]
pub struct UpdateConfigRequest {
#[serde(default)]
pub devices: Option<Vec<DeviceConfigResponse>>,
}
async fn update_config(
State(state): State<Arc<AppState>>,
Json(request): Json<UpdateConfigRequest>,
) -> Result<Json<ConfigResponse>, AppError> {
let response = {
let mut config = state.config.write().await;
let previous_devices = config.devices.clone();
if let Some(devices) = request.devices {
config.devices = devices
.into_iter()
.map(|d| DeviceConfig {
address: d.address,
alias: d.alias,
poll_interval: d.poll_interval,
})
.collect();
}
if let Err(e) = config.validate() {
config.devices = previous_devices;
return Err(AppError::BadRequest(format!(
"Invalid configuration: {}",
e
)));
}
if let Err(e) = config.save(&state.config_path) {
config.devices = previous_devices;
return Err(config_save_error(e));
}
ConfigResponse {
server: ServerConfigResponse {
bind: config.server.bind.clone(),
},
devices: config
.devices
.iter()
.map(|d| DeviceConfigResponse {
address: d.address.clone(),
alias: d.alias.clone(),
poll_interval: d.poll_interval,
})
.collect(),
}
};
state.on_devices_changed().await;
Ok(Json(response))
}
#[derive(Debug, Deserialize)]
pub struct AddDeviceRequest {
pub address: String,
#[serde(default)]
pub alias: Option<String>,
#[serde(default = "default_poll_interval")]
pub poll_interval: u64,
}
async fn add_device(
State(state): State<Arc<AppState>>,
Json(request): Json<AddDeviceRequest>,
) -> Result<(StatusCode, Json<DeviceConfigResponse>), AppError> {
let response = {
let mut config = state.config.write().await;
let addr_lower = request.address.to_lowercase();
if config
.devices
.iter()
.any(|d| d.address.to_lowercase() == addr_lower)
{
return Err(AppError::Conflict(format!(
"Device {} is already being monitored",
request.address
)));
}
let device = DeviceConfig {
address: request.address.clone(),
alias: request.alias.clone(),
poll_interval: request.poll_interval,
};
let errors = device.validate("device");
if !errors.is_empty() {
return Err(AppError::BadRequest(
errors
.iter()
.map(|e| e.to_string())
.collect::<Vec<_>>()
.join(", "),
));
}
config.devices.push(device);
if let Err(e) = config.save(&state.config_path) {
config.devices.pop();
return Err(config_save_error(e));
}
DeviceConfigResponse {
address: request.address.clone(),
alias: request.alias.clone(),
poll_interval: request.poll_interval,
}
};
state.on_devices_changed().await;
Ok((StatusCode::CREATED, Json(response)))
}
#[derive(Debug, Deserialize)]
pub struct UpdateDeviceRequest {
#[serde(default, deserialize_with = "deserialize_optional_nullable")]
pub alias: Option<Option<String>>,
#[serde(default)]
pub poll_interval: Option<u64>,
}
fn deserialize_optional_nullable<'de, D>(
deserializer: D,
) -> std::result::Result<Option<Option<String>>, D::Error>
where
D: serde::Deserializer<'de>,
{
Ok(Some(Option::deserialize(deserializer)?))
}
async fn update_device(
State(state): State<Arc<AppState>>,
Path(id): Path<String>,
Json(request): Json<UpdateDeviceRequest>,
) -> Result<Json<DeviceConfigResponse>, AppError> {
let response = {
let mut config = state.config.write().await;
let id_lower = id.to_lowercase();
let device_index = config
.devices
.iter()
.position(|d| d.address.to_lowercase() == id_lower)
.ok_or_else(|| AppError::NotFound(format!("Device {} not found in config", id)))?;
let previous_device = config.devices[device_index].clone();
{
let device = &mut config.devices[device_index];
if let Some(alias) = request.alias {
device.alias = alias;
}
if let Some(poll_interval) = request.poll_interval {
device.poll_interval = poll_interval;
}
let errors = device.validate("device");
if !errors.is_empty() {
return Err(AppError::BadRequest(
errors
.iter()
.map(|e| e.to_string())
.collect::<Vec<_>>()
.join(", "),
));
}
}
let response = {
let device = &config.devices[device_index];
DeviceConfigResponse {
address: device.address.clone(),
alias: device.alias.clone(),
poll_interval: device.poll_interval,
}
};
if let Err(e) = config.save(&state.config_path) {
config.devices[device_index] = previous_device;
return Err(config_save_error(e));
}
response
};
state.on_devices_changed().await;
Ok(Json(response))
}
async fn remove_device(
State(state): State<Arc<AppState>>,
Path(id): Path<String>,
) -> Result<StatusCode, AppError> {
{
let mut config = state.config.write().await;
let previous_devices = config.devices.clone();
let id_lower = id.to_lowercase();
let original_len = config.devices.len();
config
.devices
.retain(|d| d.address.to_lowercase() != id_lower);
if config.devices.len() == original_len {
return Err(AppError::NotFound(format!(
"Device {} not found in config",
id
)));
}
if let Err(e) = config.save(&state.config_path) {
config.devices = previous_devices;
return Err(config_save_error(e));
}
}
state.on_devices_changed().await;
Ok(StatusCode::NO_CONTENT)
}
#[derive(Debug, Serialize)]
pub struct DeviceResponse {
pub id: String,
pub name: Option<String>,
pub device_type: Option<String>,
pub serial: Option<String>,
pub firmware: Option<String>,
#[serde(with = "time::serde::rfc3339")]
pub first_seen: OffsetDateTime,
#[serde(with = "time::serde::rfc3339")]
pub last_seen: OffsetDateTime,
}
impl From<aranet_store::StoredDevice> for DeviceResponse {
fn from(d: aranet_store::StoredDevice) -> Self {
Self {
id: d.id,
name: d.name,
device_type: d.device_type.map(|dt| format!("{:?}", dt)),
serial: d.serial,
firmware: d.firmware,
first_seen: d.first_seen,
last_seen: d.last_seen,
}
}
}
async fn list_devices(
State(state): State<Arc<AppState>>,
) -> Result<Json<Vec<DeviceResponse>>, AppError> {
let devices = state.with_store_read(|store| store.list_devices()).await?;
Ok(Json(devices.into_iter().map(Into::into).collect()))
}
async fn get_device(
State(state): State<Arc<AppState>>,
Path(id): Path<String>,
) -> Result<Json<DeviceResponse>, AppError> {
let device = state
.with_store_read(|store| store.get_device(&id))
.await?
.ok_or(AppError::NotFound(format!("Device not found: {}", id)))?;
Ok(Json(device.into()))
}
const DEFAULT_STALE_THRESHOLD_SECS: i64 = 180;
#[derive(Debug, Serialize)]
pub struct CurrentReadingResponse {
#[serde(flatten)]
pub reading: aranet_store::StoredReading,
pub age_seconds: i64,
pub stale: bool,
}
#[derive(Debug, Serialize)]
pub struct DeviceLatestReadingResponse {
pub device_id: String,
pub alias: Option<String>,
pub name: Option<String>,
pub age_seconds: i64,
pub stale: bool,
pub reading: aranet_store::StoredReading,
}
fn reading_age_seconds(reading: &aranet_store::StoredReading) -> i64 {
(OffsetDateTime::now_utc() - reading.captured_at)
.whole_seconds()
.max(0)
}
fn reading_is_stale(
device_id: &str,
age_seconds: i64,
poll_intervals: &HashMap<String, u64>,
) -> bool {
let threshold = poll_intervals
.get(device_id)
.map(|interval| *interval as i64 * 3)
.unwrap_or(DEFAULT_STALE_THRESHOLD_SECS);
age_seconds > threshold
}
async fn list_current_readings(
State(state): State<Arc<AppState>>,
) -> Result<Json<Vec<DeviceLatestReadingResponse>>, AppError> {
let latest = state
.with_store_read(|store| store.list_latest_readings())
.await?;
let aliases = {
let config = state.config.read().await;
config
.devices
.iter()
.filter_map(|device| {
device
.alias
.as_ref()
.map(|alias| (device.address.clone(), alias.clone()))
})
.collect::<HashMap<_, _>>()
};
let poll_intervals = {
let stats = state.collector.device_stats.read().await;
stats
.iter()
.map(|stat| (stat.device_id.clone(), stat.poll_interval))
.collect::<HashMap<_, _>>()
};
let response = latest
.into_iter()
.map(|(device, reading)| {
let age_seconds = reading_age_seconds(&reading);
DeviceLatestReadingResponse {
device_id: device.id.clone(),
alias: aliases.get(&device.id).cloned(),
name: device.name,
age_seconds,
stale: reading_is_stale(&device.id, age_seconds, &poll_intervals),
reading,
}
})
.collect();
Ok(Json(response))
}
async fn get_current_reading(
State(state): State<Arc<AppState>>,
Path(id): Path<String>,
) -> Result<Json<CurrentReadingResponse>, AppError> {
let reading = state
.with_store_read(|store| store.get_latest_reading(&id))
.await?
.ok_or(AppError::NotFound(format!(
"No readings for device: {}",
id
)))?;
let age_seconds = reading_age_seconds(&reading);
let stale = {
let stats = state.collector.device_stats.read().await;
let poll_intervals = stats
.iter()
.map(|stat| (stat.device_id.clone(), stat.poll_interval))
.collect::<HashMap<_, _>>();
reading_is_stale(&id, age_seconds, &poll_intervals)
};
Ok(Json(CurrentReadingResponse {
reading,
age_seconds,
stale,
}))
}
#[derive(Debug, Deserialize, Default)]
pub struct ReadingsQuery {
pub since: Option<i64>,
pub until: Option<i64>,
pub limit: Option<u32>,
pub offset: Option<u32>,
}
const MAX_QUERY_LIMIT: u32 = 10_000;
impl ReadingsQuery {
fn parse_timestamp(
field: &'static str,
value: Option<i64>,
) -> Result<Option<OffsetDateTime>, AppError> {
value
.map(|timestamp| {
OffsetDateTime::from_unix_timestamp(timestamp).map_err(|_| {
AppError::BadRequest(format!("Invalid '{}' timestamp: {}", field, timestamp))
})
})
.transpose()
}
fn since_datetime(&self) -> Result<Option<OffsetDateTime>, AppError> {
Self::parse_timestamp("since", self.since)
}
fn until_datetime(&self) -> Result<Option<OffsetDateTime>, AppError> {
Self::parse_timestamp("until", self.until)
}
pub fn validate(&self) -> Result<(), AppError> {
let since = self.since_datetime()?;
let until = self.until_datetime()?;
if let (Some(since), Some(until)) = (since, until)
&& since > until
{
return Err(AppError::BadRequest(format!(
"Invalid time range: 'since' ({}) must be less than or equal to 'until' ({})",
since.unix_timestamp(),
until.unix_timestamp()
)));
}
if let Some(limit) = self.limit
&& limit > MAX_QUERY_LIMIT
{
return Err(AppError::BadRequest(format!(
"limit {} exceeds maximum of {}",
limit, MAX_QUERY_LIMIT
)));
}
Ok(())
}
}
#[derive(Debug, Serialize)]
pub struct PaginatedResponse<T> {
pub data: Vec<T>,
pub pagination: PaginationMeta,
}
#[derive(Debug, Serialize)]
pub struct PaginationMeta {
pub count: usize,
pub offset: u32,
pub limit: Option<u32>,
pub has_more: bool,
}
async fn get_readings(
State(state): State<Arc<AppState>>,
Path(id): Path<String>,
Query(params): Query<ReadingsQuery>,
) -> Result<Json<PaginatedResponse<aranet_store::StoredReading>>, AppError> {
params.validate()?;
let mut query = aranet_store::ReadingQuery::new().device(&id);
if let Some(dt) = params.since_datetime()? {
query = query.since(dt);
}
if let Some(dt) = params.until_datetime()? {
query = query.until(dt);
}
let request_limit = params.limit.map(|l| l + 1);
if let Some(limit) = request_limit {
query = query.limit(limit);
}
if let Some(offset) = params.offset {
query = query.offset(offset);
}
let mut readings = state
.with_store_read(|store| store.query_readings(&query))
.await?;
let has_more = params.limit.is_some_and(|l| readings.len() > l as usize);
if has_more {
readings.pop(); }
Ok(Json(PaginatedResponse {
pagination: PaginationMeta {
count: readings.len(),
offset: params.offset.unwrap_or(0),
limit: params.limit,
has_more,
},
data: readings,
}))
}
async fn get_history(
State(state): State<Arc<AppState>>,
Path(id): Path<String>,
Query(params): Query<ReadingsQuery>,
) -> Result<Json<PaginatedResponse<aranet_store::StoredHistoryRecord>>, AppError> {
params.validate()?;
let mut query = aranet_store::HistoryQuery::new().device(&id);
if let Some(dt) = params.since_datetime()? {
query = query.since(dt);
}
if let Some(dt) = params.until_datetime()? {
query = query.until(dt);
}
let request_limit = params.limit.map(|l| l + 1);
if let Some(limit) = request_limit {
query = query.limit(limit);
}
if let Some(offset) = params.offset {
query = query.offset(offset);
}
let mut history = state
.with_store_read(|store| store.query_history(&query))
.await?;
let has_more = params.limit.is_some_and(|l| history.len() > l as usize);
if has_more {
history.pop(); }
Ok(Json(PaginatedResponse {
pagination: PaginationMeta {
count: history.len(),
offset: params.offset.unwrap_or(0),
limit: params.limit,
has_more,
},
data: history,
}))
}
async fn get_all_readings(
State(state): State<Arc<AppState>>,
Query(params): Query<ReadingsQuery>,
) -> Result<Json<PaginatedResponse<aranet_store::StoredReading>>, AppError> {
params.validate()?;
let mut query = aranet_store::ReadingQuery::new();
if let Some(dt) = params.since_datetime()? {
query = query.since(dt);
}
if let Some(dt) = params.until_datetime()? {
query = query.until(dt);
}
let request_limit = params.limit.map(|l| l + 1);
if let Some(limit) = request_limit {
query = query.limit(limit);
}
if let Some(offset) = params.offset {
query = query.offset(offset);
}
let mut readings = state
.with_store_read(|store| store.query_readings(&query))
.await?;
let has_more = params.limit.is_some_and(|l| readings.len() > l as usize);
if has_more {
readings.pop(); }
Ok(Json(PaginatedResponse {
pagination: PaginationMeta {
count: readings.len(),
offset: params.offset.unwrap_or(0),
limit: params.limit,
has_more,
},
data: readings,
}))
}
#[derive(Debug, thiserror::Error)]
pub enum AppError {
#[error("{0}")]
NotFound(String),
#[error("{0}")]
BadRequest(String),
#[error("{0}")]
Conflict(String),
#[error("{0}")]
ServiceUnavailable(String),
#[error(transparent)]
Store(#[from] aranet_store::Error),
#[error("{0}")]
Internal(String),
}
impl IntoResponse for AppError {
fn into_response(self) -> axum::response::Response {
let (status, message) = match self {
AppError::NotFound(msg) => (StatusCode::NOT_FOUND, msg),
AppError::BadRequest(msg) => (StatusCode::BAD_REQUEST, msg),
AppError::Conflict(msg) => (StatusCode::CONFLICT, msg),
AppError::ServiceUnavailable(msg) => (StatusCode::SERVICE_UNAVAILABLE, msg),
AppError::Store(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()),
AppError::Internal(msg) => (StatusCode::INTERNAL_SERVER_ERROR, msg),
};
let body = serde_json::json!({
"error": message,
});
(status, Json(body)).into_response()
}
}
#[cfg(test)]
mod tests {
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use super::*;
use axum::{
body::Body,
extract::ConnectInfo,
http::{Request, StatusCode},
};
use http_body_util::BodyExt;
use time::Duration;
use tower::ServiceExt;
use crate::config::{Config, SecurityConfig};
use crate::middleware::RateLimitState;
use aranet_types::HistoryRecord;
fn test_config_path() -> PathBuf {
let nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos();
std::env::temp_dir().join(format!(
"aranet-service-api-unit-test-{}-{}.toml",
std::process::id(),
nanos
))
}
fn create_test_state() -> Arc<AppState> {
let store = aranet_store::Store::open_in_memory().unwrap();
let config = Config::default();
AppState::with_config_path(store, config, test_config_path())
}
async fn response_body(response: axum::response::Response) -> String {
let body = response.into_body();
let bytes = body.collect().await.unwrap().to_bytes();
String::from_utf8(bytes.to_vec()).unwrap()
}
fn create_security_config() -> SecurityConfig {
SecurityConfig {
api_key_enabled: true,
api_key: Some("1234567890abcdef1234567890abcdef".to_string()),
rate_limit_enabled: true,
rate_limit_requests: 1,
rate_limit_window_secs: 60,
rate_limit_max_entries: 1024,
cors_origins: vec!["http://localhost:3000".to_string()],
}
}
fn request_with_connect_info(method: axum::http::Method, uri: &str) -> Request<Body> {
Request::builder()
.method(method)
.uri(uri)
.extension(ConnectInfo(SocketAddr::new(
IpAddr::V4(Ipv4Addr::LOCALHOST),
12345,
)))
.body(Body::empty())
.unwrap()
}
fn create_full_app(security: SecurityConfig) -> axum::Router {
let state = create_test_state();
crate::app(
state,
Arc::new(security.clone()),
Arc::new(RateLimitState::new()),
)
.layer(crate::middleware::cors_layer(&security))
}
#[tokio::test]
async fn test_health_endpoint() {
let state = create_test_state();
let app = router().with_state(state);
let response = app
.oneshot(
Request::builder()
.uri("/api/health")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = response_body(response).await;
let json: serde_json::Value = serde_json::from_str(&body).unwrap();
assert_eq!(json["status"], "ok");
assert!(json["version"].is_string());
assert!(json["timestamp"].is_string());
}
#[tokio::test]
async fn test_list_devices_empty() {
let state = create_test_state();
let app = router().with_state(state);
let response = app
.oneshot(
Request::builder()
.uri("/api/devices")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = response_body(response).await;
let json: serde_json::Value = serde_json::from_str(&body).unwrap();
assert!(json.as_array().unwrap().is_empty());
}
#[tokio::test]
async fn test_get_device_not_found() {
let state = create_test_state();
let app = router().with_state(state);
let response = app
.oneshot(
Request::builder()
.uri("/api/devices/nonexistent")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::NOT_FOUND);
let body = response_body(response).await;
let json: serde_json::Value = serde_json::from_str(&body).unwrap();
assert!(json["error"].as_str().unwrap().contains("not found"));
}
#[tokio::test]
async fn test_get_current_reading_not_found() {
let state = create_test_state();
let app = router().with_state(state);
let response = app
.oneshot(
Request::builder()
.uri("/api/devices/test-device/current")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::NOT_FOUND);
}
#[tokio::test]
async fn test_get_readings_empty() {
let state = create_test_state();
let app = router().with_state(state);
let response = app
.oneshot(
Request::builder()
.uri("/api/devices/test/readings")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = response_body(response).await;
let json: serde_json::Value = serde_json::from_str(&body).unwrap();
assert!(json["data"].as_array().unwrap().is_empty());
assert_eq!(json["pagination"]["count"], 0);
}
#[tokio::test]
async fn test_get_all_readings_empty() {
let state = create_test_state();
let app = router().with_state(state);
let response = app
.oneshot(
Request::builder()
.uri("/api/readings")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = response_body(response).await;
let json: serde_json::Value = serde_json::from_str(&body).unwrap();
assert!(json["data"].as_array().unwrap().is_empty());
assert_eq!(json["pagination"]["count"], 0);
}
#[tokio::test]
async fn test_get_history_empty() {
let state = create_test_state();
let app = router().with_state(state);
let response = app
.oneshot(
Request::builder()
.uri("/api/devices/test/history")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = response_body(response).await;
let json: serde_json::Value = serde_json::from_str(&body).unwrap();
assert!(json["data"].as_array().unwrap().is_empty());
assert_eq!(json["pagination"]["count"], 0);
}
#[tokio::test]
async fn test_full_app_requires_api_key_for_protected_routes() {
let app = create_full_app(create_security_config());
let response = app
.oneshot(request_with_connect_info(
axum::http::Method::GET,
"/api/devices",
))
.await
.unwrap();
assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
}
#[tokio::test]
async fn test_full_app_accepts_api_key_header() {
let security = create_security_config();
let expected_key = security.api_key.clone().unwrap();
let app = create_full_app(security);
let response = app
.oneshot(
Request::builder()
.method(axum::http::Method::GET)
.uri("/api/devices")
.header("X-API-Key", expected_key)
.extension(ConnectInfo(SocketAddr::new(
IpAddr::V4(Ipv4Addr::LOCALHOST),
12345,
)))
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
}
#[tokio::test]
async fn test_full_app_rate_limit_enforced() {
let mut security = SecurityConfig {
api_key_enabled: false,
api_key: None,
..create_security_config()
};
security.rate_limit_requests = 1;
let app = create_full_app(security);
let first = app
.clone()
.oneshot(request_with_connect_info(
axum::http::Method::GET,
"/api/health",
))
.await
.unwrap();
assert_eq!(first.status(), StatusCode::OK);
let second = app
.oneshot(request_with_connect_info(
axum::http::Method::GET,
"/api/health",
))
.await
.unwrap();
assert_eq!(second.status(), StatusCode::TOO_MANY_REQUESTS);
assert_eq!(second.headers().get("x-ratelimit-limit").unwrap(), "1");
}
#[tokio::test]
async fn test_full_app_cors_preflight_uses_security_config() {
let mut security = SecurityConfig {
api_key_enabled: false,
api_key: None,
..create_security_config()
};
security.rate_limit_enabled = false;
let app = create_full_app(security);
let response = app
.oneshot(
Request::builder()
.method(axum::http::Method::OPTIONS)
.uri("/api/health")
.header("Origin", "http://localhost:3000")
.header("Access-Control-Request-Method", "GET")
.extension(ConnectInfo(SocketAddr::new(
IpAddr::V4(Ipv4Addr::LOCALHOST),
12345,
)))
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
assert_eq!(
response
.headers()
.get("access-control-allow-origin")
.unwrap(),
"http://localhost:3000"
);
}
#[tokio::test]
async fn test_get_readings_invalid_since_returns_bad_request() {
let state = create_test_state();
let app = router().with_state(state);
let response = app
.oneshot(
Request::builder()
.uri("/api/devices/test/readings?since=9223372036854775807")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
}
#[tokio::test]
async fn test_get_history_invalid_until_returns_bad_request() {
let state = create_test_state();
let app = router().with_state(state);
let response = app
.oneshot(
Request::builder()
.uri("/api/devices/test/history?until=9223372036854775807")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
}
#[tokio::test]
async fn test_get_all_readings_invalid_since_returns_bad_request() {
let state = create_test_state();
let app = router().with_state(state);
let response = app
.oneshot(
Request::builder()
.uri("/api/readings?since=9223372036854775807")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
}
#[tokio::test]
async fn test_get_history_respects_offset() {
let state = create_test_state();
{
let store = state.store.lock().await;
let records = vec![
HistoryRecord {
timestamp: OffsetDateTime::UNIX_EPOCH + Duration::seconds(1),
co2: 100,
temperature: 20.0,
pressure: 1000.0,
humidity: 40,
radon: None,
radiation_rate: None,
radiation_total: None,
},
HistoryRecord {
timestamp: OffsetDateTime::UNIX_EPOCH + Duration::seconds(2),
co2: 200,
temperature: 21.0,
pressure: 1001.0,
humidity: 41,
radon: None,
radiation_rate: None,
radiation_total: None,
},
HistoryRecord {
timestamp: OffsetDateTime::UNIX_EPOCH + Duration::seconds(3),
co2: 300,
temperature: 22.0,
pressure: 1002.0,
humidity: 42,
radon: None,
radiation_rate: None,
radiation_total: None,
},
];
store.insert_history("test-device", &records).unwrap();
}
let app = router().with_state(state);
let response = app
.oneshot(
Request::builder()
.uri("/api/devices/test-device/history?limit=1&offset=1")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = response_body(response).await;
let json: serde_json::Value = serde_json::from_str(&body).unwrap();
assert_eq!(json["pagination"]["count"], 1);
assert_eq!(json["pagination"]["offset"], 1);
assert_eq!(json["pagination"]["limit"], 1);
assert!(json["pagination"]["has_more"].as_bool().unwrap());
assert_eq!(json["data"][0]["co2"], 200);
}
#[tokio::test]
async fn test_readings_query_params() {
let state = create_test_state();
let app = router().with_state(state);
let response = app
.oneshot(
Request::builder()
.uri("/api/devices/test/readings?since=1704067200&until=1704153600&limit=10&offset=0")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
}
#[test]
fn test_health_response_serialization() {
let response = HealthResponse {
status: "ok",
version: env!("CARGO_PKG_VERSION"),
timestamp: time::OffsetDateTime::now_utc(),
};
let json = serde_json::to_string(&response).unwrap();
assert!(json.contains("ok"));
assert!(json.contains(env!("CARGO_PKG_VERSION")));
}
#[test]
fn test_device_response_from_stored_device() {
let stored = aranet_store::StoredDevice {
id: "AA:BB:CC:DD:EE:FF".to_string(),
name: Some("Test Device".to_string()),
device_type: Some(aranet_types::DeviceType::Aranet4),
serial: Some("12345".to_string()),
firmware: Some("1.2.3".to_string()),
hardware: Some("2.0".to_string()),
first_seen: time::OffsetDateTime::now_utc(),
last_seen: time::OffsetDateTime::now_utc(),
};
let response: DeviceResponse = stored.into();
assert_eq!(response.id, "AA:BB:CC:DD:EE:FF");
assert_eq!(response.name, Some("Test Device".to_string()));
assert_eq!(response.device_type, Some("Aranet4".to_string()));
assert_eq!(response.serial, Some("12345".to_string()));
assert_eq!(response.firmware, Some("1.2.3".to_string()));
}
#[test]
fn test_readings_query_default() {
let query = ReadingsQuery::default();
assert!(query.since.is_none());
assert!(query.until.is_none());
assert!(query.limit.is_none());
assert!(query.offset.is_none());
}
#[test]
fn test_readings_query_validate_rejects_invalid_timestamps() {
let query = ReadingsQuery {
since: Some(i64::MAX),
..Default::default()
};
let error = query.validate().unwrap_err();
assert!(matches!(error, AppError::BadRequest(_)));
assert!(error.to_string().contains("Invalid 'since' timestamp"));
}
#[test]
fn test_app_error_not_found() {
let error = AppError::NotFound("test".to_string());
let response = error.into_response();
assert_eq!(response.status(), StatusCode::NOT_FOUND);
}
#[test]
fn test_app_error_internal() {
let error = AppError::Internal("internal error".to_string());
let response = error.into_response();
assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
}
#[test]
fn test_app_error_debug() {
let error = AppError::NotFound("test".to_string());
let debug = format!("{:?}", error);
assert!(debug.contains("NotFound"));
assert!(debug.contains("test"));
}
#[test]
fn test_app_error_bad_request() {
let error = AppError::BadRequest("invalid input".to_string());
let response = error.into_response();
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
}
#[test]
fn test_app_error_conflict() {
let error = AppError::Conflict("resource exists".to_string());
let response = error.into_response();
assert_eq!(response.status(), StatusCode::CONFLICT);
}
#[tokio::test]
async fn test_get_status_endpoint() {
let state = create_test_state();
let app = router().with_state(state);
let response = app
.oneshot(
Request::builder()
.uri("/api/status")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = response_body(response).await;
let json: serde_json::Value = serde_json::from_str(&body).unwrap();
assert!(json["version"].is_string());
assert!(json["timestamp"].is_string());
assert!(json["collector"].is_object());
assert!(json["collector"]["running"].is_boolean());
assert!(json["devices"].is_array());
}
#[tokio::test]
async fn test_get_config_endpoint() {
let state = create_test_state();
let app = router().with_state(state);
let response = app
.oneshot(
Request::builder()
.uri("/api/config")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = response_body(response).await;
let json: serde_json::Value = serde_json::from_str(&body).unwrap();
assert!(json["server"].is_object());
assert!(json["server"]["bind"].is_string());
assert!(json["devices"].is_array());
}
#[tokio::test]
async fn test_add_device_endpoint() {
let state = create_test_state();
let app = router().with_state(Arc::clone(&state));
let request_body = serde_json::json!({
"address": "AA:BB:CC:DD:EE:FF",
"alias": "Test Device",
"poll_interval": 120
});
let response = app
.oneshot(
Request::builder()
.method("POST")
.uri("/api/config/devices")
.header("content-type", "application/json")
.body(Body::from(serde_json::to_string(&request_body).unwrap()))
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::CREATED);
let body = response_body(response).await;
let json: serde_json::Value = serde_json::from_str(&body).unwrap();
assert_eq!(json["address"], "AA:BB:CC:DD:EE:FF");
assert_eq!(json["alias"], "Test Device");
assert_eq!(json["poll_interval"], 120);
}
#[tokio::test]
async fn test_add_duplicate_device() {
let state = create_test_state();
{
let mut config = state.config.write().await;
config.devices.push(DeviceConfig {
address: "AA:BB:CC:DD:EE:FF".to_string(),
alias: Some("First".to_string()),
poll_interval: 60,
});
}
let app = router().with_state(state);
let request_body = serde_json::json!({
"address": "AA:BB:CC:DD:EE:FF",
"alias": "Duplicate"
});
let response = app
.oneshot(
Request::builder()
.method("POST")
.uri("/api/config/devices")
.header("content-type", "application/json")
.body(Body::from(serde_json::to_string(&request_body).unwrap()))
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::CONFLICT);
}
#[tokio::test]
async fn test_update_device_endpoint() {
let state = create_test_state();
{
let mut config = state.config.write().await;
config.devices.push(DeviceConfig {
address: "AA:BB:CC:DD:EE:FF".to_string(),
alias: Some("Original".to_string()),
poll_interval: 60,
});
}
let app = router().with_state(state);
let request_body = serde_json::json!({
"alias": "Updated Name",
"poll_interval": 300
});
let response = app
.oneshot(
Request::builder()
.method("PUT")
.uri("/api/config/devices/AA:BB:CC:DD:EE:FF")
.header("content-type", "application/json")
.body(Body::from(serde_json::to_string(&request_body).unwrap()))
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = response_body(response).await;
let json: serde_json::Value = serde_json::from_str(&body).unwrap();
assert_eq!(json["alias"], "Updated Name");
assert_eq!(json["poll_interval"], 300);
}
#[tokio::test]
async fn test_update_nonexistent_device() {
let state = create_test_state();
let app = router().with_state(state);
let request_body = serde_json::json!({
"alias": "New Name"
});
let response = app
.oneshot(
Request::builder()
.method("PUT")
.uri("/api/config/devices/NONEXISTENT")
.header("content-type", "application/json")
.body(Body::from(serde_json::to_string(&request_body).unwrap()))
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::NOT_FOUND);
}
#[tokio::test]
async fn test_remove_device_endpoint() {
let state = create_test_state();
{
let mut config = state.config.write().await;
config.devices.push(DeviceConfig {
address: "AA:BB:CC:DD:EE:FF".to_string(),
alias: Some("To Remove".to_string()),
poll_interval: 60,
});
}
let app = router().with_state(state);
let response = app
.oneshot(
Request::builder()
.method("DELETE")
.uri("/api/config/devices/AA:BB:CC:DD:EE:FF")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::NO_CONTENT);
}
#[tokio::test]
async fn test_remove_nonexistent_device() {
let state = create_test_state();
let app = router().with_state(state);
let response = app
.oneshot(
Request::builder()
.method("DELETE")
.uri("/api/config/devices/NONEXISTENT")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::NOT_FOUND);
}
#[tokio::test]
async fn test_collector_start_stop() {
let state = create_test_state();
let app = router().with_state(Arc::clone(&state));
let response = app
.oneshot(
Request::builder()
.method("POST")
.uri("/api/collector/start")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = response_body(response).await;
let json: serde_json::Value = serde_json::from_str(&body).unwrap();
assert!(!json["success"].as_bool().unwrap());
assert_eq!(json["message"], "No devices configured");
assert!(!json["running"].as_bool().unwrap());
}
#[tokio::test]
async fn test_collector_start_stop_with_configured_device() {
let state = create_test_state();
{
let mut config = state.config.write().await;
config.devices.push(DeviceConfig {
address: "AA:BB:CC:DD:EE:FF".to_string(),
alias: Some("Test".to_string()),
poll_interval: 60,
});
}
let app = router().with_state(Arc::clone(&state));
let start = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri("/api/collector/start")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(start.status(), StatusCode::OK);
let body = response_body(start).await;
let json: serde_json::Value = serde_json::from_str(&body).unwrap();
assert!(json["success"].as_bool().unwrap());
assert_eq!(json["message"], "Collector started");
let stop = app
.oneshot(
Request::builder()
.method("POST")
.uri("/api/collector/stop")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(stop.status(), StatusCode::OK);
let body = response_body(stop).await;
let json: serde_json::Value = serde_json::from_str(&body).unwrap();
assert!(json["success"].as_bool().unwrap());
assert_eq!(json["message"], "Collector stopped");
}
#[tokio::test]
async fn test_collector_start_already_running() {
let state = create_test_state();
state.collector.set_running(true);
let app = router().with_state(state);
let response = app
.oneshot(
Request::builder()
.method("POST")
.uri("/api/collector/start")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = response_body(response).await;
let json: serde_json::Value = serde_json::from_str(&body).unwrap();
assert!(!json["success"].as_bool().unwrap());
assert_eq!(json["message"], "Collector is already running");
}
#[tokio::test]
async fn test_collector_stop_not_running() {
let state = create_test_state();
let app = router().with_state(state);
let response = app
.oneshot(
Request::builder()
.method("POST")
.uri("/api/collector/stop")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = response_body(response).await;
let json: serde_json::Value = serde_json::from_str(&body).unwrap();
assert!(!json["success"].as_bool().unwrap());
assert_eq!(json["message"], "Collector is not running");
}
#[tokio::test]
async fn test_update_config_with_devices() {
let state = create_test_state();
let app = router().with_state(state);
let request_body = serde_json::json!({
"devices": [
{
"address": "AA:BB:CC:DD:EE:01",
"alias": "Device 1",
"poll_interval": 60
},
{
"address": "AA:BB:CC:DD:EE:02",
"alias": "Device 2",
"poll_interval": 120
}
]
});
let response = app
.oneshot(
Request::builder()
.method("PUT")
.uri("/api/config")
.header("content-type", "application/json")
.body(Body::from(serde_json::to_string(&request_body).unwrap()))
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = response_body(response).await;
let json: serde_json::Value = serde_json::from_str(&body).unwrap();
let devices = json["devices"].as_array().unwrap();
assert_eq!(devices.len(), 2);
}
#[test]
fn test_status_response_serialization() {
let status = StatusResponse {
version: "1.0.0",
timestamp: time::OffsetDateTime::now_utc(),
collector: CollectorStatus {
running: true,
started_at: Some(time::OffsetDateTime::now_utc()),
uptime_seconds: Some(3600),
},
devices: vec![],
};
let json = serde_json::to_string(&status).unwrap();
assert!(json.contains("1.0.0"));
assert!(json.contains("3600"));
}
#[test]
fn test_collector_action_response_serialization() {
let response = CollectorActionResponse {
success: true,
message: "Test message".to_string(),
running: true,
};
let json = serde_json::to_string(&response).unwrap();
assert!(json.contains("true"));
assert!(json.contains("Test message"));
}
#[test]
fn test_config_response_serialization() {
let config = ConfigResponse {
server: ServerConfigResponse {
bind: "0.0.0.0:8080".to_string(),
},
devices: vec![DeviceConfigResponse {
address: "AA:BB:CC:DD:EE:FF".to_string(),
alias: Some("Test".to_string()),
poll_interval: 60,
}],
};
let json = serde_json::to_string(&config).unwrap();
assert!(json.contains("0.0.0.0:8080"));
assert!(json.contains("AA:BB:CC:DD:EE:FF"));
}
#[test]
fn test_device_config_response_deserialization() {
let json = r#"{"address": "TEST", "alias": "My Device", "poll_interval": 180}"#;
let config: DeviceConfigResponse = serde_json::from_str(json).unwrap();
assert_eq!(config.address, "TEST");
assert_eq!(config.alias, Some("My Device".to_string()));
assert_eq!(config.poll_interval, 180);
}
#[test]
fn test_device_config_response_default_poll_interval() {
let json = r#"{"address": "TEST"}"#;
let config: DeviceConfigResponse = serde_json::from_str(json).unwrap();
assert_eq!(config.address, "TEST");
assert_eq!(config.poll_interval, 60); }
#[test]
fn test_add_device_request_deserialization() {
let json = r#"{"address": "TEST-ADDR", "alias": "Kitchen", "poll_interval": 90}"#;
let request: AddDeviceRequest = serde_json::from_str(json).unwrap();
assert_eq!(request.address, "TEST-ADDR");
assert_eq!(request.alias, Some("Kitchen".to_string()));
assert_eq!(request.poll_interval, 90);
}
#[test]
fn test_update_device_request_deserialization() {
let json = r#"{"alias": "New Name", "poll_interval": 300}"#;
let request: UpdateDeviceRequest = serde_json::from_str(json).unwrap();
assert_eq!(request.alias, Some(Some("New Name".to_string())));
assert_eq!(request.poll_interval, Some(300));
}
#[test]
fn test_update_config_request_deserialization() {
let json = r#"{"devices": [{"address": "DEV1"}]}"#;
let request: UpdateConfigRequest = serde_json::from_str(json).unwrap();
assert!(request.devices.is_some());
assert_eq!(request.devices.unwrap().len(), 1);
}
#[tokio::test]
async fn test_full_device_lifecycle() {
let state = create_test_state();
let app = router().with_state(Arc::clone(&state));
let add_body = serde_json::json!({
"address": "AA:BB:CC:DD:EE:FF",
"alias": "Living Room",
"poll_interval": 90
});
let response = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri("/api/config/devices")
.header("content-type", "application/json")
.body(Body::from(serde_json::to_string(&add_body).unwrap()))
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::CREATED);
let app = router().with_state(Arc::clone(&state));
let response = app
.clone()
.oneshot(
Request::builder()
.uri("/api/config")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
let body = response_body(response).await;
let json: serde_json::Value = serde_json::from_str(&body).unwrap();
assert_eq!(json["devices"].as_array().unwrap().len(), 1);
let app = router().with_state(Arc::clone(&state));
let update_body = serde_json::json!({
"alias": "Kitchen",
"poll_interval": 120
});
let response = app
.clone()
.oneshot(
Request::builder()
.method("PUT")
.uri("/api/config/devices/AA:BB:CC:DD:EE:FF")
.header("content-type", "application/json")
.body(Body::from(serde_json::to_string(&update_body).unwrap()))
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = response_body(response).await;
let json: serde_json::Value = serde_json::from_str(&body).unwrap();
assert_eq!(json["alias"], "Kitchen");
assert_eq!(json["poll_interval"], 120);
let app = router().with_state(Arc::clone(&state));
let response = app
.oneshot(
Request::builder()
.method("DELETE")
.uri("/api/config/devices/AA:BB:CC:DD:EE:FF")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::NO_CONTENT);
let app = router().with_state(Arc::clone(&state));
let response = app
.oneshot(
Request::builder()
.uri("/api/config")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
let body = response_body(response).await;
let json: serde_json::Value = serde_json::from_str(&body).unwrap();
assert!(json["devices"].as_array().unwrap().is_empty());
}
#[tokio::test]
async fn test_invalid_json_body() {
let state = create_test_state();
let app = router().with_state(state);
let response = app
.oneshot(
Request::builder()
.method("POST")
.uri("/api/config/devices")
.header("content-type", "application/json")
.body(Body::from("{ invalid json }"))
.unwrap(),
)
.await
.unwrap();
assert!(response.status().is_client_error());
}
#[tokio::test]
async fn test_missing_required_field() {
let state = create_test_state();
let app = router().with_state(state);
let body = serde_json::json!({
"alias": "Test Device"
});
let response = app
.oneshot(
Request::builder()
.method("POST")
.uri("/api/config/devices")
.header("content-type", "application/json")
.body(Body::from(serde_json::to_string(&body).unwrap()))
.unwrap(),
)
.await
.unwrap();
assert!(response.status().is_client_error());
}
#[tokio::test]
async fn test_case_insensitive_device_lookup() {
let state = create_test_state();
{
let mut config = state.config.write().await;
config.devices.push(DeviceConfig {
address: "AA:BB:CC:DD:EE:FF".to_string(),
alias: Some("Test".to_string()),
poll_interval: 60,
});
}
let app = router().with_state(Arc::clone(&state));
let update_body = serde_json::json!({
"alias": "Updated"
});
let response = app
.oneshot(
Request::builder()
.method("PUT")
.uri("/api/config/devices/aa:bb:cc:dd:ee:ff")
.header("content-type", "application/json")
.body(Body::from(serde_json::to_string(&update_body).unwrap()))
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
}
#[tokio::test]
async fn test_data_endpoints_with_stored_data() {
let state = create_test_state();
{
let store = state.store.lock().await;
store
.upsert_device("test-sensor", Some("Test Sensor"))
.unwrap();
let reading = aranet_types::CurrentReading {
co2: 750,
temperature: 23.5,
pressure: 1015.0,
humidity: 48,
battery: 90,
status: aranet_types::Status::Green,
interval: 60,
age: 5,
captured_at: Some(time::OffsetDateTime::now_utc()),
radon: None,
radiation_rate: None,
radiation_total: None,
radon_avg_24h: None,
radon_avg_7d: None,
radon_avg_30d: None,
};
store.insert_reading("test-sensor", &reading).unwrap();
}
let app = router().with_state(Arc::clone(&state));
let response = app
.oneshot(
Request::builder()
.uri("/api/devices")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = response_body(response).await;
let json: serde_json::Value = serde_json::from_str(&body).unwrap();
assert_eq!(json.as_array().unwrap().len(), 1);
assert_eq!(json[0]["id"], "test-sensor");
let app = router().with_state(Arc::clone(&state));
let response = app
.oneshot(
Request::builder()
.uri("/api/devices/test-sensor")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = response_body(response).await;
let json: serde_json::Value = serde_json::from_str(&body).unwrap();
assert_eq!(json["name"], "Test Sensor");
let app = router().with_state(Arc::clone(&state));
let response = app
.oneshot(
Request::builder()
.uri("/api/devices/test-sensor/current")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = response_body(response).await;
let json: serde_json::Value = serde_json::from_str(&body).unwrap();
assert_eq!(json["co2"], 750);
assert_eq!(json["temperature"], 23.5);
}
#[tokio::test]
async fn test_error_response_format() {
let state = create_test_state();
let app = router().with_state(state);
let response = app
.oneshot(
Request::builder()
.uri("/api/devices/nonexistent-device")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::NOT_FOUND);
let body = response_body(response).await;
let json: serde_json::Value = serde_json::from_str(&body).unwrap();
assert!(json.get("error").is_some());
assert!(json["error"].as_str().unwrap().contains("not found"));
}
#[tokio::test]
async fn test_concurrent_api_requests() {
let state = create_test_state();
let mut handles = Vec::new();
for _ in 0..10 {
let state = Arc::clone(&state);
handles.push(tokio::spawn(async move {
let app = router().with_state(Arc::clone(&state));
let response = app
.oneshot(
Request::builder()
.uri("/api/health")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
}));
}
for handle in handles {
handle.await.unwrap();
}
}
#[tokio::test]
async fn test_health_detailed_endpoint() {
let state = create_test_state();
let app = router().with_state(state);
let response = app
.oneshot(
Request::builder()
.uri("/api/health/detailed")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = response_body(response).await;
let json: serde_json::Value = serde_json::from_str(&body).unwrap();
assert!(json["status"].is_string());
assert!(json["version"].is_string());
assert!(json["timestamp"].is_string());
assert!(json["database"]["ok"].is_boolean());
assert!(json["database"]["device_count"].is_number());
assert!(json["collector"]["running"].is_boolean());
assert!(json["collector"]["configured_devices"].is_number());
assert!(json["platform"]["os"].is_string());
assert!(json["platform"]["arch"].is_string());
}
#[tokio::test]
async fn test_staleness_threshold_default_when_no_collector_stats() {
let state = create_test_state();
{
let store = state.store.lock().await;
let reading = aranet_types::CurrentReading {
co2: 500,
temperature: 22.0,
pressure: 1013.0,
humidity: 50,
battery: 80,
status: aranet_types::Status::Green,
interval: 60,
age: 0,
captured_at: Some(time::OffsetDateTime::now_utc() - time::Duration::seconds(200)),
radon: None,
radiation_rate: None,
radiation_total: None,
radon_avg_24h: None,
radon_avg_7d: None,
radon_avg_30d: None,
};
store.insert_reading("Aranet4 AABB0", &reading).unwrap();
}
let app = router().with_state(state);
let response = app
.oneshot(
Request::builder()
.uri("/api/devices/Aranet4%20AABB0/current")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = response_body(response).await;
let json: serde_json::Value = serde_json::from_str(&body).unwrap();
assert!(
json["stale"].as_bool().unwrap(),
"reading >180s old should be stale"
);
assert!(json["age_seconds"].as_i64().unwrap() >= 200);
}
#[tokio::test]
async fn test_staleness_threshold_fresh_reading() {
let state = create_test_state();
{
let store = state.store.lock().await;
let reading = aranet_types::CurrentReading {
co2: 400,
temperature: 21.0,
pressure: 1013.0,
humidity: 45,
battery: 90,
status: aranet_types::Status::Green,
interval: 60,
age: 0,
captured_at: Some(time::OffsetDateTime::now_utc()),
radon: None,
radiation_rate: None,
radiation_total: None,
radon_avg_24h: None,
radon_avg_7d: None,
radon_avg_30d: None,
};
store.insert_reading("Aranet4 AABB1", &reading).unwrap();
}
let app = router().with_state(state);
let response = app
.oneshot(
Request::builder()
.uri("/api/devices/Aranet4%20AABB1/current")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = response_body(response).await;
let json: serde_json::Value = serde_json::from_str(&body).unwrap();
assert!(
!json["stale"].as_bool().unwrap(),
"fresh reading should not be stale"
);
}
#[tokio::test]
async fn test_health_detailed_status_degraded_when_collector_stopped() {
let state = create_test_state();
let app = router().with_state(state);
let response = app
.oneshot(
Request::builder()
.uri("/api/health/detailed")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
let body = response_body(response).await;
let json: serde_json::Value = serde_json::from_str(&body).unwrap();
assert_eq!(json["status"], "degraded");
assert!(!json["collector"]["running"].as_bool().unwrap());
}
}