1use std::path::Path;
15use std::sync::Mutex;
16use async_trait::async_trait;
17use rusqlite::{Connection, params};
18use obd2_core::error::Obd2Error;
19use obd2_core::protocol::pid::Pid;
20use obd2_core::protocol::enhanced::Reading;
21use obd2_core::protocol::dtc::Dtc;
22use obd2_core::store::{VehicleStore, SessionStore};
23use obd2_core::vehicle::{VehicleProfile, ThresholdSet};
24
25pub struct SqliteStore {
27 conn: Mutex<Connection>,
28}
29
30impl SqliteStore {
31 pub fn open(path: &Path) -> Result<Self, Obd2Error> {
33 let conn = Connection::open(path)
34 .map_err(|e| Obd2Error::Other(Box::new(e)))?;
35 let store = Self { conn: Mutex::new(conn) };
36 store.create_tables()?;
37 Ok(store)
38 }
39
40 pub fn in_memory() -> Result<Self, Obd2Error> {
42 let conn = Connection::open_in_memory()
43 .map_err(|e| Obd2Error::Other(Box::new(e)))?;
44 let store = Self { conn: Mutex::new(conn) };
45 store.create_tables()?;
46 Ok(store)
47 }
48
49 fn create_tables(&self) -> Result<(), Obd2Error> {
50 let conn = self.conn.lock().map_err(|e| Obd2Error::Other(Box::new(
51 std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
52 )))?;
53 conn.execute_batch(
54 "CREATE TABLE IF NOT EXISTS vehicles (
55 vin TEXT PRIMARY KEY,
56 data TEXT NOT NULL,
57 updated_at TEXT NOT NULL DEFAULT (datetime('now'))
58 );
59
60 CREATE TABLE IF NOT EXISTS thresholds (
61 vin TEXT PRIMARY KEY,
62 data TEXT NOT NULL,
63 updated_at TEXT NOT NULL DEFAULT (datetime('now'))
64 );
65
66 CREATE TABLE IF NOT EXISTS readings (
67 id INTEGER PRIMARY KEY AUTOINCREMENT,
68 vin TEXT NOT NULL,
69 pid_code INTEGER NOT NULL,
70 value REAL,
71 unit TEXT,
72 timestamp TEXT NOT NULL DEFAULT (datetime('now'))
73 );
74
75 CREATE TABLE IF NOT EXISTS dtc_events (
76 id INTEGER PRIMARY KEY AUTOINCREMENT,
77 vin TEXT NOT NULL,
78 dtc_codes TEXT NOT NULL,
79 timestamp TEXT NOT NULL DEFAULT (datetime('now'))
80 );
81
82 CREATE INDEX IF NOT EXISTS idx_readings_vin ON readings(vin);
83 CREATE INDEX IF NOT EXISTS idx_dtc_events_vin ON dtc_events(vin);"
84 ).map_err(|e| Obd2Error::Other(Box::new(e)))?;
85 Ok(())
86 }
87}
88
89impl std::fmt::Debug for SqliteStore {
90 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91 f.debug_struct("SqliteStore").finish()
92 }
93}
94
95#[async_trait]
96impl VehicleStore for SqliteStore {
97 async fn save_vehicle(&self, profile: &VehicleProfile) -> Result<(), Obd2Error> {
98 let vin = &profile.vin;
99 let data = serde_json::to_string(&SerializableProfile::from(profile))
100 .map_err(|e| Obd2Error::Other(Box::new(e)))?;
101
102 let conn = self.conn.lock().map_err(|e| Obd2Error::Other(Box::new(
103 std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
104 )))?;
105 conn.execute(
106 "INSERT OR REPLACE INTO vehicles (vin, data) VALUES (?1, ?2)",
107 params![vin, data],
108 ).map_err(|e| Obd2Error::Other(Box::new(e)))?;
109
110 Ok(())
111 }
112
113 async fn get_vehicle(&self, vin: &str) -> Result<Option<VehicleProfile>, Obd2Error> {
114 let conn = self.conn.lock().map_err(|e| Obd2Error::Other(Box::new(
115 std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
116 )))?;
117 let mut stmt = conn.prepare(
118 "SELECT data FROM vehicles WHERE vin = ?1"
119 ).map_err(|e| Obd2Error::Other(Box::new(e)))?;
120
121 let result = stmt.query_row(params![vin], |row| {
122 let data: String = row.get(0)?;
123 Ok(data)
124 });
125
126 match result {
127 Ok(data) => {
128 let sp: SerializableProfile = serde_json::from_str(&data)
129 .map_err(|e| Obd2Error::Other(Box::new(e)))?;
130 Ok(Some(sp.into()))
131 }
132 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
133 Err(e) => Err(Obd2Error::Other(Box::new(e))),
134 }
135 }
136
137 async fn save_thresholds(&self, vin: &str, thresholds: &ThresholdSet) -> Result<(), Obd2Error> {
138 let data = serde_json::to_string(thresholds)
139 .map_err(|e| Obd2Error::Other(Box::new(e)))?;
140
141 let conn = self.conn.lock().map_err(|e| Obd2Error::Other(Box::new(
142 std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
143 )))?;
144 conn.execute(
145 "INSERT OR REPLACE INTO thresholds (vin, data) VALUES (?1, ?2)",
146 params![vin, data],
147 ).map_err(|e| Obd2Error::Other(Box::new(e)))?;
148
149 Ok(())
150 }
151
152 async fn get_thresholds(&self, vin: &str) -> Result<Option<ThresholdSet>, Obd2Error> {
153 let conn = self.conn.lock().map_err(|e| Obd2Error::Other(Box::new(
154 std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
155 )))?;
156 let mut stmt = conn.prepare(
157 "SELECT data FROM thresholds WHERE vin = ?1"
158 ).map_err(|e| Obd2Error::Other(Box::new(e)))?;
159
160 let result = stmt.query_row(params![vin], |row| {
161 let data: String = row.get(0)?;
162 Ok(data)
163 });
164
165 match result {
166 Ok(data) => {
167 let ts: ThresholdSet = serde_json::from_str(&data)
168 .map_err(|e| Obd2Error::Other(Box::new(e)))?;
169 Ok(Some(ts))
170 }
171 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
172 Err(e) => Err(Obd2Error::Other(Box::new(e))),
173 }
174 }
175}
176
177#[async_trait]
178impl SessionStore for SqliteStore {
179 async fn save_reading(&self, vin: &str, pid: Pid, reading: &Reading) -> Result<(), Obd2Error> {
180 let value = reading.value.as_f64().ok();
181 let unit = reading.unit;
182
183 let conn = self.conn.lock().map_err(|e| Obd2Error::Other(Box::new(
184 std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
185 )))?;
186 conn.execute(
187 "INSERT INTO readings (vin, pid_code, value, unit) VALUES (?1, ?2, ?3, ?4)",
188 params![vin, pid.0, value, unit],
189 ).map_err(|e| Obd2Error::Other(Box::new(e)))?;
190
191 Ok(())
192 }
193
194 async fn save_dtc_event(&self, vin: &str, dtcs: &[Dtc]) -> Result<(), Obd2Error> {
195 let codes: Vec<&str> = dtcs.iter().map(|d| d.code.as_str()).collect();
196 let codes_str = codes.join(",");
197
198 let conn = self.conn.lock().map_err(|e| Obd2Error::Other(Box::new(
199 std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
200 )))?;
201 conn.execute(
202 "INSERT INTO dtc_events (vin, dtc_codes) VALUES (?1, ?2)",
203 params![vin, codes_str],
204 ).map_err(|e| Obd2Error::Other(Box::new(e)))?;
205
206 Ok(())
207 }
208}
209
210#[derive(serde::Serialize, serde::Deserialize)]
212struct SerializableProfile {
213 vin: String,
214 make: Option<String>,
215 model: Option<String>,
216 year: Option<i32>,
217 engine_code: Option<String>,
218 manufacturer: Option<String>,
219 truck_class: Option<String>,
220}
221
222impl From<&VehicleProfile> for SerializableProfile {
223 fn from(p: &VehicleProfile) -> Self {
224 Self {
225 vin: p.vin.clone(),
226 make: None,
227 model: None,
228 year: p.decoded_vin.as_ref().and_then(|d| d.year),
229 engine_code: p.spec.as_ref().map(|s| s.identity.engine.code.clone()),
230 manufacturer: p.decoded_vin.as_ref().and_then(|d| d.manufacturer.clone()),
231 truck_class: p.decoded_vin.as_ref().and_then(|d| d.truck_class.clone()),
232 }
233 }
234}
235
236impl From<SerializableProfile> for VehicleProfile {
237 fn from(sp: SerializableProfile) -> Self {
238 let decoded_vin = if sp.manufacturer.is_some() || sp.year.is_some() || sp.truck_class.is_some() {
239 Some(obd2_core::vehicle::vin::DecodedVin {
240 year: sp.year,
241 year_alt: None,
242 manufacturer: sp.manufacturer,
243 truck_class: sp.truck_class,
244 })
245 } else {
246 None
247 };
248 VehicleProfile {
249 vin: sp.vin,
250 decoded_vin,
251 info: None,
252 spec: None,
253 supported_pids: std::collections::HashSet::new(),
254 }
255 }
256}
257
#[cfg(test)]
mod tests {
    use super::*;
    use obd2_core::protocol::enhanced::{Value, ReadingSource};
    use std::time::Instant;

    /// VIN shared by the vehicle, reading and DTC tests.
    const SAMPLE_VIN: &str = "1GCHK23224F000001";

    /// Builds a profile that carries nothing but the sample VIN.
    fn bare_profile() -> VehicleProfile {
        VehicleProfile {
            vin: SAMPLE_VIN.into(),
            decoded_vin: None,
            info: None,
            spec: None,
            supported_pids: std::collections::HashSet::new(),
        }
    }

    /// Runs a `COUNT(*)`-style query directly against the store's connection.
    fn count_rows(store: &SqliteStore, sql: &str) -> i64 {
        let conn = store.conn.lock().unwrap();
        conn.query_row(sql, [], |row| row.get::<_, i64>(0)).unwrap()
    }

    /// A freshly created store leaves the connection in autocommit mode.
    #[tokio::test]
    async fn test_create_store() {
        let store = SqliteStore::in_memory().unwrap();
        let conn = store.conn.lock().unwrap();
        assert!(conn.is_autocommit());
    }

    /// A saved profile can be read back by its VIN.
    #[tokio::test]
    async fn test_save_and_get_vehicle() {
        let store = SqliteStore::in_memory().unwrap();
        let profile = bare_profile();

        store.save_vehicle(&profile).await.unwrap();

        let fetched = store.get_vehicle(SAMPLE_VIN).await.unwrap();
        assert!(fetched.is_some());
        assert_eq!(fetched.unwrap().vin, "1GCHK23224F000001");
    }

    /// Looking up an unknown VIN yields `None` rather than an error.
    #[tokio::test]
    async fn test_get_vehicle_not_found() {
        let store = SqliteStore::in_memory().unwrap();
        let missing = store.get_vehicle("NONEXISTENT").await.unwrap();
        assert!(missing.is_none());
    }

    /// Saving a reading inserts exactly one row for that VIN.
    #[tokio::test]
    async fn test_save_reading() {
        let store = SqliteStore::in_memory().unwrap();
        let rpm = Reading {
            value: Value::Scalar(680.0),
            unit: "RPM",
            timestamp: Instant::now(),
            raw_bytes: vec![0x0A, 0xA0],
            source: ReadingSource::Live,
        };

        store
            .save_reading(SAMPLE_VIN, Pid::ENGINE_RPM, &rpm)
            .await
            .unwrap();

        let rows = count_rows(
            &store,
            "SELECT COUNT(*) FROM readings WHERE vin = '1GCHK23224F000001'",
        );
        assert_eq!(rows, 1);
    }

    /// Every saved DTC code appears in the stored event string.
    #[tokio::test]
    async fn test_save_dtc_event() {
        let store = SqliteStore::in_memory().unwrap();
        let dtcs = vec![Dtc::from_code("P0420"), Dtc::from_code("P0171")];

        store.save_dtc_event(SAMPLE_VIN, &dtcs).await.unwrap();

        let conn = store.conn.lock().unwrap();
        let stored: String = conn
            .query_row(
                "SELECT dtc_codes FROM dtc_events WHERE vin = '1GCHK23224F000001'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        for expected in ["P0420", "P0171"] {
            assert!(stored.contains(expected));
        }
    }

    /// A saved threshold set can be read back by its VIN.
    #[tokio::test]
    async fn test_save_and_get_thresholds() {
        let store = SqliteStore::in_memory().unwrap();
        let empty_set = ThresholdSet {
            engine: vec![],
            transmission: vec![],
        };

        store
            .save_thresholds("TEST_VIN_12345678", &empty_set)
            .await
            .unwrap();

        let fetched = store.get_thresholds("TEST_VIN_12345678").await.unwrap();
        assert!(fetched.is_some());
    }

    /// Saving the same VIN twice leaves a single row in `vehicles`.
    #[tokio::test]
    async fn test_upsert_vehicle() {
        let store = SqliteStore::in_memory().unwrap();
        let profile = bare_profile();

        store.save_vehicle(&profile).await.unwrap();
        store.save_vehicle(&profile).await.unwrap();

        let rows = count_rows(&store, "SELECT COUNT(*) FROM vehicles");
        assert_eq!(rows, 1);
    }
}