use std::path::Path;
use rusqlite::{Connection, OptionalExtension};
use time::OffsetDateTime;
use tracing::{debug, info, warn};
use aranet_types::{CurrentReading, DeviceInfo, DeviceType, HistoryRecord, Status};
fn timestamp_from_unix(ts: i64) -> OffsetDateTime {
OffsetDateTime::from_unix_timestamp(ts).unwrap_or_else(|_| {
warn!(
"Corrupted timestamp {} in database, substituting UNIX_EPOCH. \
Consider running a database integrity check.",
ts
);
OffsetDateTime::UNIX_EPOCH
})
}
use crate::error::{Error, Result};
use crate::models::{StoredDevice, StoredHistoryRecord, StoredReading, SyncState};
use crate::queries::{HistoryQuery, ReadingQuery};
use crate::schema;
pub struct Store {
conn: Connection,
}
impl Store {
pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
let path = path.as_ref();
if let Some(parent) = path.parent()
&& !parent.exists()
{
std::fs::create_dir_all(parent).map_err(|e| Error::CreateDirectory {
path: parent.to_path_buf(),
source: e,
})?;
}
info!("Opening database at {}", path.display());
let conn = Connection::open(path)?;
conn.execute_batch(
"PRAGMA foreign_keys = ON;
PRAGMA journal_mode = WAL;
PRAGMA synchronous = NORMAL;",
)?;
schema::initialize(&conn)?;
Ok(Self { conn })
}
pub fn open_default() -> Result<Self> {
Self::open(crate::default_db_path())
}
pub fn open_in_memory() -> Result<Self> {
let conn = Connection::open_in_memory()?;
conn.execute_batch("PRAGMA foreign_keys = ON;")?;
schema::initialize(&conn)?;
Ok(Self { conn })
}
pub fn upsert_device(&self, device_id: &str, name: Option<&str>) -> Result<StoredDevice> {
let now = OffsetDateTime::now_utc().unix_timestamp();
self.conn.execute(
"INSERT INTO devices (id, name, first_seen, last_seen) VALUES (?1, ?2, ?3, ?3)
ON CONFLICT(id) DO UPDATE SET
name = COALESCE(?2, name),
last_seen = ?3",
rusqlite::params![device_id, name, now],
)?;
self.get_device(device_id)?
.ok_or_else(|| Error::DeviceNotFound(device_id.to_string()))
}
pub fn update_device_metadata(
&self,
device_id: &str,
name: Option<&str>,
device_type: Option<DeviceType>,
) -> Result<()> {
let device_type_str = device_type.map(|dt| format!("{:?}", dt));
let now = OffsetDateTime::now_utc().unix_timestamp();
self.conn.execute(
"UPDATE devices SET
name = COALESCE(?2, name),
device_type = COALESCE(?3, device_type),
last_seen = ?4
WHERE id = ?1",
rusqlite::params![device_id, name, device_type_str, now],
)?;
Ok(())
}
pub fn update_device_info(&self, device_id: &str, info: &DeviceInfo) -> Result<()> {
let device_type = DeviceType::from_name(&info.model).map(|dt| format!("{:?}", dt));
let name = if info.name.is_empty() {
None
} else {
Some(&info.name)
};
self.conn.execute(
"UPDATE devices SET
name = COALESCE(?2, name),
device_type = COALESCE(?3, device_type),
serial = COALESCE(?4, serial),
firmware = COALESCE(?5, firmware),
hardware = COALESCE(?6, hardware),
last_seen = ?7
WHERE id = ?1",
rusqlite::params![
device_id,
name,
device_type,
&info.serial,
&info.firmware,
&info.hardware,
OffsetDateTime::now_utc().unix_timestamp()
],
)?;
Ok(())
}
pub fn get_device(&self, device_id: &str) -> Result<Option<StoredDevice>> {
let mut stmt = self.conn.prepare(
"SELECT id, name, device_type, serial, firmware, hardware, first_seen, last_seen
FROM devices WHERE id = ?",
)?;
let device = stmt
.query_row([device_id], |row| {
Ok(StoredDevice {
id: row.get(0)?,
name: row.get(1)?,
device_type: row
.get::<_, Option<String>>(2)?
.and_then(|s| parse_device_type(&s)),
serial: row.get(3)?,
firmware: row.get(4)?,
hardware: row.get(5)?,
first_seen: timestamp_from_unix(row.get(6)?),
last_seen: timestamp_from_unix(row.get(7)?),
})
})
.optional()?;
Ok(device)
}
pub fn list_devices(&self) -> Result<Vec<StoredDevice>> {
let mut stmt = self.conn.prepare(
"SELECT id, name, device_type, serial, firmware, hardware, first_seen, last_seen
FROM devices ORDER BY last_seen DESC",
)?;
let devices = stmt
.query_map([], |row| {
Ok(StoredDevice {
id: row.get(0)?,
name: row.get(1)?,
device_type: row
.get::<_, Option<String>>(2)?
.and_then(|s| parse_device_type(&s)),
serial: row.get(3)?,
firmware: row.get(4)?,
hardware: row.get(5)?,
first_seen: timestamp_from_unix(row.get(6)?),
last_seen: timestamp_from_unix(row.get(7)?),
})
})?
.collect::<std::result::Result<Vec<_>, _>>()?;
Ok(devices)
}
pub fn delete_device(&self, device_id: &str) -> Result<bool> {
let tx = self.conn.unchecked_transaction()?;
tx.execute(
"DELETE FROM history WHERE device_id = ?1",
rusqlite::params![device_id],
)?;
tx.execute(
"DELETE FROM readings WHERE device_id = ?1",
rusqlite::params![device_id],
)?;
tx.execute(
"DELETE FROM sync_state WHERE device_id = ?1",
rusqlite::params![device_id],
)?;
let rows_deleted = tx.execute(
"DELETE FROM devices WHERE id = ?1",
rusqlite::params![device_id],
)?;
tx.commit()?;
Ok(rows_deleted > 0)
}
}
fn parse_device_type(s: &str) -> Option<DeviceType> {
match s {
"Aranet4" => Some(DeviceType::Aranet4),
"Aranet2" => Some(DeviceType::Aranet2),
"AranetRadon" => Some(DeviceType::AranetRadon),
"AranetRadiation" => Some(DeviceType::AranetRadiation),
_ => None,
}
}
fn parse_status(s: &str) -> Status {
match s {
"Green" => Status::Green,
"Yellow" => Status::Yellow,
"Red" => Status::Red,
"Error" => Status::Error,
_ => Status::Green,
}
}
impl Store {
pub fn insert_reading(&self, device_id: &str, reading: &CurrentReading) -> Result<i64> {
self.upsert_device(device_id, None)?;
let captured_at = reading
.captured_at
.unwrap_or_else(OffsetDateTime::now_utc)
.unix_timestamp();
self.conn.execute(
"INSERT INTO readings (device_id, captured_at, co2, temperature, pressure,
humidity, battery, status, radon, radiation_rate, radiation_total)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)",
rusqlite::params![
device_id,
captured_at,
reading.co2,
reading.temperature,
reading.pressure,
reading.humidity,
reading.battery,
format!("{:?}", reading.status),
reading.radon,
reading.radiation_rate,
reading.radiation_total,
],
)?;
Ok(self.conn.last_insert_rowid())
}
pub fn query_readings(&self, query: &ReadingQuery) -> Result<Vec<StoredReading>> {
let sql = query.build_sql();
let (_, params) = query.build_where();
debug!("Executing query: {}", sql);
let params_ref: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
let mut stmt = self.conn.prepare(&sql)?;
let readings = stmt
.query_map(params_ref.as_slice(), |row| {
Ok(StoredReading {
id: row.get(0)?,
device_id: row.get(1)?,
captured_at: timestamp_from_unix(row.get(2)?),
co2: row.get::<_, i64>(3)? as u16,
temperature: row.get(4)?,
pressure: row.get(5)?,
humidity: row.get::<_, i64>(6)? as u8,
battery: row.get::<_, i64>(7)? as u8,
status: parse_status(&row.get::<_, String>(8)?),
radon: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
radiation_rate: row.get(10)?,
radiation_total: row.get(11)?,
})
})?
.collect::<std::result::Result<Vec<_>, _>>()?;
Ok(readings)
}
pub fn get_latest_reading(&self, device_id: &str) -> Result<Option<StoredReading>> {
let query = ReadingQuery::new().device(device_id).limit(1);
let mut readings = self.query_readings(&query)?;
Ok(readings.pop())
}
pub fn count_readings(&self, device_id: Option<&str>) -> Result<u64> {
let count: i64 = match device_id {
Some(id) => self.conn.query_row(
"SELECT COUNT(*) FROM readings WHERE device_id = ?",
[id],
|row| row.get(0),
)?,
None => self
.conn
.query_row("SELECT COUNT(*) FROM readings", [], |row| row.get(0))?,
};
Ok(count as u64)
}
}
impl Store {
pub fn insert_history(&self, device_id: &str, records: &[HistoryRecord]) -> Result<usize> {
self.upsert_device(device_id, None)?;
let synced_at = OffsetDateTime::now_utc().unix_timestamp();
let mut inserted = 0;
for record in records {
let result = self.conn.execute(
"INSERT OR IGNORE INTO history (device_id, timestamp, synced_at, co2,
temperature, pressure, humidity, radon, radiation_rate, radiation_total)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
rusqlite::params![
device_id,
record.timestamp.unix_timestamp(),
synced_at,
record.co2,
record.temperature,
record.pressure,
record.humidity,
record.radon,
record.radiation_rate,
record.radiation_total,
],
)?;
inserted += result;
}
info!(
"Inserted {} new history records for {}",
inserted, device_id
);
Ok(inserted)
}
pub fn query_history(&self, query: &HistoryQuery) -> Result<Vec<StoredHistoryRecord>> {
let mut conditions = Vec::new();
let mut params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
if let Some(ref device_id) = query.device_id {
conditions.push("device_id = ?");
params.push(Box::new(device_id.clone()));
}
if let Some(since) = query.since {
conditions.push("timestamp >= ?");
params.push(Box::new(since.unix_timestamp()));
}
if let Some(until) = query.until {
conditions.push("timestamp <= ?");
params.push(Box::new(until.unix_timestamp()));
}
let where_clause = if conditions.is_empty() {
String::new()
} else {
format!("WHERE {}", conditions.join(" AND "))
};
let order = if query.newest_first { "DESC" } else { "ASC" };
let mut sql = format!(
"SELECT id, device_id, timestamp, synced_at, co2, temperature, pressure,
humidity, radon, radiation_rate, radiation_total
FROM history {} ORDER BY timestamp {}",
where_clause, order
);
if let Some(limit) = query.limit {
sql.push_str(&format!(" LIMIT {}", limit));
}
if let Some(offset) = query.offset {
sql.push_str(&format!(" OFFSET {}", offset));
}
let params_ref: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
let mut stmt = self.conn.prepare(&sql)?;
let records = stmt
.query_map(params_ref.as_slice(), |row| {
Ok(StoredHistoryRecord {
id: row.get(0)?,
device_id: row.get(1)?,
timestamp: timestamp_from_unix(row.get(2)?),
synced_at: timestamp_from_unix(row.get(3)?),
co2: row.get::<_, i64>(4)? as u16,
temperature: row.get(5)?,
pressure: row.get(6)?,
humidity: row.get::<_, i64>(7)? as u8,
radon: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
radiation_rate: row.get(9)?,
radiation_total: row.get(10)?,
})
})?
.collect::<std::result::Result<Vec<_>, _>>()?;
Ok(records)
}
pub fn count_history(&self, device_id: Option<&str>) -> Result<u64> {
let count: i64 = match device_id {
Some(id) => self.conn.query_row(
"SELECT COUNT(*) FROM history WHERE device_id = ?",
[id],
|row| row.get(0),
)?,
None => self
.conn
.query_row("SELECT COUNT(*) FROM history", [], |row| row.get(0))?,
};
Ok(count as u64)
}
}
impl Store {
pub fn get_sync_state(&self, device_id: &str) -> Result<Option<SyncState>> {
let mut stmt = self.conn.prepare(
"SELECT device_id, last_history_index, total_readings, last_sync_at
FROM sync_state WHERE device_id = ?",
)?;
let state = stmt
.query_row([device_id], |row| {
Ok(SyncState {
device_id: row.get(0)?,
last_history_index: row.get::<_, Option<i64>>(1)?.map(|v| v as u16),
total_readings: row.get::<_, Option<i64>>(2)?.map(|v| v as u16),
last_sync_at: row.get::<_, Option<i64>>(3)?.map(timestamp_from_unix),
})
})
.optional()?;
Ok(state)
}
pub fn update_sync_state(
&self,
device_id: &str,
last_index: u16,
total_readings: u16,
) -> Result<()> {
let now = OffsetDateTime::now_utc().unix_timestamp();
self.conn.execute(
"INSERT INTO sync_state (device_id, last_history_index, total_readings, last_sync_at)
VALUES (?1, ?2, ?3, ?4)
ON CONFLICT(device_id) DO UPDATE SET
last_history_index = ?2,
total_readings = ?3,
last_sync_at = ?4",
rusqlite::params![device_id, last_index, total_readings, now],
)?;
debug!(
"Updated sync state for {}: index={}, total={}",
device_id, last_index, total_readings
);
Ok(())
}
pub fn calculate_sync_start(&self, device_id: &str, current_total: u16) -> Result<u16> {
let state = self.get_sync_state(device_id)?;
match state {
Some(s) if s.total_readings == Some(current_total) => {
if s.last_sync_at.is_some() {
let latest_stored = self.get_latest_history_timestamp(device_id)?;
match latest_stored {
Some(latest_ts) => {
let now = OffsetDateTime::now_utc();
let time_since_latest = now - latest_ts;
if time_since_latest > time::Duration::minutes(10) {
debug!(
"Buffer may have wrapped for {} (latest record is {} min old), doing full sync",
device_id,
time_since_latest.whole_minutes()
);
return Ok(1);
}
debug!("No new readings for {}", device_id);
Ok(current_total + 1)
}
None => {
debug!(
"Sync state exists but no history for {}, doing full sync",
device_id
);
Ok(1)
}
}
} else {
debug!("No sync timestamp for {}, doing full sync", device_id);
Ok(1)
}
}
Some(s) if s.last_history_index.is_some() => {
let last_index = s.last_history_index.unwrap();
let prev_total = s.total_readings.unwrap_or(0);
if current_total < prev_total {
debug!(
"Device total decreased ({} -> {}) for {}, device was reset - doing full sync",
prev_total, current_total, device_id
);
return Ok(1);
}
let new_count = current_total.saturating_sub(prev_total);
if new_count > 0 {
let start = last_index.saturating_add(1);
if start > current_total {
debug!(
"Start index {} exceeds device total {} for {}, doing full sync",
start, current_total, device_id
);
return Ok(1);
}
debug!(
"Incremental sync for {}: {} new readings, starting at {}",
device_id, new_count, start
);
Ok(start)
} else {
Ok(current_total + 1)
}
}
_ => {
debug!(
"First sync for {}: downloading all {} readings",
device_id, current_total
);
Ok(1)
}
}
}
fn get_latest_history_timestamp(&self, device_id: &str) -> Result<Option<OffsetDateTime>> {
let ts: Option<i64> = self
.conn
.query_row(
"SELECT MAX(timestamp) FROM history WHERE device_id = ?",
[device_id],
|row| row.get(0),
)
.optional()?
.flatten();
Ok(ts.map(timestamp_from_unix))
}
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct HistoryStats {
pub count: u64,
pub min: HistoryAggregates,
pub max: HistoryAggregates,
pub avg: HistoryAggregates,
pub time_range: Option<(OffsetDateTime, OffsetDateTime)>,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct HistoryAggregates {
pub co2: Option<f64>,
pub temperature: Option<f64>,
pub pressure: Option<f64>,
pub humidity: Option<f64>,
pub radon: Option<f64>,
}
impl Store {
pub fn history_stats(&self, query: &HistoryQuery) -> Result<HistoryStats> {
let mut conditions = Vec::new();
let mut params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
if let Some(ref device_id) = query.device_id {
conditions.push("device_id = ?");
params.push(Box::new(device_id.clone()));
}
if let Some(since) = query.since {
conditions.push("timestamp >= ?");
params.push(Box::new(since.unix_timestamp()));
}
if let Some(until) = query.until {
conditions.push("timestamp <= ?");
params.push(Box::new(until.unix_timestamp()));
}
let where_clause = if conditions.is_empty() {
String::new()
} else {
format!("WHERE {}", conditions.join(" AND "))
};
let sql = format!(
"SELECT
COUNT(*) as count,
MIN(co2) as min_co2, MAX(co2) as max_co2, AVG(co2) as avg_co2,
MIN(temperature) as min_temp, MAX(temperature) as max_temp, AVG(temperature) as avg_temp,
MIN(pressure) as min_press, MAX(pressure) as max_press, AVG(pressure) as avg_press,
MIN(humidity) as min_hum, MAX(humidity) as max_hum, AVG(humidity) as avg_hum,
MIN(radon) as min_radon, MAX(radon) as max_radon, AVG(radon) as avg_radon,
MIN(timestamp) as min_ts, MAX(timestamp) as max_ts
FROM history {}",
where_clause
);
let params_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
let stats = self.conn.query_row(&sql, params_refs.as_slice(), |row| {
let count: i64 = row.get(0)?;
let min_ts: Option<i64> = row.get(16)?;
let max_ts: Option<i64> = row.get(17)?;
let time_range = match (min_ts, max_ts) {
(Some(min), Some(max)) => {
Some((timestamp_from_unix(min), timestamp_from_unix(max)))
}
_ => None,
};
Ok(HistoryStats {
count: count as u64,
min: HistoryAggregates {
co2: row.get::<_, Option<i64>>(1)?.map(|v| v as f64),
temperature: row.get(4)?,
pressure: row.get(7)?,
humidity: row.get::<_, Option<i64>>(10)?.map(|v| v as f64),
radon: row.get::<_, Option<i64>>(13)?.map(|v| v as f64),
},
max: HistoryAggregates {
co2: row.get::<_, Option<i64>>(2)?.map(|v| v as f64),
temperature: row.get(5)?,
pressure: row.get(8)?,
humidity: row.get::<_, Option<i64>>(11)?.map(|v| v as f64),
radon: row.get::<_, Option<i64>>(14)?.map(|v| v as f64),
},
avg: HistoryAggregates {
co2: row.get(3)?,
temperature: row.get(6)?,
pressure: row.get(9)?,
humidity: row.get(12)?,
radon: row.get(15)?,
},
time_range,
})
})?;
Ok(stats)
}
pub fn export_history_csv(&self, query: &HistoryQuery) -> Result<String> {
let records = self.query_history(query)?;
let mut output = String::new();
output.push_str("timestamp,device_id,co2,temperature,pressure,humidity,radon\n");
for record in records {
let timestamp = record
.timestamp
.format(&time::format_description::well_known::Rfc3339)
.unwrap_or_default();
let radon = record.radon.map(|r| r.to_string()).unwrap_or_default();
output.push_str(&format!(
"{},{},{},{:.1},{:.2},{},{}\n",
timestamp,
record.device_id,
record.co2,
record.temperature,
record.pressure,
record.humidity,
radon
));
}
Ok(output)
}
pub fn export_history_json(&self, query: &HistoryQuery) -> Result<String> {
let records = self.query_history(query)?;
let json = serde_json::to_string_pretty(&records)
.map_err(|e| Error::Database(rusqlite::Error::ToSqlConversionFailure(Box::new(e))))?;
Ok(json)
}
pub fn import_history_csv(&self, csv_data: &str) -> Result<ImportResult> {
let mut reader = csv::ReaderBuilder::new()
.has_headers(true)
.flexible(true)
.trim(csv::Trim::All)
.from_reader(csv_data.as_bytes());
let mut total = 0;
let mut imported = 0;
let mut skipped = 0;
let mut errors = Vec::new();
let mut upserted_devices = std::collections::HashSet::new();
for (line_num, result) in reader.records().enumerate() {
total += 1;
let line = line_num + 2;
let record = match result {
Ok(r) => r,
Err(e) => {
errors.push(format!("Line {}: parse error - {}", line, e));
skipped += 1;
continue;
}
};
let timestamp_str = record.get(0).unwrap_or("").trim();
let device_id = record.get(1).unwrap_or("").trim();
let co2_str = record.get(2).unwrap_or("").trim();
let temp_str = record.get(3).unwrap_or("").trim();
let pressure_str = record.get(4).unwrap_or("").trim();
let humidity_str = record.get(5).unwrap_or("").trim();
let radon_str = record.get(6).unwrap_or("").trim();
if device_id.is_empty() {
errors.push(format!("Line {}: missing device_id", line));
skipped += 1;
continue;
}
let timestamp = match OffsetDateTime::parse(
timestamp_str,
&time::format_description::well_known::Rfc3339,
) {
Ok(ts) => ts,
Err(_) => {
errors.push(format!(
"Line {}: invalid timestamp '{}'",
line, timestamp_str
));
skipped += 1;
continue;
}
};
let co2: u16 = match co2_str.parse::<u16>() {
Ok(v) if v <= 10000 => v, Ok(v) => {
errors.push(format!(
"Line {}: CO2 value {} exceeds maximum of 10000 ppm",
line, v
));
skipped += 1;
continue;
}
Err(_) if co2_str.is_empty() => 0,
Err(_) => {
errors.push(format!("Line {}: invalid CO2 value '{}'", line, co2_str));
skipped += 1;
continue;
}
};
let temperature: f32 = match temp_str.parse::<f32>() {
Ok(v) if (-40.0..=100.0).contains(&v) => v,
Ok(v) => {
errors.push(format!(
"Line {}: temperature {} is outside valid range (-40 to 100°C)",
line, v
));
skipped += 1;
continue;
}
Err(_) if temp_str.is_empty() => 0.0,
Err(_) => {
errors.push(format!(
"Line {}: invalid temperature value '{}'",
line, temp_str
));
skipped += 1;
continue;
}
};
let pressure: f32 = match pressure_str.parse::<f32>() {
Ok(v) if v == 0.0 || (800.0..=1200.0).contains(&v) => v,
Ok(v) => {
errors.push(format!(
"Line {}: pressure {} is outside valid range (800-1200 hPa)",
line, v
));
skipped += 1;
continue;
}
Err(_) if pressure_str.is_empty() => 0.0,
Err(_) => {
errors.push(format!(
"Line {}: invalid pressure value '{}'",
line, pressure_str
));
skipped += 1;
continue;
}
};
let humidity: u8 = match humidity_str.parse::<u8>() {
Ok(v) if v <= 100 => v,
Ok(v) => {
errors.push(format!(
"Line {}: humidity {} exceeds maximum of 100%",
line, v
));
skipped += 1;
continue;
}
Err(_) if humidity_str.is_empty() => 0,
Err(_) => {
errors.push(format!(
"Line {}: invalid humidity value '{}'",
line, humidity_str
));
skipped += 1;
continue;
}
};
let radon: Option<u32> = if radon_str.is_empty() {
None
} else {
match radon_str.parse::<u32>() {
Ok(v) if v <= 100000 => Some(v), Ok(v) => {
errors.push(format!(
"Line {}: radon value {} exceeds maximum of 100000 Bq/m³",
line, v
));
skipped += 1;
continue;
}
Err(_) => {
errors.push(format!(
"Line {}: invalid radon value '{}'",
line, radon_str
));
skipped += 1;
continue;
}
}
};
let history_record = HistoryRecord {
timestamp,
co2,
temperature,
pressure,
humidity,
radon,
radiation_rate: None,
radiation_total: None,
};
if upserted_devices.insert(device_id.to_string()) {
self.upsert_device(device_id, None)?;
}
let count = self.insert_history(device_id, &[history_record])?;
imported += count;
if count == 0 {
skipped += 1; }
}
Ok(ImportResult {
total,
imported,
skipped,
errors,
})
}
pub fn import_history_json(&self, json_data: &str) -> Result<ImportResult> {
let records: Vec<StoredHistoryRecord> = serde_json::from_str(json_data)
.map_err(|e| Error::Database(rusqlite::Error::ToSqlConversionFailure(Box::new(e))))?;
let total = records.len();
let mut imported = 0;
let mut skipped = 0;
let mut upserted_devices = std::collections::HashSet::new();
for record in records {
let history_record = record.to_history();
if upserted_devices.insert(record.device_id.clone()) {
self.upsert_device(&record.device_id, None)?;
}
let count = self.insert_history(&record.device_id, &[history_record])?;
imported += count;
if count == 0 {
skipped += 1; }
}
Ok(ImportResult {
total,
imported,
skipped,
errors: Vec::new(),
})
}
}
#[derive(Debug, Clone)]
pub struct ImportResult {
pub total: usize,
pub imported: usize,
pub skipped: usize,
pub errors: Vec<String>,
}
#[cfg(test)]
mod tests {
use super::*;
use aranet_types::Status;
fn create_test_reading() -> CurrentReading {
CurrentReading {
co2: 800,
temperature: 22.5,
pressure: 1013.0,
humidity: 45,
battery: 85,
status: Status::Green,
interval: 60,
age: 30,
captured_at: Some(OffsetDateTime::now_utc()),
radon: None,
radiation_rate: None,
radiation_total: None,
radon_avg_24h: None,
radon_avg_7d: None,
radon_avg_30d: None,
}
}
#[test]
fn test_open_in_memory() {
let store = Store::open_in_memory().unwrap();
let devices = store.list_devices().unwrap();
assert!(devices.is_empty());
}
#[test]
fn test_upsert_device() {
let store = Store::open_in_memory().unwrap();
let device = store.upsert_device("test-device", Some("Test")).unwrap();
assert_eq!(device.id, "test-device");
assert_eq!(device.name, Some("Test".to_string()));
let device = store
.upsert_device("test-device", Some("New Name"))
.unwrap();
assert_eq!(device.name, Some("New Name".to_string()));
}
#[test]
fn test_insert_and_query_reading() {
let store = Store::open_in_memory().unwrap();
let reading = create_test_reading();
store.insert_reading("test-device", &reading).unwrap();
let query = ReadingQuery::new().device("test-device");
let readings = store.query_readings(&query).unwrap();
assert_eq!(readings.len(), 1);
assert_eq!(readings[0].co2, 800);
assert_eq!(readings[0].temperature, 22.5);
}
#[test]
fn test_get_latest_reading() {
let store = Store::open_in_memory().unwrap();
let mut reading1 = create_test_reading();
reading1.co2 = 700;
store.insert_reading("test-device", &reading1).unwrap();
let mut reading2 = create_test_reading();
reading2.co2 = 900;
store.insert_reading("test-device", &reading2).unwrap();
let latest = store.get_latest_reading("test-device").unwrap().unwrap();
assert_eq!(latest.co2, 900);
}
#[test]
fn test_insert_history_deduplication() {
let store = Store::open_in_memory().unwrap();
let now = OffsetDateTime::now_utc();
let records = vec![
HistoryRecord {
timestamp: now,
co2: 800,
temperature: 22.0,
pressure: 1013.0,
humidity: 45,
radon: None,
radiation_rate: None,
radiation_total: None,
},
HistoryRecord {
timestamp: now, co2: 850,
temperature: 23.0,
pressure: 1014.0,
humidity: 46,
radon: None,
radiation_rate: None,
radiation_total: None,
},
];
let inserted = store.insert_history("test-device", &records).unwrap();
assert_eq!(inserted, 1);
let count = store.count_history(Some("test-device")).unwrap();
assert_eq!(count, 1);
}
#[test]
fn test_sync_state() {
let store = Store::open_in_memory().unwrap();
store.upsert_device("test-device", None).unwrap();
let state = store.get_sync_state("test-device").unwrap();
assert!(state.is_none());
store.update_sync_state("test-device", 100, 100).unwrap();
let state = store.get_sync_state("test-device").unwrap().unwrap();
assert_eq!(state.last_history_index, Some(100));
assert_eq!(state.total_readings, Some(100));
assert!(state.last_sync_at.is_some());
}
#[test]
fn test_calculate_sync_start() {
let store = Store::open_in_memory().unwrap();
store.upsert_device("test-device", None).unwrap();
let start = store.calculate_sync_start("test-device", 100).unwrap();
assert_eq!(start, 1);
let now = OffsetDateTime::now_utc();
let records = vec![HistoryRecord {
timestamp: now,
co2: 800,
temperature: 22.0,
pressure: 1013.0,
humidity: 45,
radon: None,
radiation_rate: None,
radiation_total: None,
}];
store.insert_history("test-device", &records).unwrap();
store.update_sync_state("test-device", 100, 100).unwrap();
let start = store.calculate_sync_start("test-device", 100).unwrap();
assert_eq!(start, 101);
let start = store.calculate_sync_start("test-device", 110).unwrap();
assert_eq!(start, 101);
}
#[test]
fn test_calculate_sync_start_cache_cleared() {
let store = Store::open_in_memory().unwrap();
store.upsert_device("test-device", None).unwrap();
store.update_sync_state("test-device", 100, 100).unwrap();
let start = store.calculate_sync_start("test-device", 100).unwrap();
assert_eq!(start, 1);
}
#[test]
fn test_calculate_sync_start_buffer_wrapped() {
let store = Store::open_in_memory().unwrap();
store.upsert_device("test-device", None).unwrap();
let old_time = OffsetDateTime::now_utc() - time::Duration::minutes(30);
let records = vec![HistoryRecord {
timestamp: old_time,
co2: 800,
temperature: 22.0,
pressure: 1013.0,
humidity: 45,
radon: None,
radiation_rate: None,
radiation_total: None,
}];
store.insert_history("test-device", &records).unwrap();
store.update_sync_state("test-device", 100, 100).unwrap();
let start = store.calculate_sync_start("test-device", 100).unwrap();
assert_eq!(start, 1);
}
#[test]
fn test_calculate_sync_start_index_overflow() {
let store = Store::open_in_memory().unwrap();
store.upsert_device("test-device", None).unwrap();
store.update_sync_state("test-device", 500, 500).unwrap();
let start = store.calculate_sync_start("test-device", 200).unwrap();
assert_eq!(start, 1);
}
#[test]
fn test_import_history_csv() {
let store = Store::open_in_memory().unwrap();
let csv_data = r#"timestamp,device_id,co2,temperature,pressure,humidity,radon
2024-01-15T10:30:00Z,Aranet4 17C3C,800,22.5,1013.25,45,
2024-01-15T11:30:00Z,Aranet4 17C3C,850,23.0,1014.00,48,
2024-01-15T12:30:00Z,AranetRn+ 306B8,0,21.0,1012.00,50,150
"#;
let result = store.import_history_csv(csv_data).unwrap();
assert_eq!(result.total, 3);
assert_eq!(result.imported, 3);
assert_eq!(result.skipped, 0);
assert!(result.errors.is_empty());
let devices = store.list_devices().unwrap();
assert_eq!(devices.len(), 2);
let query = HistoryQuery::new().device("Aranet4 17C3C");
let records = store.query_history(&query).unwrap();
assert_eq!(records.len(), 2);
assert_eq!(records[0].co2, 850); assert_eq!(records[1].co2, 800);
let query = HistoryQuery::new().device("AranetRn+ 306B8");
let records = store.query_history(&query).unwrap();
assert_eq!(records.len(), 1);
assert_eq!(records[0].radon, Some(150));
}
#[test]
fn test_import_history_csv_deduplication() {
let store = Store::open_in_memory().unwrap();
let csv_data = r#"timestamp,device_id,co2,temperature,pressure,humidity,radon
2024-01-15T10:30:00Z,test-device,800,22.5,1013.25,45,
"#;
let result = store.import_history_csv(csv_data).unwrap();
assert_eq!(result.imported, 1);
let result = store.import_history_csv(csv_data).unwrap();
assert_eq!(result.imported, 0);
assert_eq!(result.skipped, 1);
}
#[test]
fn test_import_history_csv_with_errors() {
let store = Store::open_in_memory().unwrap();
let csv_data = r#"timestamp,device_id,co2,temperature,pressure,humidity,radon
invalid-timestamp,test-device,800,22.5,1013.25,45,
2024-01-15T10:30:00Z,,800,22.5,1013.25,45,
2024-01-15T11:30:00Z,valid-device,900,23.0,1014.00,50,
"#;
let result = store.import_history_csv(csv_data).unwrap();
assert_eq!(result.total, 3);
assert_eq!(result.imported, 1);
assert_eq!(result.skipped, 2);
assert_eq!(result.errors.len(), 2);
}
#[test]
fn test_import_history_json() {
let store = Store::open_in_memory().unwrap();
let json_data = r#"[
{
"id": 0,
"device_id": "Aranet4 17C3C",
"timestamp": "2024-01-15T10:30:00Z",
"synced_at": "2024-01-15T12:00:00Z",
"co2": 800,
"temperature": 22.5,
"pressure": 1013.25,
"humidity": 45,
"radon": null,
"radiation_rate": null,
"radiation_total": null
},
{
"id": 0,
"device_id": "Aranet4 17C3C",
"timestamp": "2024-01-15T11:30:00Z",
"synced_at": "2024-01-15T12:00:00Z",
"co2": 850,
"temperature": 23.0,
"pressure": 1014.0,
"humidity": 48,
"radon": null,
"radiation_rate": null,
"radiation_total": null
}
]"#;
let result = store.import_history_json(json_data).unwrap();
assert_eq!(result.total, 2);
assert_eq!(result.imported, 2);
assert_eq!(result.skipped, 0);
let query = HistoryQuery::new().device("Aranet4 17C3C");
let records = store.query_history(&query).unwrap();
assert_eq!(records.len(), 2);
}
#[test]
fn test_history_stats_empty() {
let store = Store::open_in_memory().unwrap();
let query = HistoryQuery::new();
let stats = store.history_stats(&query).unwrap();
assert_eq!(stats.count, 0);
assert!(stats.min.co2.is_none());
assert!(stats.max.co2.is_none());
assert!(stats.avg.co2.is_none());
assert!(stats.time_range.is_none());
}
#[test]
fn test_history_stats_single_record() {
let store = Store::open_in_memory().unwrap();
let now = OffsetDateTime::now_utc();
let records = vec![HistoryRecord {
timestamp: now,
co2: 800,
temperature: 22.5,
pressure: 1013.0,
humidity: 45,
radon: None,
radiation_rate: None,
radiation_total: None,
}];
store.insert_history("test-device", &records).unwrap();
let query = HistoryQuery::new();
let stats = store.history_stats(&query).unwrap();
assert_eq!(stats.count, 1);
assert_eq!(stats.min.co2, Some(800.0));
assert_eq!(stats.max.co2, Some(800.0));
assert_eq!(stats.avg.co2, Some(800.0));
assert_eq!(stats.min.temperature, Some(22.5));
assert_eq!(stats.max.temperature, Some(22.5));
}
#[test]
fn test_history_stats_multiple_records() {
let store = Store::open_in_memory().unwrap();
let base_time = OffsetDateTime::now_utc();
let records = vec![
HistoryRecord {
timestamp: base_time,
co2: 600,
temperature: 20.0,
pressure: 1010.0,
humidity: 40,
radon: None,
radiation_rate: None,
radiation_total: None,
},
HistoryRecord {
timestamp: base_time + time::Duration::hours(1),
co2: 800,
temperature: 22.0,
pressure: 1012.0,
humidity: 50,
radon: None,
radiation_rate: None,
radiation_total: None,
},
HistoryRecord {
timestamp: base_time + time::Duration::hours(2),
co2: 1000,
temperature: 24.0,
pressure: 1014.0,
humidity: 60,
radon: None,
radiation_rate: None,
radiation_total: None,
},
];
store.insert_history("test-device", &records).unwrap();
let query = HistoryQuery::new();
let stats = store.history_stats(&query).unwrap();
assert_eq!(stats.count, 3);
assert_eq!(stats.min.co2, Some(600.0));
assert_eq!(stats.max.co2, Some(1000.0));
assert_eq!(stats.avg.co2, Some(800.0));
assert_eq!(stats.min.temperature, Some(20.0));
assert_eq!(stats.max.temperature, Some(24.0));
assert_eq!(stats.avg.humidity, Some(50.0));
}
#[test]
fn test_history_stats_with_device_filter() {
let store = Store::open_in_memory().unwrap();
let now = OffsetDateTime::now_utc();
store
.insert_history(
"device-1",
&[HistoryRecord {
timestamp: now,
co2: 1200,
temperature: 25.0,
pressure: 1015.0,
humidity: 55,
radon: None,
radiation_rate: None,
radiation_total: None,
}],
)
.unwrap();
store
.insert_history(
"device-2",
&[HistoryRecord {
timestamp: now,
co2: 400,
temperature: 18.0,
pressure: 1010.0,
humidity: 35,
radon: None,
radiation_rate: None,
radiation_total: None,
}],
)
.unwrap();
let query = HistoryQuery::new().device("device-1");
let stats = store.history_stats(&query).unwrap();
assert_eq!(stats.count, 1);
assert_eq!(stats.avg.co2, Some(1200.0));
}
#[test]
fn test_history_stats_with_time_range() {
let store = Store::open_in_memory().unwrap();
let base_time = OffsetDateTime::now_utc();
let records = vec![
HistoryRecord {
timestamp: base_time - time::Duration::days(2),
co2: 500,
temperature: 19.0,
pressure: 1008.0,
humidity: 40,
radon: None,
radiation_rate: None,
radiation_total: None,
},
HistoryRecord {
timestamp: base_time,
co2: 800,
temperature: 22.0,
pressure: 1012.0,
humidity: 50,
radon: None,
radiation_rate: None,
radiation_total: None,
},
];
store.insert_history("test-device", &records).unwrap();
let query = HistoryQuery::new().since(base_time - time::Duration::hours(1));
let stats = store.history_stats(&query).unwrap();
assert_eq!(stats.count, 1);
assert_eq!(stats.avg.co2, Some(800.0));
}
#[test]
fn test_history_stats_with_radon() {
let store = Store::open_in_memory().unwrap();
let now = OffsetDateTime::now_utc();
let records = vec![
HistoryRecord {
timestamp: now,
co2: 0,
temperature: 20.0,
pressure: 1010.0,
humidity: 50,
radon: Some(100),
radiation_rate: None,
radiation_total: None,
},
HistoryRecord {
timestamp: now + time::Duration::hours(1),
co2: 0,
temperature: 20.0,
pressure: 1010.0,
humidity: 50,
radon: Some(200),
radiation_rate: None,
radiation_total: None,
},
];
store.insert_history("radon-device", &records).unwrap();
let query = HistoryQuery::new();
let stats = store.history_stats(&query).unwrap();
assert_eq!(stats.count, 2);
assert_eq!(stats.min.radon, Some(100.0));
assert_eq!(stats.max.radon, Some(200.0));
assert_eq!(stats.avg.radon, Some(150.0));
}
#[test]
fn test_history_stats_time_range_values() {
let store = Store::open_in_memory().unwrap();
use time::macros::datetime;
let start = datetime!(2024-01-01 00:00:00 UTC);
let end = datetime!(2024-01-08 00:00:00 UTC);
let records = vec![
HistoryRecord {
timestamp: start,
co2: 700,
temperature: 21.0,
pressure: 1011.0,
humidity: 45,
radon: None,
radiation_rate: None,
radiation_total: None,
},
HistoryRecord {
timestamp: end,
co2: 900,
temperature: 23.0,
pressure: 1013.0,
humidity: 55,
radon: None,
radiation_rate: None,
radiation_total: None,
},
];
store.insert_history("test-device", &records).unwrap();
let query = HistoryQuery::new();
let stats = store.history_stats(&query).unwrap();
let (min_ts, max_ts) = stats.time_range.unwrap();
assert_eq!(min_ts, start);
assert_eq!(max_ts, end);
}
#[test]
fn test_export_history_csv_empty() {
let store = Store::open_in_memory().unwrap();
let query = HistoryQuery::new();
let csv = store.export_history_csv(&query).unwrap();
assert!(csv.starts_with("timestamp,device_id,co2,temperature,pressure,humidity,radon\n"));
assert_eq!(csv.lines().count(), 1);
}
#[test]
fn test_export_history_csv_with_data() {
let store = Store::open_in_memory().unwrap();
let csv_data = r#"timestamp,device_id,co2,temperature,pressure,humidity,radon
2024-01-15T10:30:00Z,test-device,800,22.5,1013.25,45,
"#;
store.import_history_csv(csv_data).unwrap();
let query = HistoryQuery::new();
let csv = store.export_history_csv(&query).unwrap();
assert!(csv.contains("test-device"));
assert!(csv.contains("800"));
assert!(csv.contains("22.5"));
assert!(csv.contains("1013.25"));
assert!(csv.contains("45"));
}
#[test]
fn test_export_history_csv_with_radon() {
let store = Store::open_in_memory().unwrap();
let now = OffsetDateTime::now_utc();
let records = vec![HistoryRecord {
timestamp: now,
co2: 0,
temperature: 20.0,
pressure: 1010.0,
humidity: 50,
radon: Some(150),
radiation_rate: None,
radiation_total: None,
}];
store.insert_history("radon-device", &records).unwrap();
let query = HistoryQuery::new();
let csv = store.export_history_csv(&query).unwrap();
assert!(csv.contains("150"));
}
#[test]
fn test_export_history_csv_format() {
let store = Store::open_in_memory().unwrap();
let csv_data = r#"timestamp,device_id,co2,temperature,pressure,humidity,radon
2024-01-15T10:30:00Z,device-1,800,22.5,1013.25,45,
2024-01-15T11:30:00Z,device-1,850,23.0,1014.00,48,
"#;
store.import_history_csv(csv_data).unwrap();
let query = HistoryQuery::new().oldest_first();
let csv = store.export_history_csv(&query).unwrap();
let lines: Vec<&str> = csv.lines().collect();
assert_eq!(lines.len(), 3);
assert!(lines[0].contains("timestamp"));
assert!(lines[0].contains("device_id"));
assert!(lines[0].contains("co2"));
assert!(lines[1].contains("800"));
assert!(lines[2].contains("850"));
}
#[test]
fn test_export_history_json_empty() {
let store = Store::open_in_memory().unwrap();
let query = HistoryQuery::new();
let json = store.export_history_json(&query).unwrap();
assert_eq!(json.trim(), "[]");
}
#[test]
fn test_export_history_json_with_data() {
let store = Store::open_in_memory().unwrap();
let now = OffsetDateTime::now_utc();
let records = vec![HistoryRecord {
timestamp: now,
co2: 800,
temperature: 22.5,
pressure: 1013.0,
humidity: 45,
radon: None,
radiation_rate: None,
radiation_total: None,
}];
store.insert_history("test-device", &records).unwrap();
let query = HistoryQuery::new();
let json = store.export_history_json(&query).unwrap();
let parsed: Vec<serde_json::Value> = serde_json::from_str(&json).unwrap();
assert_eq!(parsed.len(), 1);
assert_eq!(parsed[0]["device_id"], "test-device");
assert_eq!(parsed[0]["co2"], 800);
}
#[test]
fn test_export_import_json_roundtrip() {
let store = Store::open_in_memory().unwrap();
let now = OffsetDateTime::now_utc();
let original_records = vec![
HistoryRecord {
timestamp: now,
co2: 750,
temperature: 21.5,
pressure: 1012.0,
humidity: 48,
radon: None,
radiation_rate: None,
radiation_total: None,
},
HistoryRecord {
timestamp: now + time::Duration::hours(1),
co2: 850,
temperature: 22.5,
pressure: 1013.0,
humidity: 52,
radon: None,
radiation_rate: None,
radiation_total: None,
},
];
store
.insert_history("roundtrip-device", &original_records)
.unwrap();
let query = HistoryQuery::new()
.device("roundtrip-device")
.oldest_first();
let json = store.export_history_json(&query).unwrap();
let store2 = Store::open_in_memory().unwrap();
let result = store2.import_history_json(&json).unwrap();
assert_eq!(result.imported, 2);
let records = store2.query_history(&query).unwrap();
assert_eq!(records.len(), 2);
assert_eq!(records[0].co2, 750);
assert_eq!(records[1].co2, 850);
}
#[test]
fn test_query_readings_with_pagination() {
let store = Store::open_in_memory().unwrap();
for i in 0..10 {
let mut reading = create_test_reading();
reading.co2 = 700 + i * 10;
store.insert_reading("paginated-device", &reading).unwrap();
}
let query = ReadingQuery::new()
.device("paginated-device")
.oldest_first()
.limit(3)
.offset(2);
let readings = store.query_readings(&query).unwrap();
assert_eq!(readings.len(), 3);
assert_eq!(readings[0].co2, 720); assert_eq!(readings[2].co2, 740); }
#[test]
fn test_query_readings_time_range() {
let store = Store::open_in_memory().unwrap();
let base_time = OffsetDateTime::now_utc();
let mut reading1 = create_test_reading();
reading1.captured_at = Some(base_time - time::Duration::days(2));
reading1.co2 = 600;
store.insert_reading("time-device", &reading1).unwrap();
let mut reading2 = create_test_reading();
reading2.captured_at = Some(base_time - time::Duration::hours(1));
reading2.co2 = 800;
store.insert_reading("time-device", &reading2).unwrap();
let mut reading3 = create_test_reading();
reading3.captured_at = Some(base_time);
reading3.co2 = 900;
store.insert_reading("time-device", &reading3).unwrap();
let query = ReadingQuery::new()
.device("time-device")
.since(base_time - time::Duration::days(1));
let readings = store.query_readings(&query).unwrap();
assert_eq!(readings.len(), 2);
}
#[test]
fn test_query_history_with_pagination() {
let store = Store::open_in_memory().unwrap();
let base_time = OffsetDateTime::now_utc();
let records: Vec<_> = (0..10)
.map(|i| HistoryRecord {
timestamp: base_time + time::Duration::hours(i),
co2: 700 + (i as u16) * 10,
temperature: 22.0,
pressure: 1013.0,
humidity: 50,
radon: None,
radiation_rate: None,
radiation_total: None,
})
.collect();
store.insert_history("paginated-device", &records).unwrap();
let query = HistoryQuery::new()
.device("paginated-device")
.oldest_first()
.limit(3)
.offset(2);
let results = store.query_history(&query).unwrap();
assert_eq!(results.len(), 3);
assert_eq!(results[0].co2, 720);
assert_eq!(results[2].co2, 740);
}
#[test]
fn test_update_device_info() {
let store = Store::open_in_memory().unwrap();
store.upsert_device("info-device", None).unwrap();
let info = aranet_types::DeviceInfo {
name: "My Aranet4".to_string(),
model: "Aranet4".to_string(),
serial: "ABC123".to_string(),
firmware: "v1.2.0".to_string(),
hardware: "1.0".to_string(),
..Default::default()
};
store.update_device_info("info-device", &info).unwrap();
let device = store.get_device("info-device").unwrap().unwrap();
assert_eq!(device.name, Some("My Aranet4".to_string()));
assert_eq!(device.serial, Some("ABC123".to_string()));
assert_eq!(device.firmware, Some("v1.2.0".to_string()));
assert_eq!(device.device_type, Some(aranet_types::DeviceType::Aranet4));
}
#[test]
fn test_update_device_info_aranet2() {
let store = Store::open_in_memory().unwrap();
store.upsert_device("aranet2-device", None).unwrap();
let info = aranet_types::DeviceInfo {
name: "My Aranet2".to_string(),
model: "Aranet2".to_string(),
serial: "XYZ789".to_string(),
firmware: "v2.0.0".to_string(),
hardware: "2.0".to_string(),
..Default::default()
};
store.update_device_info("aranet2-device", &info).unwrap();
let device = store.get_device("aranet2-device").unwrap().unwrap();
assert_eq!(device.device_type, Some(aranet_types::DeviceType::Aranet2));
}
#[test]
fn test_update_device_info_radon() {
let store = Store::open_in_memory().unwrap();
store.upsert_device("radon-device", None).unwrap();
let info = aranet_types::DeviceInfo {
name: "My AranetRn+".to_string(),
model: "AranetRn+ Radon".to_string(),
serial: "RAD001".to_string(),
firmware: "v1.0.0".to_string(),
hardware: "1.0".to_string(),
..Default::default()
};
store.update_device_info("radon-device", &info).unwrap();
let device = store.get_device("radon-device").unwrap().unwrap();
assert_eq!(
device.device_type,
Some(aranet_types::DeviceType::AranetRadon)
);
}
#[test]
fn test_update_device_metadata() {
let store = Store::open_in_memory().unwrap();
store.upsert_device("meta-device", None).unwrap();
store
.update_device_metadata(
"meta-device",
Some("Kitchen Sensor"),
Some(aranet_types::DeviceType::Aranet4),
)
.unwrap();
let device = store.get_device("meta-device").unwrap().unwrap();
assert_eq!(device.name, Some("Kitchen Sensor".to_string()));
assert_eq!(device.device_type, Some(aranet_types::DeviceType::Aranet4));
}
#[test]
fn test_list_devices_ordered_by_last_seen() {
let store = Store::open_in_memory().unwrap();
store.upsert_device("device-a", Some("First")).unwrap();
std::thread::sleep(std::time::Duration::from_secs(1));
store.upsert_device("device-b", Some("Second")).unwrap();
std::thread::sleep(std::time::Duration::from_secs(1));
store.upsert_device("device-c", Some("Third")).unwrap();
let devices = store.list_devices().unwrap();
assert_eq!(devices.len(), 3);
assert!(devices[0].last_seen >= devices[1].last_seen);
assert!(devices[1].last_seen >= devices[2].last_seen);
}
#[test]
fn test_count_readings() {
let store = Store::open_in_memory().unwrap();
for _ in 0..5 {
store
.insert_reading("device-1", &create_test_reading())
.unwrap();
}
for _ in 0..3 {
store
.insert_reading("device-2", &create_test_reading())
.unwrap();
}
assert_eq!(store.count_readings(Some("device-1")).unwrap(), 5);
assert_eq!(store.count_readings(Some("device-2")).unwrap(), 3);
assert_eq!(store.count_readings(Some("nonexistent")).unwrap(), 0);
assert_eq!(store.count_readings(None).unwrap(), 8);
}
#[test]
fn test_count_history() {
let store = Store::open_in_memory().unwrap();
let now = OffsetDateTime::now_utc();
let records: Vec<_> = (0..5)
.map(|i| HistoryRecord {
timestamp: now + time::Duration::hours(i),
co2: 800,
temperature: 22.0,
pressure: 1013.0,
humidity: 50,
radon: None,
radiation_rate: None,
radiation_total: None,
})
.collect();
store.insert_history("device-1", &records).unwrap();
store.insert_history("device-2", &records[..3]).unwrap();
assert_eq!(store.count_history(Some("device-1")).unwrap(), 5);
assert_eq!(store.count_history(Some("device-2")).unwrap(), 3);
assert_eq!(store.count_history(None).unwrap(), 8);
}
#[test]
fn test_reading_with_all_sensor_types() {
let store = Store::open_in_memory().unwrap();
let reading = create_test_reading();
store.insert_reading("aranet4", &reading).unwrap();
let mut radon_reading = create_test_reading();
radon_reading.co2 = 0;
radon_reading.radon = Some(150);
store.insert_reading("aranet-rn", &radon_reading).unwrap();
let mut rad_reading = create_test_reading();
rad_reading.co2 = 0;
rad_reading.radiation_rate = Some(0.12);
rad_reading.radiation_total = Some(0.003);
store.insert_reading("aranet-rad", &rad_reading).unwrap();
let aranet4_readings = store
.query_readings(&ReadingQuery::new().device("aranet4"))
.unwrap();
assert_eq!(aranet4_readings.len(), 1);
assert_eq!(aranet4_readings[0].co2, 800);
let radon_readings = store
.query_readings(&ReadingQuery::new().device("aranet-rn"))
.unwrap();
assert_eq!(radon_readings.len(), 1);
assert_eq!(radon_readings[0].radon, Some(150));
let rad_readings = store
.query_readings(&ReadingQuery::new().device("aranet-rad"))
.unwrap();
assert_eq!(rad_readings.len(), 1);
assert_eq!(rad_readings[0].radiation_rate, Some(0.12));
}
#[test]
fn test_device_not_found_error() {
let store = Store::open_in_memory().unwrap();
let result = store.get_device("nonexistent");
assert!(result.unwrap().is_none());
}
#[test]
fn test_empty_device_name() {
let store = Store::open_in_memory().unwrap();
let info = aranet_types::DeviceInfo {
name: "".to_string(),
model: "Aranet4".to_string(),
..Default::default()
};
store.upsert_device("empty-name-device", None).unwrap();
store
.update_device_info("empty-name-device", &info)
.unwrap();
let device = store.get_device("empty-name-device").unwrap().unwrap();
assert!(device.name.is_none());
}
#[test]
fn test_import_csv_invalid_json() {
let store = Store::open_in_memory().unwrap();
let result = store.import_history_json("not valid json");
assert!(result.is_err());
}
#[test]
fn test_reading_with_all_status_types() {
let store = Store::open_in_memory().unwrap();
for status in [Status::Green, Status::Yellow, Status::Red, Status::Error] {
let mut reading = create_test_reading();
reading.status = status;
let device_id = format!("status-{:?}", status);
store.insert_reading(&device_id, &reading).unwrap();
let stored = store.get_latest_reading(&device_id).unwrap().unwrap();
assert_eq!(stored.status, status);
}
}
#[tokio::test]
async fn test_concurrent_reading_inserts() {
use std::sync::Arc;
use tokio::sync::Mutex;
let store = Arc::new(Mutex::new(Store::open_in_memory().unwrap()));
let mut handles = Vec::new();
for task_id in 0..10 {
let store = Arc::clone(&store);
handles.push(tokio::spawn(async move {
for i in 0..10 {
let reading = CurrentReading {
co2: 400 + (task_id * 100) + i,
temperature: 20.0 + (task_id as f32),
pressure: 1013.0,
humidity: 50,
battery: 85,
status: Status::Green,
interval: 60,
age: 0,
captured_at: Some(OffsetDateTime::now_utc()),
radon: None,
radiation_rate: None,
radiation_total: None,
radon_avg_24h: None,
radon_avg_7d: None,
radon_avg_30d: None,
};
let device_id = format!("concurrent-device-{}", task_id);
let guard = store.lock().await;
guard.insert_reading(&device_id, &reading).unwrap();
}
}));
}
for handle in handles {
handle.await.unwrap();
}
let guard = store.lock().await;
let total = guard.count_readings(None).unwrap();
assert_eq!(total, 100); }
#[tokio::test]
async fn test_concurrent_reads_and_writes() {
use std::sync::Arc;
use tokio::sync::Mutex;
let store = Arc::new(Mutex::new(Store::open_in_memory().unwrap()));
{
let guard = store.lock().await;
for i in 0..10 {
let reading = CurrentReading {
co2: 500 + i * 50,
temperature: 22.0,
pressure: 1013.0,
humidity: 50,
battery: 85,
status: Status::Green,
interval: 60,
age: 0,
captured_at: Some(OffsetDateTime::now_utc()),
radon: None,
radiation_rate: None,
radiation_total: None,
radon_avg_24h: None,
radon_avg_7d: None,
radon_avg_30d: None,
};
guard.insert_reading("shared-device", &reading).unwrap();
}
}
let mut handles = Vec::new();
for _ in 0..5 {
let store = Arc::clone(&store);
handles.push(tokio::spawn(async move {
for _ in 0..10 {
let guard = store.lock().await;
let readings = guard
.query_readings(&ReadingQuery::new().device("shared-device"))
.unwrap();
assert!(!readings.is_empty());
drop(guard);
tokio::task::yield_now().await;
}
}));
}
for task_id in 0..3 {
let store = Arc::clone(&store);
handles.push(tokio::spawn(async move {
for i in 0..5 {
let reading = CurrentReading {
co2: 1000 + (task_id * 100) + i,
temperature: 25.0,
pressure: 1015.0,
humidity: 55,
battery: 80,
status: Status::Yellow,
interval: 60,
age: 0,
captured_at: Some(OffsetDateTime::now_utc()),
radon: None,
radiation_rate: None,
radiation_total: None,
radon_avg_24h: None,
radon_avg_7d: None,
radon_avg_30d: None,
};
let guard = store.lock().await;
guard.insert_reading("shared-device", &reading).unwrap();
drop(guard);
tokio::task::yield_now().await;
}
}));
}
for handle in handles {
handle.await.unwrap();
}
let guard = store.lock().await;
let total = guard.count_readings(Some("shared-device")).unwrap();
assert_eq!(total, 10 + (3 * 5)); }
#[tokio::test]
async fn test_concurrent_device_upserts() {
use std::sync::Arc;
use tokio::sync::Mutex;
let store = Arc::new(Mutex::new(Store::open_in_memory().unwrap()));
let mut handles = Vec::new();
for i in 0..20 {
let store = Arc::clone(&store);
handles.push(tokio::spawn(async move {
let guard = store.lock().await;
guard
.upsert_device("contested-device", Some(&format!("Name-{}", i)))
.unwrap();
}));
}
for handle in handles {
handle.await.unwrap();
}
let guard = store.lock().await;
let device = guard.get_device("contested-device").unwrap().unwrap();
assert!(device.name.unwrap().starts_with("Name-"));
}
}