1use std::path::Path;
4
5use rusqlite::{Connection, OptionalExtension};
6use time::OffsetDateTime;
7use tracing::{debug, info};
8
9use aranet_types::{CurrentReading, DeviceInfo, DeviceType, HistoryRecord, Status};
10
11use crate::error::{Error, Result};
12use crate::models::{StoredDevice, StoredHistoryRecord, StoredReading, SyncState};
13use crate::queries::{HistoryQuery, ReadingQuery};
14use crate::schema;
15
16pub struct Store {
18 conn: Connection,
19}
20
21impl Store {
22 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
24 let path = path.as_ref();
25
26 if let Some(parent) = path.parent()
28 && !parent.exists()
29 {
30 std::fs::create_dir_all(parent).map_err(|e| Error::CreateDirectory {
31 path: parent.to_path_buf(),
32 source: e,
33 })?;
34 }
35
36 info!("Opening database at {}", path.display());
37 let conn = Connection::open(path)?;
38
39 conn.execute_batch(
41 "PRAGMA foreign_keys = ON;
42 PRAGMA journal_mode = WAL;
43 PRAGMA synchronous = NORMAL;",
44 )?;
45
46 schema::initialize(&conn)?;
48
49 Ok(Self { conn })
50 }
51
52 pub fn open_default() -> Result<Self> {
54 Self::open(crate::default_db_path())
55 }
56
57 pub fn open_in_memory() -> Result<Self> {
59 let conn = Connection::open_in_memory()?;
60 schema::initialize(&conn)?;
61 Ok(Self { conn })
62 }
63
64 pub fn upsert_device(&self, device_id: &str, name: Option<&str>) -> Result<StoredDevice> {
68 let now = OffsetDateTime::now_utc().unix_timestamp();
69
70 self.conn.execute(
71 "INSERT INTO devices (id, name, first_seen, last_seen) VALUES (?1, ?2, ?3, ?3)
72 ON CONFLICT(id) DO UPDATE SET
73 name = COALESCE(?2, name),
74 last_seen = ?3",
75 rusqlite::params![device_id, name, now],
76 )?;
77
78 self.get_device(device_id)?
79 .ok_or_else(|| Error::DeviceNotFound(device_id.to_string()))
80 }
81
82 pub fn update_device_metadata(
87 &self,
88 device_id: &str,
89 name: Option<&str>,
90 device_type: Option<DeviceType>,
91 ) -> Result<()> {
92 let device_type_str = device_type.map(|dt| format!("{:?}", dt));
93 let now = OffsetDateTime::now_utc().unix_timestamp();
94
95 self.conn.execute(
96 "UPDATE devices SET
97 name = COALESCE(?2, name),
98 device_type = COALESCE(?3, device_type),
99 last_seen = ?4
100 WHERE id = ?1",
101 rusqlite::params![device_id, name, device_type_str, now],
102 )?;
103
104 Ok(())
105 }
106
107 pub fn update_device_info(&self, device_id: &str, info: &DeviceInfo) -> Result<()> {
109 let device_type = if info.model.contains("Aranet4") {
111 Some("Aranet4")
112 } else if info.model.contains("Aranet2") {
113 Some("Aranet2")
114 } else if info.model.contains("Radon") || info.model.contains("Rn") {
115 Some("AranetRadon")
116 } else if info.model.contains("Radiation") {
117 Some("AranetRadiation")
118 } else {
119 None
120 };
121
122 let name = if info.name.is_empty() {
123 None
124 } else {
125 Some(&info.name)
126 };
127
128 self.conn.execute(
129 "UPDATE devices SET
130 name = COALESCE(?2, name),
131 device_type = COALESCE(?3, device_type),
132 serial = COALESCE(?4, serial),
133 firmware = COALESCE(?5, firmware),
134 hardware = COALESCE(?6, hardware),
135 last_seen = ?7
136 WHERE id = ?1",
137 rusqlite::params![
138 device_id,
139 name,
140 device_type,
141 &info.serial,
142 &info.firmware,
143 &info.hardware,
144 OffsetDateTime::now_utc().unix_timestamp()
145 ],
146 )?;
147
148 Ok(())
149 }
150
151 pub fn get_device(&self, device_id: &str) -> Result<Option<StoredDevice>> {
153 let mut stmt = self.conn.prepare(
154 "SELECT id, name, device_type, serial, firmware, hardware, first_seen, last_seen
155 FROM devices WHERE id = ?",
156 )?;
157
158 let device = stmt
159 .query_row([device_id], |row| {
160 Ok(StoredDevice {
161 id: row.get(0)?,
162 name: row.get(1)?,
163 device_type: row
164 .get::<_, Option<String>>(2)?
165 .and_then(|s| parse_device_type(&s)),
166 serial: row.get(3)?,
167 firmware: row.get(4)?,
168 hardware: row.get(5)?,
169 first_seen: OffsetDateTime::from_unix_timestamp(row.get(6)?).unwrap(),
170 last_seen: OffsetDateTime::from_unix_timestamp(row.get(7)?).unwrap(),
171 })
172 })
173 .optional()?;
174
175 Ok(device)
176 }
177
178 pub fn list_devices(&self) -> Result<Vec<StoredDevice>> {
180 let mut stmt = self.conn.prepare(
181 "SELECT id, name, device_type, serial, firmware, hardware, first_seen, last_seen
182 FROM devices ORDER BY last_seen DESC",
183 )?;
184
185 let devices = stmt
186 .query_map([], |row| {
187 Ok(StoredDevice {
188 id: row.get(0)?,
189 name: row.get(1)?,
190 device_type: row
191 .get::<_, Option<String>>(2)?
192 .and_then(|s| parse_device_type(&s)),
193 serial: row.get(3)?,
194 firmware: row.get(4)?,
195 hardware: row.get(5)?,
196 first_seen: OffsetDateTime::from_unix_timestamp(row.get(6)?).unwrap(),
197 last_seen: OffsetDateTime::from_unix_timestamp(row.get(7)?).unwrap(),
198 })
199 })?
200 .collect::<std::result::Result<Vec<_>, _>>()?;
201
202 Ok(devices)
203 }
204}
205
206fn parse_device_type(s: &str) -> Option<DeviceType> {
207 match s {
208 "Aranet4" => Some(DeviceType::Aranet4),
209 "Aranet2" => Some(DeviceType::Aranet2),
210 "AranetRadon" => Some(DeviceType::AranetRadon),
211 "AranetRadiation" => Some(DeviceType::AranetRadiation),
212 _ => None,
213 }
214}
215
216fn parse_status(s: &str) -> Status {
217 match s {
218 "Green" => Status::Green,
219 "Yellow" => Status::Yellow,
220 "Red" => Status::Red,
221 "Error" => Status::Error,
222 _ => Status::Green,
223 }
224}
225
226impl Store {
228 pub fn insert_reading(&self, device_id: &str, reading: &CurrentReading) -> Result<i64> {
230 self.upsert_device(device_id, None)?;
232
233 let captured_at = reading
234 .captured_at
235 .unwrap_or_else(OffsetDateTime::now_utc)
236 .unix_timestamp();
237
238 self.conn.execute(
239 "INSERT INTO readings (device_id, captured_at, co2, temperature, pressure,
240 humidity, battery, status, radon, radiation_rate, radiation_total)
241 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)",
242 rusqlite::params![
243 device_id,
244 captured_at,
245 reading.co2,
246 reading.temperature,
247 reading.pressure,
248 reading.humidity,
249 reading.battery,
250 format!("{:?}", reading.status),
251 reading.radon,
252 reading.radiation_rate,
253 reading.radiation_total,
254 ],
255 )?;
256
257 Ok(self.conn.last_insert_rowid())
258 }
259
260 pub fn query_readings(&self, query: &ReadingQuery) -> Result<Vec<StoredReading>> {
262 let sql = query.build_sql();
263 let (_, params) = query.build_where();
264
265 debug!("Executing query: {}", sql);
266
267 let params_ref: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
268
269 let mut stmt = self.conn.prepare(&sql)?;
270 let readings = stmt
271 .query_map(params_ref.as_slice(), |row| {
272 Ok(StoredReading {
273 id: row.get(0)?,
274 device_id: row.get(1)?,
275 captured_at: OffsetDateTime::from_unix_timestamp(row.get(2)?).unwrap(),
276 co2: row.get::<_, i64>(3)? as u16,
277 temperature: row.get(4)?,
278 pressure: row.get(5)?,
279 humidity: row.get::<_, i64>(6)? as u8,
280 battery: row.get::<_, i64>(7)? as u8,
281 status: parse_status(&row.get::<_, String>(8)?),
282 radon: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
283 radiation_rate: row.get(10)?,
284 radiation_total: row.get(11)?,
285 })
286 })?
287 .collect::<std::result::Result<Vec<_>, _>>()?;
288
289 Ok(readings)
290 }
291
292 pub fn get_latest_reading(&self, device_id: &str) -> Result<Option<StoredReading>> {
294 let query = ReadingQuery::new().device(device_id).limit(1);
295 let mut readings = self.query_readings(&query)?;
296 Ok(readings.pop())
297 }
298
299 pub fn count_readings(&self, device_id: Option<&str>) -> Result<u64> {
301 let count: i64 = match device_id {
302 Some(id) => self.conn.query_row(
303 "SELECT COUNT(*) FROM readings WHERE device_id = ?",
304 [id],
305 |row| row.get(0),
306 )?,
307 None => self
308 .conn
309 .query_row("SELECT COUNT(*) FROM readings", [], |row| row.get(0))?,
310 };
311
312 Ok(count as u64)
313 }
314}
315
316impl Store {
318 pub fn insert_history(&self, device_id: &str, records: &[HistoryRecord]) -> Result<usize> {
320 self.upsert_device(device_id, None)?;
322
323 let synced_at = OffsetDateTime::now_utc().unix_timestamp();
324 let mut inserted = 0;
325
326 for record in records {
327 let result = self.conn.execute(
328 "INSERT OR IGNORE INTO history (device_id, timestamp, synced_at, co2,
329 temperature, pressure, humidity, radon, radiation_rate, radiation_total)
330 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
331 rusqlite::params![
332 device_id,
333 record.timestamp.unix_timestamp(),
334 synced_at,
335 record.co2,
336 record.temperature,
337 record.pressure,
338 record.humidity,
339 record.radon,
340 record.radiation_rate,
341 record.radiation_total,
342 ],
343 )?;
344 inserted += result;
345 }
346
347 info!(
348 "Inserted {} new history records for {}",
349 inserted, device_id
350 );
351 Ok(inserted)
352 }
353
354 pub fn query_history(&self, query: &HistoryQuery) -> Result<Vec<StoredHistoryRecord>> {
356 let mut conditions = Vec::new();
357 let mut params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
358
359 if let Some(ref device_id) = query.device_id {
360 conditions.push("device_id = ?");
361 params.push(Box::new(device_id.clone()));
362 }
363
364 if let Some(since) = query.since {
365 conditions.push("timestamp >= ?");
366 params.push(Box::new(since.unix_timestamp()));
367 }
368
369 if let Some(until) = query.until {
370 conditions.push("timestamp <= ?");
371 params.push(Box::new(until.unix_timestamp()));
372 }
373
374 let where_clause = if conditions.is_empty() {
375 String::new()
376 } else {
377 format!("WHERE {}", conditions.join(" AND "))
378 };
379
380 let order = if query.newest_first { "DESC" } else { "ASC" };
381
382 let mut sql = format!(
383 "SELECT id, device_id, timestamp, synced_at, co2, temperature, pressure,
384 humidity, radon, radiation_rate, radiation_total
385 FROM history {} ORDER BY timestamp {}",
386 where_clause, order
387 );
388
389 if let Some(limit) = query.limit {
390 sql.push_str(&format!(" LIMIT {}", limit));
391 }
392
393 if let Some(offset) = query.offset {
394 sql.push_str(&format!(" OFFSET {}", offset));
395 }
396
397 let params_ref: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
398
399 let mut stmt = self.conn.prepare(&sql)?;
400 let records = stmt
401 .query_map(params_ref.as_slice(), |row| {
402 Ok(StoredHistoryRecord {
403 id: row.get(0)?,
404 device_id: row.get(1)?,
405 timestamp: OffsetDateTime::from_unix_timestamp(row.get(2)?).unwrap(),
406 synced_at: OffsetDateTime::from_unix_timestamp(row.get(3)?).unwrap(),
407 co2: row.get::<_, i64>(4)? as u16,
408 temperature: row.get(5)?,
409 pressure: row.get(6)?,
410 humidity: row.get::<_, i64>(7)? as u8,
411 radon: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
412 radiation_rate: row.get(9)?,
413 radiation_total: row.get(10)?,
414 })
415 })?
416 .collect::<std::result::Result<Vec<_>, _>>()?;
417
418 Ok(records)
419 }
420
421 pub fn count_history(&self, device_id: Option<&str>) -> Result<u64> {
423 let count: i64 = match device_id {
424 Some(id) => self.conn.query_row(
425 "SELECT COUNT(*) FROM history WHERE device_id = ?",
426 [id],
427 |row| row.get(0),
428 )?,
429 None => self
430 .conn
431 .query_row("SELECT COUNT(*) FROM history", [], |row| row.get(0))?,
432 };
433
434 Ok(count as u64)
435 }
436}
437
438impl Store {
440 pub fn get_sync_state(&self, device_id: &str) -> Result<Option<SyncState>> {
442 let mut stmt = self.conn.prepare(
443 "SELECT device_id, last_history_index, total_readings, last_sync_at
444 FROM sync_state WHERE device_id = ?",
445 )?;
446
447 let state = stmt
448 .query_row([device_id], |row| {
449 Ok(SyncState {
450 device_id: row.get(0)?,
451 last_history_index: row.get::<_, Option<i64>>(1)?.map(|v| v as u16),
452 total_readings: row.get::<_, Option<i64>>(2)?.map(|v| v as u16),
453 last_sync_at: row
454 .get::<_, Option<i64>>(3)?
455 .map(|ts| OffsetDateTime::from_unix_timestamp(ts).unwrap()),
456 })
457 })
458 .optional()?;
459
460 Ok(state)
461 }
462
463 pub fn update_sync_state(
465 &self,
466 device_id: &str,
467 last_index: u16,
468 total_readings: u16,
469 ) -> Result<()> {
470 let now = OffsetDateTime::now_utc().unix_timestamp();
471
472 self.conn.execute(
473 "INSERT INTO sync_state (device_id, last_history_index, total_readings, last_sync_at)
474 VALUES (?1, ?2, ?3, ?4)
475 ON CONFLICT(device_id) DO UPDATE SET
476 last_history_index = ?2,
477 total_readings = ?3,
478 last_sync_at = ?4",
479 rusqlite::params![device_id, last_index, total_readings, now],
480 )?;
481
482 debug!(
483 "Updated sync state for {}: index={}, total={}",
484 device_id, last_index, total_readings
485 );
486
487 Ok(())
488 }
489
490 pub fn calculate_sync_start(&self, device_id: &str, current_total: u16) -> Result<u16> {
496 let state = self.get_sync_state(device_id)?;
497
498 match state {
499 Some(s) if s.total_readings == Some(current_total) => {
500 debug!("No new readings for {}", device_id);
502 Ok(current_total + 1) }
504 Some(s) if s.last_history_index.is_some() => {
505 let last_index = s.last_history_index.unwrap();
507 let prev_total = s.total_readings.unwrap_or(0);
508 let new_count = current_total.saturating_sub(prev_total);
509
510 if new_count > 0 {
511 let start = last_index.saturating_add(1);
513 debug!(
514 "Incremental sync for {}: {} new readings, starting at {}",
515 device_id, new_count, start
516 );
517 Ok(start)
518 } else {
519 Ok(current_total + 1)
520 }
521 }
522 _ => {
523 debug!(
525 "First sync for {}: downloading all {} readings",
526 device_id, current_total
527 );
528 Ok(1)
529 }
530 }
531 }
532}
533
534#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
536pub struct HistoryStats {
537 pub count: u64,
539 pub min: HistoryAggregates,
541 pub max: HistoryAggregates,
543 pub avg: HistoryAggregates,
545 pub time_range: Option<(OffsetDateTime, OffsetDateTime)>,
547}
548
549#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
551pub struct HistoryAggregates {
552 pub co2: Option<f64>,
554 pub temperature: Option<f64>,
556 pub pressure: Option<f64>,
558 pub humidity: Option<f64>,
560 pub radon: Option<f64>,
562}
563
564impl Store {
566 pub fn history_stats(&self, query: &HistoryQuery) -> Result<HistoryStats> {
568 let mut conditions = Vec::new();
569 let mut params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
570
571 if let Some(ref device_id) = query.device_id {
572 conditions.push("device_id = ?");
573 params.push(Box::new(device_id.clone()));
574 }
575
576 if let Some(since) = query.since {
577 conditions.push("timestamp >= ?");
578 params.push(Box::new(since.unix_timestamp()));
579 }
580
581 if let Some(until) = query.until {
582 conditions.push("timestamp <= ?");
583 params.push(Box::new(until.unix_timestamp()));
584 }
585
586 let where_clause = if conditions.is_empty() {
587 String::new()
588 } else {
589 format!("WHERE {}", conditions.join(" AND "))
590 };
591
592 let sql = format!(
593 "SELECT
594 COUNT(*) as count,
595 MIN(co2) as min_co2, MAX(co2) as max_co2, AVG(co2) as avg_co2,
596 MIN(temperature) as min_temp, MAX(temperature) as max_temp, AVG(temperature) as avg_temp,
597 MIN(pressure) as min_press, MAX(pressure) as max_press, AVG(pressure) as avg_press,
598 MIN(humidity) as min_hum, MAX(humidity) as max_hum, AVG(humidity) as avg_hum,
599 MIN(radon) as min_radon, MAX(radon) as max_radon, AVG(radon) as avg_radon,
600 MIN(timestamp) as min_ts, MAX(timestamp) as max_ts
601 FROM history {}",
602 where_clause
603 );
604
605 let params_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
606
607 let stats = self.conn.query_row(&sql, params_refs.as_slice(), |row| {
608 let count: i64 = row.get(0)?;
609 let min_ts: Option<i64> = row.get(16)?;
610 let max_ts: Option<i64> = row.get(17)?;
611
612 let time_range = match (min_ts, max_ts) {
613 (Some(min), Some(max)) => Some((
614 OffsetDateTime::from_unix_timestamp(min).unwrap(),
615 OffsetDateTime::from_unix_timestamp(max).unwrap(),
616 )),
617 _ => None,
618 };
619
620 Ok(HistoryStats {
621 count: count as u64,
622 min: HistoryAggregates {
623 co2: row.get::<_, Option<i64>>(1)?.map(|v| v as f64),
624 temperature: row.get(4)?,
625 pressure: row.get(7)?,
626 humidity: row.get::<_, Option<i64>>(10)?.map(|v| v as f64),
627 radon: row.get::<_, Option<i64>>(13)?.map(|v| v as f64),
628 },
629 max: HistoryAggregates {
630 co2: row.get::<_, Option<i64>>(2)?.map(|v| v as f64),
631 temperature: row.get(5)?,
632 pressure: row.get(8)?,
633 humidity: row.get::<_, Option<i64>>(11)?.map(|v| v as f64),
634 radon: row.get::<_, Option<i64>>(14)?.map(|v| v as f64),
635 },
636 avg: HistoryAggregates {
637 co2: row.get(3)?,
638 temperature: row.get(6)?,
639 pressure: row.get(9)?,
640 humidity: row.get(12)?,
641 radon: row.get(15)?,
642 },
643 time_range,
644 })
645 })?;
646
647 Ok(stats)
648 }
649
650 pub fn export_history_csv(&self, query: &HistoryQuery) -> Result<String> {
652 let records = self.query_history(query)?;
653 let mut output = String::new();
654
655 output.push_str("timestamp,device_id,co2,temperature,pressure,humidity,radon\n");
657
658 for record in records {
660 let timestamp = record
661 .timestamp
662 .format(&time::format_description::well_known::Rfc3339)
663 .unwrap_or_default();
664 let radon = record.radon.map(|r| r.to_string()).unwrap_or_default();
665
666 output.push_str(&format!(
667 "{},{},{},{:.1},{:.2},{},{}\n",
668 timestamp,
669 record.device_id,
670 record.co2,
671 record.temperature,
672 record.pressure,
673 record.humidity,
674 radon
675 ));
676 }
677
678 Ok(output)
679 }
680
681 pub fn export_history_json(&self, query: &HistoryQuery) -> Result<String> {
683 let records = self.query_history(query)?;
684 let json = serde_json::to_string_pretty(&records)
685 .map_err(|e| Error::Database(rusqlite::Error::ToSqlConversionFailure(Box::new(e))))?;
686 Ok(json)
687 }
688
689 pub fn import_history_csv(&self, csv_data: &str) -> Result<ImportResult> {
699 let mut reader = csv::ReaderBuilder::new()
700 .has_headers(true)
701 .flexible(true)
702 .trim(csv::Trim::All)
703 .from_reader(csv_data.as_bytes());
704
705 let mut total = 0;
706 let mut imported = 0;
707 let mut skipped = 0;
708 let mut errors = Vec::new();
709
710 for (line_num, result) in reader.records().enumerate() {
711 total += 1;
712 let line = line_num + 2; let record = match result {
715 Ok(r) => r,
716 Err(e) => {
717 errors.push(format!("Line {}: parse error - {}", line, e));
718 skipped += 1;
719 continue;
720 }
721 };
722
723 let timestamp_str = record.get(0).unwrap_or("").trim();
725 let device_id = record.get(1).unwrap_or("").trim();
726 let co2_str = record.get(2).unwrap_or("").trim();
727 let temp_str = record.get(3).unwrap_or("").trim();
728 let pressure_str = record.get(4).unwrap_or("").trim();
729 let humidity_str = record.get(5).unwrap_or("").trim();
730 let radon_str = record.get(6).unwrap_or("").trim();
731
732 if device_id.is_empty() {
734 errors.push(format!("Line {}: missing device_id", line));
735 skipped += 1;
736 continue;
737 }
738
739 let timestamp = match OffsetDateTime::parse(
741 timestamp_str,
742 &time::format_description::well_known::Rfc3339,
743 ) {
744 Ok(ts) => ts,
745 Err(_) => {
746 errors.push(format!(
747 "Line {}: invalid timestamp '{}'",
748 line, timestamp_str
749 ));
750 skipped += 1;
751 continue;
752 }
753 };
754
755 let co2: u16 = co2_str.parse().unwrap_or(0);
757 let temperature: f32 = temp_str.parse().unwrap_or(0.0);
758 let pressure: f32 = pressure_str.parse().unwrap_or(0.0);
759 let humidity: u8 = humidity_str.parse().unwrap_or(0);
760 let radon: Option<u32> = if radon_str.is_empty() {
761 None
762 } else {
763 radon_str.parse().ok()
764 };
765
766 let history_record = HistoryRecord {
768 timestamp,
769 co2,
770 temperature,
771 pressure,
772 humidity,
773 radon,
774 radiation_rate: None,
775 radiation_total: None,
776 };
777
778 self.upsert_device(device_id, None)?;
780 let count = self.insert_history(device_id, &[history_record])?;
781 imported += count;
782 if count == 0 {
783 skipped += 1; }
785 }
786
787 Ok(ImportResult {
788 total,
789 imported,
790 skipped,
791 errors,
792 })
793 }
794
795 pub fn import_history_json(&self, json_data: &str) -> Result<ImportResult> {
801 let records: Vec<StoredHistoryRecord> = serde_json::from_str(json_data)
802 .map_err(|e| Error::Database(rusqlite::Error::ToSqlConversionFailure(Box::new(e))))?;
803
804 let total = records.len();
805 let mut imported = 0;
806 let mut skipped = 0;
807
808 for record in records {
809 let history_record = record.to_history();
811
812 self.upsert_device(&record.device_id, None)?;
814 let count = self.insert_history(&record.device_id, &[history_record])?;
815 imported += count;
816 if count == 0 {
817 skipped += 1; }
819 }
820
821 Ok(ImportResult {
822 total,
823 imported,
824 skipped,
825 errors: Vec::new(),
826 })
827 }
828}
829
830#[derive(Debug, Clone)]
832pub struct ImportResult {
833 pub total: usize,
835 pub imported: usize,
837 pub skipped: usize,
839 pub errors: Vec<String>,
841}
842
843#[cfg(test)]
844mod tests {
845 use super::*;
846 use aranet_types::Status;
847
848 fn create_test_reading() -> CurrentReading {
849 CurrentReading {
850 co2: 800,
851 temperature: 22.5,
852 pressure: 1013.0,
853 humidity: 45,
854 battery: 85,
855 status: Status::Green,
856 interval: 60,
857 age: 30,
858 captured_at: Some(OffsetDateTime::now_utc()),
859 radon: None,
860 radiation_rate: None,
861 radiation_total: None,
862 radon_avg_24h: None,
863 radon_avg_7d: None,
864 radon_avg_30d: None,
865 }
866 }
867
868 #[test]
869 fn test_open_in_memory() {
870 let store = Store::open_in_memory().unwrap();
871 let devices = store.list_devices().unwrap();
872 assert!(devices.is_empty());
873 }
874
875 #[test]
876 fn test_upsert_device() {
877 let store = Store::open_in_memory().unwrap();
878
879 let device = store.upsert_device("test-device", Some("Test")).unwrap();
880 assert_eq!(device.id, "test-device");
881 assert_eq!(device.name, Some("Test".to_string()));
882
883 let device = store
885 .upsert_device("test-device", Some("New Name"))
886 .unwrap();
887 assert_eq!(device.name, Some("New Name".to_string()));
888 }
889
890 #[test]
891 fn test_insert_and_query_reading() {
892 let store = Store::open_in_memory().unwrap();
893 let reading = create_test_reading();
894
895 store.insert_reading("test-device", &reading).unwrap();
896
897 let query = ReadingQuery::new().device("test-device");
898 let readings = store.query_readings(&query).unwrap();
899
900 assert_eq!(readings.len(), 1);
901 assert_eq!(readings[0].co2, 800);
902 assert_eq!(readings[0].temperature, 22.5);
903 }
904
905 #[test]
906 fn test_get_latest_reading() {
907 let store = Store::open_in_memory().unwrap();
908
909 let mut reading1 = create_test_reading();
910 reading1.co2 = 700;
911 store.insert_reading("test-device", &reading1).unwrap();
912
913 let mut reading2 = create_test_reading();
914 reading2.co2 = 900;
915 store.insert_reading("test-device", &reading2).unwrap();
916
917 let latest = store.get_latest_reading("test-device").unwrap().unwrap();
918 assert_eq!(latest.co2, 900);
919 }
920
921 #[test]
922 fn test_insert_history_deduplication() {
923 let store = Store::open_in_memory().unwrap();
924
925 let now = OffsetDateTime::now_utc();
926 let records = vec![
927 HistoryRecord {
928 timestamp: now,
929 co2: 800,
930 temperature: 22.0,
931 pressure: 1013.0,
932 humidity: 45,
933 radon: None,
934 radiation_rate: None,
935 radiation_total: None,
936 },
937 HistoryRecord {
938 timestamp: now, co2: 850,
940 temperature: 23.0,
941 pressure: 1014.0,
942 humidity: 46,
943 radon: None,
944 radiation_rate: None,
945 radiation_total: None,
946 },
947 ];
948
949 let inserted = store.insert_history("test-device", &records).unwrap();
950 assert_eq!(inserted, 1); let count = store.count_history(Some("test-device")).unwrap();
953 assert_eq!(count, 1);
954 }
955
956 #[test]
957 fn test_sync_state() {
958 let store = Store::open_in_memory().unwrap();
959 store.upsert_device("test-device", None).unwrap();
960
961 let state = store.get_sync_state("test-device").unwrap();
963 assert!(state.is_none());
964
965 store.update_sync_state("test-device", 100, 100).unwrap();
967
968 let state = store.get_sync_state("test-device").unwrap().unwrap();
969 assert_eq!(state.last_history_index, Some(100));
970 assert_eq!(state.total_readings, Some(100));
971 assert!(state.last_sync_at.is_some());
972 }
973
974 #[test]
975 fn test_calculate_sync_start() {
976 let store = Store::open_in_memory().unwrap();
977 store.upsert_device("test-device", None).unwrap();
978
979 let start = store.calculate_sync_start("test-device", 100).unwrap();
981 assert_eq!(start, 1);
982
983 store.update_sync_state("test-device", 100, 100).unwrap();
985
986 let start = store.calculate_sync_start("test-device", 100).unwrap();
988 assert_eq!(start, 101);
989
990 let start = store.calculate_sync_start("test-device", 110).unwrap();
992 assert_eq!(start, 101);
993 }
994
995 #[test]
996 fn test_import_history_csv() {
997 let store = Store::open_in_memory().unwrap();
998
999 let csv_data = r#"timestamp,device_id,co2,temperature,pressure,humidity,radon
10002024-01-15T10:30:00Z,Aranet4 17C3C,800,22.5,1013.25,45,
10012024-01-15T11:30:00Z,Aranet4 17C3C,850,23.0,1014.00,48,
10022024-01-15T12:30:00Z,AranetRn+ 306B8,0,21.0,1012.00,50,150
1003"#;
1004
1005 let result = store.import_history_csv(csv_data).unwrap();
1006
1007 assert_eq!(result.total, 3);
1008 assert_eq!(result.imported, 3);
1009 assert_eq!(result.skipped, 0);
1010 assert!(result.errors.is_empty());
1011
1012 let devices = store.list_devices().unwrap();
1014 assert_eq!(devices.len(), 2);
1015
1016 let query = HistoryQuery::new().device("Aranet4 17C3C");
1018 let records = store.query_history(&query).unwrap();
1019 assert_eq!(records.len(), 2);
1020 assert_eq!(records[0].co2, 850); assert_eq!(records[1].co2, 800); let query = HistoryQuery::new().device("AranetRn+ 306B8");
1025 let records = store.query_history(&query).unwrap();
1026 assert_eq!(records.len(), 1);
1027 assert_eq!(records[0].radon, Some(150));
1028 }
1029
1030 #[test]
1031 fn test_import_history_csv_deduplication() {
1032 let store = Store::open_in_memory().unwrap();
1033
1034 let csv_data = r#"timestamp,device_id,co2,temperature,pressure,humidity,radon
10352024-01-15T10:30:00Z,test-device,800,22.5,1013.25,45,
1036"#;
1037
1038 let result = store.import_history_csv(csv_data).unwrap();
1040 assert_eq!(result.imported, 1);
1041
1042 let result = store.import_history_csv(csv_data).unwrap();
1044 assert_eq!(result.imported, 0);
1045 assert_eq!(result.skipped, 1);
1046 }
1047
1048 #[test]
1049 fn test_import_history_csv_with_errors() {
1050 let store = Store::open_in_memory().unwrap();
1051
1052 let csv_data = r#"timestamp,device_id,co2,temperature,pressure,humidity,radon
1053invalid-timestamp,test-device,800,22.5,1013.25,45,
10542024-01-15T10:30:00Z,,800,22.5,1013.25,45,
10552024-01-15T11:30:00Z,valid-device,900,23.0,1014.00,50,
1056"#;
1057
1058 let result = store.import_history_csv(csv_data).unwrap();
1059
1060 assert_eq!(result.total, 3);
1061 assert_eq!(result.imported, 1);
1062 assert_eq!(result.skipped, 2);
1063 assert_eq!(result.errors.len(), 2);
1064 }
1065
1066 #[test]
1067 fn test_import_history_json() {
1068 let store = Store::open_in_memory().unwrap();
1069
1070 let json_data = r#"[
1071 {
1072 "id": 0,
1073 "device_id": "Aranet4 17C3C",
1074 "timestamp": "2024-01-15T10:30:00Z",
1075 "synced_at": "2024-01-15T12:00:00Z",
1076 "co2": 800,
1077 "temperature": 22.5,
1078 "pressure": 1013.25,
1079 "humidity": 45,
1080 "radon": null,
1081 "radiation_rate": null,
1082 "radiation_total": null
1083 },
1084 {
1085 "id": 0,
1086 "device_id": "Aranet4 17C3C",
1087 "timestamp": "2024-01-15T11:30:00Z",
1088 "synced_at": "2024-01-15T12:00:00Z",
1089 "co2": 850,
1090 "temperature": 23.0,
1091 "pressure": 1014.0,
1092 "humidity": 48,
1093 "radon": null,
1094 "radiation_rate": null,
1095 "radiation_total": null
1096 }
1097 ]"#;
1098
1099 let result = store.import_history_json(json_data).unwrap();
1100
1101 assert_eq!(result.total, 2);
1102 assert_eq!(result.imported, 2);
1103 assert_eq!(result.skipped, 0);
1104
1105 let query = HistoryQuery::new().device("Aranet4 17C3C");
1107 let records = store.query_history(&query).unwrap();
1108 assert_eq!(records.len(), 2);
1109 }
1110}