aranet_store/
store.rs

1//! Main store implementation.
2
3use std::path::Path;
4
5use rusqlite::{Connection, OptionalExtension};
6use time::OffsetDateTime;
7use tracing::{debug, info};
8
9use aranet_types::{CurrentReading, DeviceInfo, DeviceType, HistoryRecord, Status};
10
11use crate::error::{Error, Result};
12use crate::models::{StoredDevice, StoredHistoryRecord, StoredReading, SyncState};
13use crate::queries::{HistoryQuery, ReadingQuery};
14use crate::schema;
15
/// SQLite-based store for Aranet sensor data.
///
/// Wraps a single `rusqlite` connection; every method on the store issues its
/// SQL through that connection.
pub struct Store {
    /// Underlying SQLite connection used by all store operations.
    conn: Connection,
}
20
21impl Store {
22    /// Open or create a database at the given path.
23    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
24        let path = path.as_ref();
25
26        // Create parent directories if needed
27        if let Some(parent) = path.parent()
28            && !parent.exists()
29        {
30            std::fs::create_dir_all(parent).map_err(|e| Error::CreateDirectory {
31                path: parent.to_path_buf(),
32                source: e,
33            })?;
34        }
35
36        info!("Opening database at {}", path.display());
37        let conn = Connection::open(path)?;
38
39        // Enable foreign keys and WAL mode for better performance
40        conn.execute_batch(
41            "PRAGMA foreign_keys = ON;
42             PRAGMA journal_mode = WAL;
43             PRAGMA synchronous = NORMAL;",
44        )?;
45
46        // Initialize schema
47        schema::initialize(&conn)?;
48
49        Ok(Self { conn })
50    }
51
52    /// Open the default database location.
53    pub fn open_default() -> Result<Self> {
54        Self::open(crate::default_db_path())
55    }
56
57    /// Open an in-memory database (for testing).
58    pub fn open_in_memory() -> Result<Self> {
59        let conn = Connection::open_in_memory()?;
60        schema::initialize(&conn)?;
61        Ok(Self { conn })
62    }
63
64    // === Device operations ===
65
66    /// Get or create a device entry.
67    pub fn upsert_device(&self, device_id: &str, name: Option<&str>) -> Result<StoredDevice> {
68        let now = OffsetDateTime::now_utc().unix_timestamp();
69
70        self.conn.execute(
71            "INSERT INTO devices (id, name, first_seen, last_seen) VALUES (?1, ?2, ?3, ?3)
72             ON CONFLICT(id) DO UPDATE SET 
73                name = COALESCE(?2, name),
74                last_seen = ?3",
75            rusqlite::params![device_id, name, now],
76        )?;
77
78        self.get_device(device_id)?
79            .ok_or_else(|| Error::DeviceNotFound(device_id.to_string()))
80    }
81
82    /// Update device metadata (name and type).
83    ///
84    /// This is a simpler version of `update_device_info` for when you only have
85    /// basic device information (e.g., from BLE advertisement or connection).
86    pub fn update_device_metadata(
87        &self,
88        device_id: &str,
89        name: Option<&str>,
90        device_type: Option<DeviceType>,
91    ) -> Result<()> {
92        let device_type_str = device_type.map(|dt| format!("{:?}", dt));
93        let now = OffsetDateTime::now_utc().unix_timestamp();
94
95        self.conn.execute(
96            "UPDATE devices SET
97                name = COALESCE(?2, name),
98                device_type = COALESCE(?3, device_type),
99                last_seen = ?4
100             WHERE id = ?1",
101            rusqlite::params![device_id, name, device_type_str, now],
102        )?;
103
104        Ok(())
105    }
106
107    /// Update device info from DeviceInfo.
108    pub fn update_device_info(&self, device_id: &str, info: &DeviceInfo) -> Result<()> {
109        // Infer device type from model name if possible
110        let device_type = if info.model.contains("Aranet4") {
111            Some("Aranet4")
112        } else if info.model.contains("Aranet2") {
113            Some("Aranet2")
114        } else if info.model.contains("Radon") || info.model.contains("Rn") {
115            Some("AranetRadon")
116        } else if info.model.contains("Radiation") {
117            Some("AranetRadiation")
118        } else {
119            None
120        };
121
122        let name = if info.name.is_empty() {
123            None
124        } else {
125            Some(&info.name)
126        };
127
128        self.conn.execute(
129            "UPDATE devices SET
130                name = COALESCE(?2, name),
131                device_type = COALESCE(?3, device_type),
132                serial = COALESCE(?4, serial),
133                firmware = COALESCE(?5, firmware),
134                hardware = COALESCE(?6, hardware),
135                last_seen = ?7
136             WHERE id = ?1",
137            rusqlite::params![
138                device_id,
139                name,
140                device_type,
141                &info.serial,
142                &info.firmware,
143                &info.hardware,
144                OffsetDateTime::now_utc().unix_timestamp()
145            ],
146        )?;
147
148        Ok(())
149    }
150
151    /// Get a device by ID.
152    pub fn get_device(&self, device_id: &str) -> Result<Option<StoredDevice>> {
153        let mut stmt = self.conn.prepare(
154            "SELECT id, name, device_type, serial, firmware, hardware, first_seen, last_seen 
155             FROM devices WHERE id = ?",
156        )?;
157
158        let device = stmt
159            .query_row([device_id], |row| {
160                Ok(StoredDevice {
161                    id: row.get(0)?,
162                    name: row.get(1)?,
163                    device_type: row
164                        .get::<_, Option<String>>(2)?
165                        .and_then(|s| parse_device_type(&s)),
166                    serial: row.get(3)?,
167                    firmware: row.get(4)?,
168                    hardware: row.get(5)?,
169                    first_seen: OffsetDateTime::from_unix_timestamp(row.get(6)?).unwrap(),
170                    last_seen: OffsetDateTime::from_unix_timestamp(row.get(7)?).unwrap(),
171                })
172            })
173            .optional()?;
174
175        Ok(device)
176    }
177
178    /// List all devices.
179    pub fn list_devices(&self) -> Result<Vec<StoredDevice>> {
180        let mut stmt = self.conn.prepare(
181            "SELECT id, name, device_type, serial, firmware, hardware, first_seen, last_seen 
182             FROM devices ORDER BY last_seen DESC",
183        )?;
184
185        let devices = stmt
186            .query_map([], |row| {
187                Ok(StoredDevice {
188                    id: row.get(0)?,
189                    name: row.get(1)?,
190                    device_type: row
191                        .get::<_, Option<String>>(2)?
192                        .and_then(|s| parse_device_type(&s)),
193                    serial: row.get(3)?,
194                    firmware: row.get(4)?,
195                    hardware: row.get(5)?,
196                    first_seen: OffsetDateTime::from_unix_timestamp(row.get(6)?).unwrap(),
197                    last_seen: OffsetDateTime::from_unix_timestamp(row.get(7)?).unwrap(),
198                })
199            })?
200            .collect::<std::result::Result<Vec<_>, _>>()?;
201
202        Ok(devices)
203    }
204}
205
206fn parse_device_type(s: &str) -> Option<DeviceType> {
207    match s {
208        "Aranet4" => Some(DeviceType::Aranet4),
209        "Aranet2" => Some(DeviceType::Aranet2),
210        "AranetRadon" => Some(DeviceType::AranetRadon),
211        "AranetRadiation" => Some(DeviceType::AranetRadiation),
212        _ => None,
213    }
214}
215
216fn parse_status(s: &str) -> Status {
217    match s {
218        "Green" => Status::Green,
219        "Yellow" => Status::Yellow,
220        "Red" => Status::Red,
221        "Error" => Status::Error,
222        _ => Status::Green,
223    }
224}
225
226// Reading operations
227impl Store {
228    /// Insert a current reading.
229    pub fn insert_reading(&self, device_id: &str, reading: &CurrentReading) -> Result<i64> {
230        // Ensure device exists
231        self.upsert_device(device_id, None)?;
232
233        let captured_at = reading
234            .captured_at
235            .unwrap_or_else(OffsetDateTime::now_utc)
236            .unix_timestamp();
237
238        self.conn.execute(
239            "INSERT INTO readings (device_id, captured_at, co2, temperature, pressure,
240             humidity, battery, status, radon, radiation_rate, radiation_total)
241             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)",
242            rusqlite::params![
243                device_id,
244                captured_at,
245                reading.co2,
246                reading.temperature,
247                reading.pressure,
248                reading.humidity,
249                reading.battery,
250                format!("{:?}", reading.status),
251                reading.radon,
252                reading.radiation_rate,
253                reading.radiation_total,
254            ],
255        )?;
256
257        Ok(self.conn.last_insert_rowid())
258    }
259
260    /// Query readings with filters.
261    pub fn query_readings(&self, query: &ReadingQuery) -> Result<Vec<StoredReading>> {
262        let sql = query.build_sql();
263        let (_, params) = query.build_where();
264
265        debug!("Executing query: {}", sql);
266
267        let params_ref: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
268
269        let mut stmt = self.conn.prepare(&sql)?;
270        let readings = stmt
271            .query_map(params_ref.as_slice(), |row| {
272                Ok(StoredReading {
273                    id: row.get(0)?,
274                    device_id: row.get(1)?,
275                    captured_at: OffsetDateTime::from_unix_timestamp(row.get(2)?).unwrap(),
276                    co2: row.get::<_, i64>(3)? as u16,
277                    temperature: row.get(4)?,
278                    pressure: row.get(5)?,
279                    humidity: row.get::<_, i64>(6)? as u8,
280                    battery: row.get::<_, i64>(7)? as u8,
281                    status: parse_status(&row.get::<_, String>(8)?),
282                    radon: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
283                    radiation_rate: row.get(10)?,
284                    radiation_total: row.get(11)?,
285                })
286            })?
287            .collect::<std::result::Result<Vec<_>, _>>()?;
288
289        Ok(readings)
290    }
291
292    /// Get the latest reading for a device.
293    pub fn get_latest_reading(&self, device_id: &str) -> Result<Option<StoredReading>> {
294        let query = ReadingQuery::new().device(device_id).limit(1);
295        let mut readings = self.query_readings(&query)?;
296        Ok(readings.pop())
297    }
298
299    /// Count readings for a device.
300    pub fn count_readings(&self, device_id: Option<&str>) -> Result<u64> {
301        let count: i64 = match device_id {
302            Some(id) => self.conn.query_row(
303                "SELECT COUNT(*) FROM readings WHERE device_id = ?",
304                [id],
305                |row| row.get(0),
306            )?,
307            None => self
308                .conn
309                .query_row("SELECT COUNT(*) FROM readings", [], |row| row.get(0))?,
310        };
311
312        Ok(count as u64)
313    }
314}
315
316// History operations
317impl Store {
318    /// Insert history records (with deduplication).
319    pub fn insert_history(&self, device_id: &str, records: &[HistoryRecord]) -> Result<usize> {
320        // Ensure device exists
321        self.upsert_device(device_id, None)?;
322
323        let synced_at = OffsetDateTime::now_utc().unix_timestamp();
324        let mut inserted = 0;
325
326        for record in records {
327            let result = self.conn.execute(
328                "INSERT OR IGNORE INTO history (device_id, timestamp, synced_at, co2,
329                 temperature, pressure, humidity, radon, radiation_rate, radiation_total)
330                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
331                rusqlite::params![
332                    device_id,
333                    record.timestamp.unix_timestamp(),
334                    synced_at,
335                    record.co2,
336                    record.temperature,
337                    record.pressure,
338                    record.humidity,
339                    record.radon,
340                    record.radiation_rate,
341                    record.radiation_total,
342                ],
343            )?;
344            inserted += result;
345        }
346
347        info!(
348            "Inserted {} new history records for {}",
349            inserted, device_id
350        );
351        Ok(inserted)
352    }
353
354    /// Query history records with filters.
355    pub fn query_history(&self, query: &HistoryQuery) -> Result<Vec<StoredHistoryRecord>> {
356        let mut conditions = Vec::new();
357        let mut params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
358
359        if let Some(ref device_id) = query.device_id {
360            conditions.push("device_id = ?");
361            params.push(Box::new(device_id.clone()));
362        }
363
364        if let Some(since) = query.since {
365            conditions.push("timestamp >= ?");
366            params.push(Box::new(since.unix_timestamp()));
367        }
368
369        if let Some(until) = query.until {
370            conditions.push("timestamp <= ?");
371            params.push(Box::new(until.unix_timestamp()));
372        }
373
374        let where_clause = if conditions.is_empty() {
375            String::new()
376        } else {
377            format!("WHERE {}", conditions.join(" AND "))
378        };
379
380        let order = if query.newest_first { "DESC" } else { "ASC" };
381
382        let mut sql = format!(
383            "SELECT id, device_id, timestamp, synced_at, co2, temperature, pressure,
384             humidity, radon, radiation_rate, radiation_total
385             FROM history {} ORDER BY timestamp {}",
386            where_clause, order
387        );
388
389        if let Some(limit) = query.limit {
390            sql.push_str(&format!(" LIMIT {}", limit));
391        }
392
393        if let Some(offset) = query.offset {
394            sql.push_str(&format!(" OFFSET {}", offset));
395        }
396
397        let params_ref: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
398
399        let mut stmt = self.conn.prepare(&sql)?;
400        let records = stmt
401            .query_map(params_ref.as_slice(), |row| {
402                Ok(StoredHistoryRecord {
403                    id: row.get(0)?,
404                    device_id: row.get(1)?,
405                    timestamp: OffsetDateTime::from_unix_timestamp(row.get(2)?).unwrap(),
406                    synced_at: OffsetDateTime::from_unix_timestamp(row.get(3)?).unwrap(),
407                    co2: row.get::<_, i64>(4)? as u16,
408                    temperature: row.get(5)?,
409                    pressure: row.get(6)?,
410                    humidity: row.get::<_, i64>(7)? as u8,
411                    radon: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
412                    radiation_rate: row.get(9)?,
413                    radiation_total: row.get(10)?,
414                })
415            })?
416            .collect::<std::result::Result<Vec<_>, _>>()?;
417
418        Ok(records)
419    }
420
421    /// Count history records for a device.
422    pub fn count_history(&self, device_id: Option<&str>) -> Result<u64> {
423        let count: i64 = match device_id {
424            Some(id) => self.conn.query_row(
425                "SELECT COUNT(*) FROM history WHERE device_id = ?",
426                [id],
427                |row| row.get(0),
428            )?,
429            None => self
430                .conn
431                .query_row("SELECT COUNT(*) FROM history", [], |row| row.get(0))?,
432        };
433
434        Ok(count as u64)
435    }
436}
437
438// Sync state operations
439impl Store {
440    /// Get sync state for a device.
441    pub fn get_sync_state(&self, device_id: &str) -> Result<Option<SyncState>> {
442        let mut stmt = self.conn.prepare(
443            "SELECT device_id, last_history_index, total_readings, last_sync_at
444             FROM sync_state WHERE device_id = ?",
445        )?;
446
447        let state = stmt
448            .query_row([device_id], |row| {
449                Ok(SyncState {
450                    device_id: row.get(0)?,
451                    last_history_index: row.get::<_, Option<i64>>(1)?.map(|v| v as u16),
452                    total_readings: row.get::<_, Option<i64>>(2)?.map(|v| v as u16),
453                    last_sync_at: row
454                        .get::<_, Option<i64>>(3)?
455                        .map(|ts| OffsetDateTime::from_unix_timestamp(ts).unwrap()),
456                })
457            })
458            .optional()?;
459
460        Ok(state)
461    }
462
463    /// Update sync state after a successful sync.
464    pub fn update_sync_state(
465        &self,
466        device_id: &str,
467        last_index: u16,
468        total_readings: u16,
469    ) -> Result<()> {
470        let now = OffsetDateTime::now_utc().unix_timestamp();
471
472        self.conn.execute(
473            "INSERT INTO sync_state (device_id, last_history_index, total_readings, last_sync_at)
474             VALUES (?1, ?2, ?3, ?4)
475             ON CONFLICT(device_id) DO UPDATE SET
476                last_history_index = ?2,
477                total_readings = ?3,
478                last_sync_at = ?4",
479            rusqlite::params![device_id, last_index, total_readings, now],
480        )?;
481
482        debug!(
483            "Updated sync state for {}: index={}, total={}",
484            device_id, last_index, total_readings
485        );
486
487        Ok(())
488    }
489
490    /// Calculate the start index for incremental sync.
491    ///
492    /// Returns the index to start downloading from (1-based).
493    /// If the device has new readings since last sync, returns the next index.
494    /// If this is the first sync, returns 1 to download all.
495    pub fn calculate_sync_start(&self, device_id: &str, current_total: u16) -> Result<u16> {
496        let state = self.get_sync_state(device_id)?;
497
498        match state {
499            Some(s) if s.total_readings == Some(current_total) => {
500                // No new readings since last sync
501                debug!("No new readings for {}", device_id);
502                Ok(current_total + 1) // Return beyond range to indicate no sync needed
503            }
504            Some(s) if s.last_history_index.is_some() => {
505                // We have previous state, calculate new records
506                let last_index = s.last_history_index.unwrap();
507                let prev_total = s.total_readings.unwrap_or(0);
508                let new_count = current_total.saturating_sub(prev_total);
509
510                if new_count > 0 {
511                    // Start from where we left off
512                    let start = last_index.saturating_add(1);
513                    debug!(
514                        "Incremental sync for {}: {} new readings, starting at {}",
515                        device_id, new_count, start
516                    );
517                    Ok(start)
518                } else {
519                    Ok(current_total + 1)
520                }
521            }
522            _ => {
523                // First sync - download all
524                debug!(
525                    "First sync for {}: downloading all {} readings",
526                    device_id, current_total
527                );
528                Ok(1)
529            }
530        }
531    }
532}
533
/// Aggregate statistics for history data.
///
/// Produced by [`Store::history_stats`] from the history rows matching a
/// `HistoryQuery`'s device and time filters.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct HistoryStats {
    /// Number of records (SQL `COUNT(*)` over the matching rows).
    pub count: u64,
    /// Minimum values (SQL `MIN` per metric).
    pub min: HistoryAggregates,
    /// Maximum values (SQL `MAX` per metric).
    pub max: HistoryAggregates,
    /// Average values (SQL `AVG` per metric).
    pub avg: HistoryAggregates,
    /// Time range of records as `(earliest, latest)` timestamp;
    /// `None` when the aggregate query yields no timestamps.
    pub time_range: Option<(OffsetDateTime, OffsetDateTime)>,
}
548
/// Aggregate values for a single metric set.
///
/// Used three times inside [`HistoryStats`] (minimum, maximum, average).
/// Each field is `None` when the corresponding SQL aggregate is NULL.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct HistoryAggregates {
    /// CO2 in ppm.
    pub co2: Option<f64>,
    /// Temperature in Celsius.
    pub temperature: Option<f64>,
    /// Pressure in hPa.
    pub pressure: Option<f64>,
    /// Humidity percentage.
    pub humidity: Option<f64>,
    /// Radon in Bq/m3 (for radon devices).
    pub radon: Option<f64>,
}
563
564// Aggregate and export operations
565impl Store {
566    /// Get aggregate statistics for history records.
567    pub fn history_stats(&self, query: &HistoryQuery) -> Result<HistoryStats> {
568        let mut conditions = Vec::new();
569        let mut params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
570
571        if let Some(ref device_id) = query.device_id {
572            conditions.push("device_id = ?");
573            params.push(Box::new(device_id.clone()));
574        }
575
576        if let Some(since) = query.since {
577            conditions.push("timestamp >= ?");
578            params.push(Box::new(since.unix_timestamp()));
579        }
580
581        if let Some(until) = query.until {
582            conditions.push("timestamp <= ?");
583            params.push(Box::new(until.unix_timestamp()));
584        }
585
586        let where_clause = if conditions.is_empty() {
587            String::new()
588        } else {
589            format!("WHERE {}", conditions.join(" AND "))
590        };
591
592        let sql = format!(
593            "SELECT
594                COUNT(*) as count,
595                MIN(co2) as min_co2, MAX(co2) as max_co2, AVG(co2) as avg_co2,
596                MIN(temperature) as min_temp, MAX(temperature) as max_temp, AVG(temperature) as avg_temp,
597                MIN(pressure) as min_press, MAX(pressure) as max_press, AVG(pressure) as avg_press,
598                MIN(humidity) as min_hum, MAX(humidity) as max_hum, AVG(humidity) as avg_hum,
599                MIN(radon) as min_radon, MAX(radon) as max_radon, AVG(radon) as avg_radon,
600                MIN(timestamp) as min_ts, MAX(timestamp) as max_ts
601             FROM history {}",
602            where_clause
603        );
604
605        let params_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
606
607        let stats = self.conn.query_row(&sql, params_refs.as_slice(), |row| {
608            let count: i64 = row.get(0)?;
609            let min_ts: Option<i64> = row.get(16)?;
610            let max_ts: Option<i64> = row.get(17)?;
611
612            let time_range = match (min_ts, max_ts) {
613                (Some(min), Some(max)) => Some((
614                    OffsetDateTime::from_unix_timestamp(min).unwrap(),
615                    OffsetDateTime::from_unix_timestamp(max).unwrap(),
616                )),
617                _ => None,
618            };
619
620            Ok(HistoryStats {
621                count: count as u64,
622                min: HistoryAggregates {
623                    co2: row.get::<_, Option<i64>>(1)?.map(|v| v as f64),
624                    temperature: row.get(4)?,
625                    pressure: row.get(7)?,
626                    humidity: row.get::<_, Option<i64>>(10)?.map(|v| v as f64),
627                    radon: row.get::<_, Option<i64>>(13)?.map(|v| v as f64),
628                },
629                max: HistoryAggregates {
630                    co2: row.get::<_, Option<i64>>(2)?.map(|v| v as f64),
631                    temperature: row.get(5)?,
632                    pressure: row.get(8)?,
633                    humidity: row.get::<_, Option<i64>>(11)?.map(|v| v as f64),
634                    radon: row.get::<_, Option<i64>>(14)?.map(|v| v as f64),
635                },
636                avg: HistoryAggregates {
637                    co2: row.get(3)?,
638                    temperature: row.get(6)?,
639                    pressure: row.get(9)?,
640                    humidity: row.get(12)?,
641                    radon: row.get(15)?,
642                },
643                time_range,
644            })
645        })?;
646
647        Ok(stats)
648    }
649
650    /// Export history records to CSV format.
651    pub fn export_history_csv(&self, query: &HistoryQuery) -> Result<String> {
652        let records = self.query_history(query)?;
653        let mut output = String::new();
654
655        // Header
656        output.push_str("timestamp,device_id,co2,temperature,pressure,humidity,radon\n");
657
658        // Data rows
659        for record in records {
660            let timestamp = record
661                .timestamp
662                .format(&time::format_description::well_known::Rfc3339)
663                .unwrap_or_default();
664            let radon = record.radon.map(|r| r.to_string()).unwrap_or_default();
665
666            output.push_str(&format!(
667                "{},{},{},{:.1},{:.2},{},{}\n",
668                timestamp,
669                record.device_id,
670                record.co2,
671                record.temperature,
672                record.pressure,
673                record.humidity,
674                radon
675            ));
676        }
677
678        Ok(output)
679    }
680
681    /// Export history records to JSON format.
682    pub fn export_history_json(&self, query: &HistoryQuery) -> Result<String> {
683        let records = self.query_history(query)?;
684        let json = serde_json::to_string_pretty(&records)
685            .map_err(|e| Error::Database(rusqlite::Error::ToSqlConversionFailure(Box::new(e))))?;
686        Ok(json)
687    }
688
689    /// Import history records from CSV format.
690    ///
691    /// Expected CSV format:
692    /// ```csv
693    /// timestamp,device_id,co2,temperature,pressure,humidity,radon
694    /// 2024-01-15T10:30:00Z,Aranet4 17C3C,800,22.5,1013.25,45,
695    /// ```
696    ///
697    /// Returns the number of records imported (deduplicated by device_id + timestamp).
698    pub fn import_history_csv(&self, csv_data: &str) -> Result<ImportResult> {
699        let mut reader = csv::ReaderBuilder::new()
700            .has_headers(true)
701            .flexible(true)
702            .trim(csv::Trim::All)
703            .from_reader(csv_data.as_bytes());
704
705        let mut total = 0;
706        let mut imported = 0;
707        let mut skipped = 0;
708        let mut errors = Vec::new();
709
710        for (line_num, result) in reader.records().enumerate() {
711            total += 1;
712            let line = line_num + 2; // Account for header and 0-indexing
713
714            let record = match result {
715                Ok(r) => r,
716                Err(e) => {
717                    errors.push(format!("Line {}: parse error - {}", line, e));
718                    skipped += 1;
719                    continue;
720                }
721            };
722
723            // Parse fields
724            let timestamp_str = record.get(0).unwrap_or("").trim();
725            let device_id = record.get(1).unwrap_or("").trim();
726            let co2_str = record.get(2).unwrap_or("").trim();
727            let temp_str = record.get(3).unwrap_or("").trim();
728            let pressure_str = record.get(4).unwrap_or("").trim();
729            let humidity_str = record.get(5).unwrap_or("").trim();
730            let radon_str = record.get(6).unwrap_or("").trim();
731
732            // Validate required fields
733            if device_id.is_empty() {
734                errors.push(format!("Line {}: missing device_id", line));
735                skipped += 1;
736                continue;
737            }
738
739            // Parse timestamp
740            let timestamp = match OffsetDateTime::parse(
741                timestamp_str,
742                &time::format_description::well_known::Rfc3339,
743            ) {
744                Ok(ts) => ts,
745                Err(_) => {
746                    errors.push(format!(
747                        "Line {}: invalid timestamp '{}'",
748                        line, timestamp_str
749                    ));
750                    skipped += 1;
751                    continue;
752                }
753            };
754
755            // Parse numeric fields with defaults
756            let co2: u16 = co2_str.parse().unwrap_or(0);
757            let temperature: f32 = temp_str.parse().unwrap_or(0.0);
758            let pressure: f32 = pressure_str.parse().unwrap_or(0.0);
759            let humidity: u8 = humidity_str.parse().unwrap_or(0);
760            let radon: Option<u32> = if radon_str.is_empty() {
761                None
762            } else {
763                radon_str.parse().ok()
764            };
765
766            // Create history record
767            let history_record = HistoryRecord {
768                timestamp,
769                co2,
770                temperature,
771                pressure,
772                humidity,
773                radon,
774                radiation_rate: None,
775                radiation_total: None,
776            };
777
778            // Ensure device exists and insert record
779            self.upsert_device(device_id, None)?;
780            let count = self.insert_history(device_id, &[history_record])?;
781            imported += count;
782            if count == 0 {
783                skipped += 1; // Duplicate record
784            }
785        }
786
787        Ok(ImportResult {
788            total,
789            imported,
790            skipped,
791            errors,
792        })
793    }
794
795    /// Import history records from JSON format.
796    ///
797    /// Expected JSON format: an array of StoredHistoryRecord objects.
798    ///
799    /// Returns the number of records imported (deduplicated by device_id + timestamp).
800    pub fn import_history_json(&self, json_data: &str) -> Result<ImportResult> {
801        let records: Vec<StoredHistoryRecord> = serde_json::from_str(json_data)
802            .map_err(|e| Error::Database(rusqlite::Error::ToSqlConversionFailure(Box::new(e))))?;
803
804        let total = records.len();
805        let mut imported = 0;
806        let mut skipped = 0;
807
808        for record in records {
809            // Convert to HistoryRecord
810            let history_record = record.to_history();
811
812            // Ensure device exists and insert record
813            self.upsert_device(&record.device_id, None)?;
814            let count = self.insert_history(&record.device_id, &[history_record])?;
815            imported += count;
816            if count == 0 {
817                skipped += 1; // Duplicate record
818            }
819        }
820
821        Ok(ImportResult {
822            total,
823            imported,
824            skipped,
825            errors: Vec::new(),
826        })
827    }
828}
829
/// Result of an import operation.
///
/// `Default` yields an empty result (all counters zero, no errors), and
/// `PartialEq` / `Eq` allow results to be compared directly, e.g. in tests.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ImportResult {
    /// Total records processed.
    pub total: usize,
    /// Records successfully imported.
    pub imported: usize,
    /// Records skipped (duplicates or errors).
    pub skipped: usize,
    /// Error messages for failed records.
    pub errors: Vec<String>,
}
842
843#[cfg(test)]
844mod tests {
845    use super::*;
846    use aranet_types::Status;
847
848    fn create_test_reading() -> CurrentReading {
849        CurrentReading {
850            co2: 800,
851            temperature: 22.5,
852            pressure: 1013.0,
853            humidity: 45,
854            battery: 85,
855            status: Status::Green,
856            interval: 60,
857            age: 30,
858            captured_at: Some(OffsetDateTime::now_utc()),
859            radon: None,
860            radiation_rate: None,
861            radiation_total: None,
862            radon_avg_24h: None,
863            radon_avg_7d: None,
864            radon_avg_30d: None,
865        }
866    }
867
868    #[test]
869    fn test_open_in_memory() {
870        let store = Store::open_in_memory().unwrap();
871        let devices = store.list_devices().unwrap();
872        assert!(devices.is_empty());
873    }
874
875    #[test]
876    fn test_upsert_device() {
877        let store = Store::open_in_memory().unwrap();
878
879        let device = store.upsert_device("test-device", Some("Test")).unwrap();
880        assert_eq!(device.id, "test-device");
881        assert_eq!(device.name, Some("Test".to_string()));
882
883        // Update name
884        let device = store
885            .upsert_device("test-device", Some("New Name"))
886            .unwrap();
887        assert_eq!(device.name, Some("New Name".to_string()));
888    }
889
890    #[test]
891    fn test_insert_and_query_reading() {
892        let store = Store::open_in_memory().unwrap();
893        let reading = create_test_reading();
894
895        store.insert_reading("test-device", &reading).unwrap();
896
897        let query = ReadingQuery::new().device("test-device");
898        let readings = store.query_readings(&query).unwrap();
899
900        assert_eq!(readings.len(), 1);
901        assert_eq!(readings[0].co2, 800);
902        assert_eq!(readings[0].temperature, 22.5);
903    }
904
905    #[test]
906    fn test_get_latest_reading() {
907        let store = Store::open_in_memory().unwrap();
908
909        let mut reading1 = create_test_reading();
910        reading1.co2 = 700;
911        store.insert_reading("test-device", &reading1).unwrap();
912
913        let mut reading2 = create_test_reading();
914        reading2.co2 = 900;
915        store.insert_reading("test-device", &reading2).unwrap();
916
917        let latest = store.get_latest_reading("test-device").unwrap().unwrap();
918        assert_eq!(latest.co2, 900);
919    }
920
921    #[test]
922    fn test_insert_history_deduplication() {
923        let store = Store::open_in_memory().unwrap();
924
925        let now = OffsetDateTime::now_utc();
926        let records = vec![
927            HistoryRecord {
928                timestamp: now,
929                co2: 800,
930                temperature: 22.0,
931                pressure: 1013.0,
932                humidity: 45,
933                radon: None,
934                radiation_rate: None,
935                radiation_total: None,
936            },
937            HistoryRecord {
938                timestamp: now, // Same timestamp - should be deduplicated
939                co2: 850,
940                temperature: 23.0,
941                pressure: 1014.0,
942                humidity: 46,
943                radon: None,
944                radiation_rate: None,
945                radiation_total: None,
946            },
947        ];
948
949        let inserted = store.insert_history("test-device", &records).unwrap();
950        assert_eq!(inserted, 1); // Only one inserted due to dedup
951
952        let count = store.count_history(Some("test-device")).unwrap();
953        assert_eq!(count, 1);
954    }
955
956    #[test]
957    fn test_sync_state() {
958        let store = Store::open_in_memory().unwrap();
959        store.upsert_device("test-device", None).unwrap();
960
961        // Initially no sync state
962        let state = store.get_sync_state("test-device").unwrap();
963        assert!(state.is_none());
964
965        // Update sync state
966        store.update_sync_state("test-device", 100, 100).unwrap();
967
968        let state = store.get_sync_state("test-device").unwrap().unwrap();
969        assert_eq!(state.last_history_index, Some(100));
970        assert_eq!(state.total_readings, Some(100));
971        assert!(state.last_sync_at.is_some());
972    }
973
974    #[test]
975    fn test_calculate_sync_start() {
976        let store = Store::open_in_memory().unwrap();
977        store.upsert_device("test-device", None).unwrap();
978
979        // First sync - should start from 1
980        let start = store.calculate_sync_start("test-device", 100).unwrap();
981        assert_eq!(start, 1);
982
983        // After syncing all 100, update state
984        store.update_sync_state("test-device", 100, 100).unwrap();
985
986        // No new readings - should return beyond range
987        let start = store.calculate_sync_start("test-device", 100).unwrap();
988        assert_eq!(start, 101);
989
990        // New readings added - should start from 101
991        let start = store.calculate_sync_start("test-device", 110).unwrap();
992        assert_eq!(start, 101);
993    }
994
995    #[test]
996    fn test_import_history_csv() {
997        let store = Store::open_in_memory().unwrap();
998
999        let csv_data = r#"timestamp,device_id,co2,temperature,pressure,humidity,radon
10002024-01-15T10:30:00Z,Aranet4 17C3C,800,22.5,1013.25,45,
10012024-01-15T11:30:00Z,Aranet4 17C3C,850,23.0,1014.00,48,
10022024-01-15T12:30:00Z,AranetRn+ 306B8,0,21.0,1012.00,50,150
1003"#;
1004
1005        let result = store.import_history_csv(csv_data).unwrap();
1006
1007        assert_eq!(result.total, 3);
1008        assert_eq!(result.imported, 3);
1009        assert_eq!(result.skipped, 0);
1010        assert!(result.errors.is_empty());
1011
1012        // Verify data was imported
1013        let devices = store.list_devices().unwrap();
1014        assert_eq!(devices.len(), 2);
1015
1016        // Query defaults to newest_first=true (DESC order)
1017        let query = HistoryQuery::new().device("Aranet4 17C3C");
1018        let records = store.query_history(&query).unwrap();
1019        assert_eq!(records.len(), 2);
1020        assert_eq!(records[0].co2, 850); // 11:30 - newest first
1021        assert_eq!(records[1].co2, 800); // 10:30 - oldest
1022
1023        // Verify radon device
1024        let query = HistoryQuery::new().device("AranetRn+ 306B8");
1025        let records = store.query_history(&query).unwrap();
1026        assert_eq!(records.len(), 1);
1027        assert_eq!(records[0].radon, Some(150));
1028    }
1029
1030    #[test]
1031    fn test_import_history_csv_deduplication() {
1032        let store = Store::open_in_memory().unwrap();
1033
1034        let csv_data = r#"timestamp,device_id,co2,temperature,pressure,humidity,radon
10352024-01-15T10:30:00Z,test-device,800,22.5,1013.25,45,
1036"#;
1037
1038        // Import once
1039        let result = store.import_history_csv(csv_data).unwrap();
1040        assert_eq!(result.imported, 1);
1041
1042        // Import again - should skip duplicate
1043        let result = store.import_history_csv(csv_data).unwrap();
1044        assert_eq!(result.imported, 0);
1045        assert_eq!(result.skipped, 1);
1046    }
1047
1048    #[test]
1049    fn test_import_history_csv_with_errors() {
1050        let store = Store::open_in_memory().unwrap();
1051
1052        let csv_data = r#"timestamp,device_id,co2,temperature,pressure,humidity,radon
1053invalid-timestamp,test-device,800,22.5,1013.25,45,
10542024-01-15T10:30:00Z,,800,22.5,1013.25,45,
10552024-01-15T11:30:00Z,valid-device,900,23.0,1014.00,50,
1056"#;
1057
1058        let result = store.import_history_csv(csv_data).unwrap();
1059
1060        assert_eq!(result.total, 3);
1061        assert_eq!(result.imported, 1);
1062        assert_eq!(result.skipped, 2);
1063        assert_eq!(result.errors.len(), 2);
1064    }
1065
1066    #[test]
1067    fn test_import_history_json() {
1068        let store = Store::open_in_memory().unwrap();
1069
1070        let json_data = r#"[
1071            {
1072                "id": 0,
1073                "device_id": "Aranet4 17C3C",
1074                "timestamp": "2024-01-15T10:30:00Z",
1075                "synced_at": "2024-01-15T12:00:00Z",
1076                "co2": 800,
1077                "temperature": 22.5,
1078                "pressure": 1013.25,
1079                "humidity": 45,
1080                "radon": null,
1081                "radiation_rate": null,
1082                "radiation_total": null
1083            },
1084            {
1085                "id": 0,
1086                "device_id": "Aranet4 17C3C",
1087                "timestamp": "2024-01-15T11:30:00Z",
1088                "synced_at": "2024-01-15T12:00:00Z",
1089                "co2": 850,
1090                "temperature": 23.0,
1091                "pressure": 1014.0,
1092                "humidity": 48,
1093                "radon": null,
1094                "radiation_rate": null,
1095                "radiation_total": null
1096            }
1097        ]"#;
1098
1099        let result = store.import_history_json(json_data).unwrap();
1100
1101        assert_eq!(result.total, 2);
1102        assert_eq!(result.imported, 2);
1103        assert_eq!(result.skipped, 0);
1104
1105        // Verify data was imported
1106        let query = HistoryQuery::new().device("Aranet4 17C3C");
1107        let records = store.query_history(&query).unwrap();
1108        assert_eq!(records.len(), 2);
1109    }
1110}