1use std::path::Path;
15use std::sync::Mutex;
16use async_trait::async_trait;
17use rusqlite::{Connection, params};
18use obd2_core::error::Obd2Error;
19use obd2_core::protocol::pid::Pid;
20use obd2_core::protocol::enhanced::Reading;
21use obd2_core::protocol::dtc::Dtc;
22use obd2_core::store::{VehicleStore, SessionStore};
23use obd2_core::vehicle::{VehicleProfile, ThresholdSet};
24
25pub struct SqliteStore {
27 conn: Mutex<Connection>,
28}
29
30impl SqliteStore {
31 pub fn open(path: &Path) -> Result<Self, Obd2Error> {
33 let conn = Connection::open(path)
34 .map_err(|e| Obd2Error::Other(Box::new(e)))?;
35 let store = Self { conn: Mutex::new(conn) };
36 store.create_tables()?;
37 Ok(store)
38 }
39
40 pub fn in_memory() -> Result<Self, Obd2Error> {
42 let conn = Connection::open_in_memory()
43 .map_err(|e| Obd2Error::Other(Box::new(e)))?;
44 let store = Self { conn: Mutex::new(conn) };
45 store.create_tables()?;
46 Ok(store)
47 }
48
49 fn create_tables(&self) -> Result<(), Obd2Error> {
50 let conn = self.conn.lock().map_err(|e| Obd2Error::Other(Box::new(
51 std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
52 )))?;
53 conn.execute_batch(
54 "CREATE TABLE IF NOT EXISTS vehicles (
55 vin TEXT PRIMARY KEY,
56 data TEXT NOT NULL,
57 updated_at TEXT NOT NULL DEFAULT (datetime('now'))
58 );
59
60 CREATE TABLE IF NOT EXISTS thresholds (
61 vin TEXT PRIMARY KEY,
62 data TEXT NOT NULL,
63 updated_at TEXT NOT NULL DEFAULT (datetime('now'))
64 );
65
66 CREATE TABLE IF NOT EXISTS readings (
67 id INTEGER PRIMARY KEY AUTOINCREMENT,
68 vin TEXT NOT NULL,
69 pid_code INTEGER NOT NULL,
70 value REAL,
71 unit TEXT,
72 timestamp TEXT NOT NULL DEFAULT (datetime('now'))
73 );
74
75 CREATE TABLE IF NOT EXISTS dtc_events (
76 id INTEGER PRIMARY KEY AUTOINCREMENT,
77 vin TEXT NOT NULL,
78 dtc_codes TEXT NOT NULL,
79 timestamp TEXT NOT NULL DEFAULT (datetime('now'))
80 );
81
82 CREATE INDEX IF NOT EXISTS idx_readings_vin ON readings(vin);
83 CREATE INDEX IF NOT EXISTS idx_dtc_events_vin ON dtc_events(vin);"
84 ).map_err(|e| Obd2Error::Other(Box::new(e)))?;
85 Ok(())
86 }
87}
88
89impl std::fmt::Debug for SqliteStore {
90 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91 f.debug_struct("SqliteStore").finish()
92 }
93}
94
95#[async_trait]
96impl VehicleStore for SqliteStore {
97 async fn save_vehicle(&self, profile: &VehicleProfile) -> Result<(), Obd2Error> {
98 let vin = &profile.vin;
99 let data = serde_json::to_string(&SerializableProfile::from(profile))
100 .map_err(|e| Obd2Error::Other(Box::new(e)))?;
101
102 let conn = self.conn.lock().map_err(|e| Obd2Error::Other(Box::new(
103 std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
104 )))?;
105 conn.execute(
106 "INSERT OR REPLACE INTO vehicles (vin, data) VALUES (?1, ?2)",
107 params![vin, data],
108 ).map_err(|e| Obd2Error::Other(Box::new(e)))?;
109
110 Ok(())
111 }
112
113 async fn get_vehicle(&self, vin: &str) -> Result<Option<VehicleProfile>, Obd2Error> {
114 let conn = self.conn.lock().map_err(|e| Obd2Error::Other(Box::new(
115 std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
116 )))?;
117 let mut stmt = conn.prepare(
118 "SELECT data FROM vehicles WHERE vin = ?1"
119 ).map_err(|e| Obd2Error::Other(Box::new(e)))?;
120
121 let result = stmt.query_row(params![vin], |row| {
122 let data: String = row.get(0)?;
123 Ok(data)
124 });
125
126 match result {
127 Ok(data) => {
128 let sp: SerializableProfile = serde_json::from_str(&data)
129 .map_err(|e| Obd2Error::Other(Box::new(e)))?;
130 Ok(Some(sp.into()))
131 }
132 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
133 Err(e) => Err(Obd2Error::Other(Box::new(e))),
134 }
135 }
136
137 async fn save_thresholds(&self, vin: &str, thresholds: &ThresholdSet) -> Result<(), Obd2Error> {
138 let data = serde_json::to_string(thresholds)
139 .map_err(|e| Obd2Error::Other(Box::new(e)))?;
140
141 let conn = self.conn.lock().map_err(|e| Obd2Error::Other(Box::new(
142 std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
143 )))?;
144 conn.execute(
145 "INSERT OR REPLACE INTO thresholds (vin, data) VALUES (?1, ?2)",
146 params![vin, data],
147 ).map_err(|e| Obd2Error::Other(Box::new(e)))?;
148
149 Ok(())
150 }
151
152 async fn get_thresholds(&self, vin: &str) -> Result<Option<ThresholdSet>, Obd2Error> {
153 let conn = self.conn.lock().map_err(|e| Obd2Error::Other(Box::new(
154 std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
155 )))?;
156 let mut stmt = conn.prepare(
157 "SELECT data FROM thresholds WHERE vin = ?1"
158 ).map_err(|e| Obd2Error::Other(Box::new(e)))?;
159
160 let result = stmt.query_row(params![vin], |row| {
161 let data: String = row.get(0)?;
162 Ok(data)
163 });
164
165 match result {
166 Ok(data) => {
167 let ts: ThresholdSet = serde_json::from_str(&data)
168 .map_err(|e| Obd2Error::Other(Box::new(e)))?;
169 Ok(Some(ts))
170 }
171 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
172 Err(e) => Err(Obd2Error::Other(Box::new(e))),
173 }
174 }
175}
176
177#[async_trait]
178impl SessionStore for SqliteStore {
179 async fn save_reading(&self, vin: &str, pid: Pid, reading: &Reading) -> Result<(), Obd2Error> {
180 let value = reading.value.as_f64().ok();
181 let unit = reading.unit;
182
183 let conn = self.conn.lock().map_err(|e| Obd2Error::Other(Box::new(
184 std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
185 )))?;
186 conn.execute(
187 "INSERT INTO readings (vin, pid_code, value, unit) VALUES (?1, ?2, ?3, ?4)",
188 params![vin, pid.0, value, unit],
189 ).map_err(|e| Obd2Error::Other(Box::new(e)))?;
190
191 Ok(())
192 }
193
194 async fn save_dtc_event(&self, vin: &str, dtcs: &[Dtc]) -> Result<(), Obd2Error> {
195 let codes: Vec<&str> = dtcs.iter().map(|d| d.code.as_str()).collect();
196 let codes_str = codes.join(",");
197
198 let conn = self.conn.lock().map_err(|e| Obd2Error::Other(Box::new(
199 std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
200 )))?;
201 conn.execute(
202 "INSERT INTO dtc_events (vin, dtc_codes) VALUES (?1, ?2)",
203 params![vin, codes_str],
204 ).map_err(|e| Obd2Error::Other(Box::new(e)))?;
205
206 Ok(())
207 }
208}
209
210#[derive(serde::Serialize, serde::Deserialize)]
212struct SerializableProfile {
213 vin: String,
214 make: Option<String>,
215 model: Option<String>,
216 year: Option<i32>,
217 engine_code: Option<String>,
218}
219
220impl From<&VehicleProfile> for SerializableProfile {
221 fn from(p: &VehicleProfile) -> Self {
222 Self {
223 vin: p.vin.clone(),
224 make: None,
225 model: None,
226 year: None,
227 engine_code: p.spec.as_ref().map(|s| s.identity.engine.code.clone()),
228 }
229 }
230}
231
232impl From<SerializableProfile> for VehicleProfile {
233 fn from(sp: SerializableProfile) -> Self {
234 VehicleProfile {
235 vin: sp.vin,
236 info: None,
237 spec: None,
238 supported_pids: std::collections::HashSet::new(),
239 }
240 }
241}
242
243#[cfg(test)]
244mod tests {
245 use super::*;
246 use obd2_core::protocol::enhanced::{Value, ReadingSource};
247 use std::time::Instant;
248
249 #[tokio::test]
250 async fn test_create_store() {
251 let store = SqliteStore::in_memory().unwrap();
252 let conn = store.conn.lock().unwrap();
253 assert!(conn.is_autocommit());
254 }
255
256 #[tokio::test]
257 async fn test_save_and_get_vehicle() {
258 let store = SqliteStore::in_memory().unwrap();
259 let profile = VehicleProfile {
260 vin: "1GCHK23224F000001".into(),
261 info: None,
262 spec: None,
263 supported_pids: std::collections::HashSet::new(),
264 };
265
266 store.save_vehicle(&profile).await.unwrap();
267 let retrieved = store.get_vehicle("1GCHK23224F000001").await.unwrap();
268 assert!(retrieved.is_some());
269 assert_eq!(retrieved.unwrap().vin, "1GCHK23224F000001");
270 }
271
272 #[tokio::test]
273 async fn test_get_vehicle_not_found() {
274 let store = SqliteStore::in_memory().unwrap();
275 let result = store.get_vehicle("NONEXISTENT").await.unwrap();
276 assert!(result.is_none());
277 }
278
279 #[tokio::test]
280 async fn test_save_reading() {
281 let store = SqliteStore::in_memory().unwrap();
282 let reading = Reading {
283 value: Value::Scalar(680.0),
284 unit: "RPM",
285 timestamp: Instant::now(),
286 raw_bytes: vec![0x0A, 0xA0],
287 source: ReadingSource::Live,
288 };
289
290 store.save_reading("1GCHK23224F000001", Pid::ENGINE_RPM, &reading).await.unwrap();
291
292 let conn = store.conn.lock().unwrap();
294 let count: i64 = conn.query_row(
295 "SELECT COUNT(*) FROM readings WHERE vin = '1GCHK23224F000001'",
296 [],
297 |row| row.get(0),
298 ).unwrap();
299 assert_eq!(count, 1);
300 }
301
302 #[tokio::test]
303 async fn test_save_dtc_event() {
304 let store = SqliteStore::in_memory().unwrap();
305 let dtcs = vec![
306 Dtc::from_code("P0420"),
307 Dtc::from_code("P0171"),
308 ];
309
310 store.save_dtc_event("1GCHK23224F000001", &dtcs).await.unwrap();
311
312 let conn = store.conn.lock().unwrap();
313 let codes: String = conn.query_row(
314 "SELECT dtc_codes FROM dtc_events WHERE vin = '1GCHK23224F000001'",
315 [],
316 |row| row.get(0),
317 ).unwrap();
318 assert!(codes.contains("P0420"));
319 assert!(codes.contains("P0171"));
320 }
321
322 #[tokio::test]
323 async fn test_save_and_get_thresholds() {
324 let store = SqliteStore::in_memory().unwrap();
325 let ts = ThresholdSet {
326 engine: vec![],
327 transmission: vec![],
328 };
329
330 store.save_thresholds("TEST_VIN_12345678", &ts).await.unwrap();
331 let retrieved = store.get_thresholds("TEST_VIN_12345678").await.unwrap();
332 assert!(retrieved.is_some());
333 }
334
335 #[tokio::test]
336 async fn test_upsert_vehicle() {
337 let store = SqliteStore::in_memory().unwrap();
338 let profile = VehicleProfile {
339 vin: "1GCHK23224F000001".into(),
340 info: None,
341 spec: None,
342 supported_pids: std::collections::HashSet::new(),
343 };
344
345 store.save_vehicle(&profile).await.unwrap();
347 store.save_vehicle(&profile).await.unwrap();
348
349 let conn = store.conn.lock().unwrap();
350 let count: i64 = conn.query_row(
351 "SELECT COUNT(*) FROM vehicles",
352 [],
353 |row| row.get(0),
354 ).unwrap();
355 assert_eq!(count, 1);
356 }
357}