Skip to main content

trace_share_core/
state.rs

1use anyhow::Result;
2use chrono::Utc;
3use rusqlite::{Connection, params};
4use std::{fs, path::PathBuf};
5
6use crate::config::data_dir;
7use crate::consent::ConsentState;
8
/// Handle to the local state database.
///
/// Wraps a single `rusqlite` [`Connection`]; every method on this type runs
/// its SQL through that connection.
pub struct StateStore {
    /// The underlying SQLite connection.
    conn: Connection,
}
12
/// Counters for one run, persisted to the `runs` table by
/// `StateStore::finish_run`.
///
/// Each counter is written to the like-named column of `runs`.
#[derive(Debug, Clone)]
pub struct RunStats {
    /// Primary key of the `runs` row these counters belong to.
    pub run_id: String,
    /// Stored in `runs.scanned_files`.
    pub scanned_files: usize,
    /// Stored in `runs.produced_docs`.
    pub produced_docs: usize,
    /// Stored in `runs.uploaded_docs`.
    pub uploaded_docs: usize,
    /// Stored in `runs.redactions`.
    pub redactions: usize,
    /// Stored in `runs.errors`.
    pub errors: usize,
}
22
/// Projection of one `revocations` row, as returned by
/// `StateStore::pending_revocations`.
#[derive(Debug, Clone)]
pub struct RevocationRecord {
    /// Value of `revocations.episode_id` (the table's primary key).
    pub episode_id: String,
    /// Value of `revocations.reason`; `None` when the column is NULL.
    pub reason: Option<String>,
    /// Value of `revocations.revoked_at`, exactly as stored.
    // NOTE(review): presumably an RFC 3339 timestamp like the other *_at
    // columns, but the caller supplies this string — confirm.
    pub revoked_at: String,
}
29
30impl StateStore {
31    pub fn open_default() -> Result<Self> {
32        let dir = data_dir()?;
33        fs::create_dir_all(&dir)?;
34        let path = dir.join("state.sqlite");
35        Self::open(path)
36    }
37
38    pub fn open(path: PathBuf) -> Result<Self> {
39        let conn = Connection::open(path)?;
40        let store = Self { conn };
41        store.init_schema()?;
42        Ok(store)
43    }
44
    /// Creates every table used by the store if it does not already exist.
    ///
    /// All statements are `CREATE TABLE IF NOT EXISTS`, so calling this on an
    /// already-initialised database leaves existing tables untouched. The
    /// tables are `sources`, `files`, `uploads`, `runs`, `consent_state`
    /// (restricted to a single row by `CHECK(id=1)`), `episodes`,
    /// `revocations` and `snapshots`.
    ///
    /// # Errors
    /// Returns the SQLite error if the batch fails to execute.
    fn init_schema(&self) -> Result<()> {
        self.conn.execute_batch(
            "
            CREATE TABLE IF NOT EXISTS sources (
              source_name TEXT PRIMARY KEY,
              last_scan_ts TEXT,
              cursor_json TEXT
            );

            CREATE TABLE IF NOT EXISTS files (
              path TEXT PRIMARY KEY,
              fingerprint TEXT,
              last_seen_ts TEXT
            );

            CREATE TABLE IF NOT EXISTS uploads (
              doc_id TEXT PRIMARY KEY,
              content_hash TEXT NOT NULL,
              source_name TEXT NOT NULL,
              session_id TEXT,
              ts_start TEXT,
              ts_end TEXT,
              uploaded_at TEXT NOT NULL
            );

            CREATE TABLE IF NOT EXISTS runs (
              run_id TEXT PRIMARY KEY,
              started_at TEXT,
              finished_at TEXT,
              scanned_files INT,
              produced_docs INT,
              uploaded_docs INT,
              redactions INT,
              errors INT
            );

            CREATE TABLE IF NOT EXISTS consent_state (
              id INTEGER PRIMARY KEY CHECK(id=1),
              accepted_at TEXT,
              consent_version TEXT,
              license TEXT,
              public_searchable INTEGER,
              trainable INTEGER,
              ack_sanitization INTEGER,
              ack_public_search INTEGER,
              ack_training_release INTEGER
            );

            CREATE TABLE IF NOT EXISTS episodes (
              id TEXT PRIMARY KEY,
              content_hash TEXT NOT NULL,
              source_tool TEXT NOT NULL,
              session_id_hash TEXT NOT NULL,
              r2_object_key TEXT,
              indexed_at TEXT,
              uploaded_at TEXT,
              consent_version TEXT,
              license TEXT
            );

            CREATE TABLE IF NOT EXISTS revocations (
              episode_id TEXT PRIMARY KEY,
              reason TEXT,
              revoked_at TEXT,
              pushed_at TEXT,
              push_status TEXT
            );

            CREATE TABLE IF NOT EXISTS snapshots (
              version TEXT PRIMARY KEY,
              built_at TEXT,
              train_count INT,
              val_count INT,
              manifest_hash TEXT,
              published_at TEXT
            );
            ",
        )?;
        Ok(())
    }
125
126    pub fn has_upload(&self, doc_id: &str) -> Result<bool> {
127        let mut stmt = self
128            .conn
129            .prepare("SELECT 1 FROM uploads WHERE doc_id = ?1 LIMIT 1")?;
130        let mut rows = stmt.query(params![doc_id])?;
131        Ok(rows.next()?.is_some())
132    }
133
134    pub fn insert_upload(
135        &self,
136        doc_id: &str,
137        content_hash: &str,
138        source_name: &str,
139        session_id: &str,
140        ts_start: &str,
141        ts_end: &str,
142    ) -> Result<()> {
143        self.conn.execute(
144            "INSERT OR IGNORE INTO uploads (doc_id, content_hash, source_name, session_id, ts_start, ts_end, uploaded_at)
145             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
146            params![
147                doc_id,
148                content_hash,
149                source_name,
150                session_id,
151                ts_start,
152                ts_end,
153                Utc::now().to_rfc3339(),
154            ],
155        )?;
156        Ok(())
157    }
158
159    pub fn upsert_source_cursor(&self, source_name: &str, cursor_json: &str) -> Result<()> {
160        self.conn.execute(
161            "INSERT INTO sources (source_name, last_scan_ts, cursor_json)
162             VALUES (?1, ?2, ?3)
163             ON CONFLICT(source_name) DO UPDATE SET
164               last_scan_ts=excluded.last_scan_ts,
165               cursor_json=excluded.cursor_json",
166            params![source_name, Utc::now().to_rfc3339(), cursor_json],
167        )?;
168        Ok(())
169    }
170
171    pub fn source_cursor(&self, source_name: &str) -> Result<Option<String>> {
172        let mut stmt = self
173            .conn
174            .prepare("SELECT cursor_json FROM sources WHERE source_name = ?1")?;
175        let mut rows = stmt.query(params![source_name])?;
176        if let Some(row) = rows.next()? {
177            let c: Option<String> = row.get(0)?;
178            Ok(c)
179        } else {
180            Ok(None)
181        }
182    }
183
184    pub fn upsert_file_fingerprint(&self, path: &str, fingerprint: &str) -> Result<()> {
185        self.conn.execute(
186            "INSERT INTO files (path, fingerprint, last_seen_ts)
187             VALUES (?1, ?2, ?3)
188             ON CONFLICT(path) DO UPDATE SET
189               fingerprint=excluded.fingerprint,
190               last_seen_ts=excluded.last_seen_ts",
191            params![path, fingerprint, Utc::now().to_rfc3339()],
192        )?;
193        Ok(())
194    }
195
196    pub fn file_fingerprint(&self, path: &str) -> Result<Option<String>> {
197        let mut stmt = self
198            .conn
199            .prepare("SELECT fingerprint FROM files WHERE path = ?1")?;
200        let mut rows = stmt.query(params![path])?;
201        if let Some(row) = rows.next()? {
202            let fp: String = row.get(0)?;
203            Ok(Some(fp))
204        } else {
205            Ok(None)
206        }
207    }
208
209    pub fn start_run(&self, run_id: &str) -> Result<()> {
210        self.conn.execute(
211            "INSERT INTO runs (run_id, started_at) VALUES (?1, ?2)",
212            params![run_id, Utc::now().to_rfc3339()],
213        )?;
214        Ok(())
215    }
216
217    pub fn finish_run(&self, stats: &RunStats) -> Result<()> {
218        self.conn.execute(
219            "UPDATE runs SET
220               finished_at=?2,
221               scanned_files=?3,
222               produced_docs=?4,
223               uploaded_docs=?5,
224               redactions=?6,
225               errors=?7
226             WHERE run_id=?1",
227            params![
228                stats.run_id,
229                Utc::now().to_rfc3339(),
230                stats.scanned_files as i64,
231                stats.produced_docs as i64,
232                stats.uploaded_docs as i64,
233                stats.redactions as i64,
234                stats.errors as i64,
235            ],
236        )?;
237        Ok(())
238    }
239
240    pub fn totals_by_source(&self) -> Result<Vec<(String, i64)>> {
241        let mut stmt = self.conn.prepare(
242            "SELECT source_name, COUNT(*) as c FROM uploads GROUP BY source_name ORDER BY source_name",
243        )?;
244        let rows = stmt.query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?;
245        Ok(rows.flatten().collect())
246    }
247
248    pub fn episode_totals_by_source(&self) -> Result<Vec<(String, i64)>> {
249        let mut stmt = self.conn.prepare(
250            "SELECT source_tool, COUNT(*) as c FROM episodes GROUP BY source_tool ORDER BY source_tool",
251        )?;
252        let rows = stmt.query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?;
253        Ok(rows.flatten().collect())
254    }
255
256    pub fn reset_all(&self) -> Result<()> {
257        self.conn.execute("DELETE FROM sources", [])?;
258        self.conn.execute("DELETE FROM files", [])?;
259        self.conn.execute("DELETE FROM uploads", [])?;
260        self.conn.execute("DELETE FROM consent_state", [])?;
261        self.conn.execute("DELETE FROM episodes", [])?;
262        self.conn.execute("DELETE FROM revocations", [])?;
263        self.conn.execute("DELETE FROM snapshots", [])?;
264        Ok(())
265    }
266
267    pub fn reset_source(&self, source_name: &str) -> Result<()> {
268        self.conn.execute(
269            "DELETE FROM sources WHERE source_name=?1",
270            params![source_name],
271        )?;
272        self.conn.execute(
273            "DELETE FROM uploads WHERE source_name=?1",
274            params![source_name],
275        )?;
276        Ok(())
277    }
278
279    pub fn upsert_consent_state(&self, state: &ConsentState) -> Result<()> {
280        self.conn.execute(
281            "INSERT INTO consent_state (
282                id, accepted_at, consent_version, license, public_searchable, trainable,
283                ack_sanitization, ack_public_search, ack_training_release
284            ) VALUES (1, ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
285            ON CONFLICT(id) DO UPDATE SET
286              accepted_at=excluded.accepted_at,
287              consent_version=excluded.consent_version,
288              license=excluded.license,
289              public_searchable=excluded.public_searchable,
290              trainable=excluded.trainable,
291              ack_sanitization=excluded.ack_sanitization,
292              ack_public_search=excluded.ack_public_search,
293              ack_training_release=excluded.ack_training_release",
294            params![
295                state.accepted_at,
296                state.consent_version,
297                state.license,
298                state.public_searchable as i32,
299                state.trainable as i32,
300                state.ack_sanitization as i32,
301                state.ack_public_search as i32,
302                state.ack_training_release as i32,
303            ],
304        )?;
305        Ok(())
306    }
307
308    pub fn consent_state(&self) -> Result<Option<ConsentState>> {
309        let mut stmt = self.conn.prepare(
310            "SELECT accepted_at, consent_version, license, public_searchable, trainable,
311                    ack_sanitization, ack_public_search, ack_training_release
312             FROM consent_state WHERE id=1",
313        )?;
314        let mut rows = stmt.query([])?;
315        if let Some(row) = rows.next()? {
316            Ok(Some(ConsentState {
317                accepted_at: row.get(0)?,
318                consent_version: row.get(1)?,
319                license: row.get(2)?,
320                public_searchable: row.get::<_, i64>(3)? != 0,
321                trainable: row.get::<_, i64>(4)? != 0,
322                ack_sanitization: row.get::<_, i64>(5)? != 0,
323                ack_public_search: row.get::<_, i64>(6)? != 0,
324                ack_training_release: row.get::<_, i64>(7)? != 0,
325            }))
326        } else {
327            Ok(None)
328        }
329    }
330
331    #[allow(clippy::too_many_arguments)]
332    pub fn upsert_episode_upload(
333        &self,
334        id: &str,
335        content_hash: &str,
336        source_tool: &str,
337        session_id_hash: &str,
338        r2_object_key: &str,
339        consent_version: &str,
340        license: &str,
341    ) -> Result<()> {
342        self.conn.execute(
343            "INSERT INTO episodes (
344                id, content_hash, source_tool, session_id_hash, r2_object_key,
345                indexed_at, uploaded_at, consent_version, license
346            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?6, ?7, ?8)
347            ON CONFLICT(id) DO UPDATE SET
348              r2_object_key=excluded.r2_object_key,
349              indexed_at=excluded.indexed_at,
350              uploaded_at=excluded.uploaded_at,
351              consent_version=excluded.consent_version,
352              license=excluded.license",
353            params![
354                id,
355                content_hash,
356                source_tool,
357                session_id_hash,
358                r2_object_key,
359                Utc::now().to_rfc3339(),
360                consent_version,
361                license,
362            ],
363        )?;
364        Ok(())
365    }
366
367    pub fn has_episode_upload(&self, id: &str) -> Result<bool> {
368        let mut stmt = self
369            .conn
370            .prepare("SELECT 1 FROM episodes WHERE id=?1 LIMIT 1")?;
371        let mut rows = stmt.query(params![id])?;
372        Ok(rows.next()?.is_some())
373    }
374
375    pub fn upsert_revocation(
376        &self,
377        episode_id: &str,
378        reason: Option<&str>,
379        revoked_at: &str,
380        push_status: &str,
381    ) -> Result<()> {
382        self.conn.execute(
383            "INSERT INTO revocations (episode_id, reason, revoked_at, pushed_at, push_status)
384             VALUES (?1, ?2, ?3, NULL, ?4)
385             ON CONFLICT(episode_id) DO UPDATE SET
386               reason=excluded.reason,
387               revoked_at=excluded.revoked_at,
388               push_status=excluded.push_status",
389            params![episode_id, reason, revoked_at, push_status],
390        )?;
391        Ok(())
392    }
393
394    pub fn pending_revocations(&self) -> Result<Vec<RevocationRecord>> {
395        let mut stmt = self.conn.prepare(
396            "SELECT episode_id, reason, revoked_at
397             FROM revocations
398             WHERE push_status IS NULL OR push_status != 'pushed'
399             ORDER BY revoked_at ASC",
400        )?;
401        let rows = stmt.query_map([], |row| {
402            Ok(RevocationRecord {
403                episode_id: row.get(0)?,
404                reason: row.get(1)?,
405                revoked_at: row.get(2)?,
406            })
407        })?;
408        Ok(rows.flatten().collect())
409    }
410
411    pub fn mark_revocation_pushed(&self, episode_id: &str) -> Result<()> {
412        self.conn.execute(
413            "UPDATE revocations SET push_status='pushed', pushed_at=?2 WHERE episode_id=?1",
414            params![episode_id, Utc::now().to_rfc3339()],
415        )?;
416        Ok(())
417    }
418
419    pub fn all_revoked_ids(&self) -> Result<Vec<String>> {
420        let mut stmt = self
421            .conn
422            .prepare("SELECT episode_id FROM revocations ORDER BY revoked_at ASC")?;
423        let rows = stmt.query_map([], |row| row.get(0))?;
424        Ok(rows.flatten().collect())
425    }
426
427    pub fn record_snapshot(
428        &self,
429        version: &str,
430        train_count: usize,
431        val_count: usize,
432        manifest_hash: &str,
433    ) -> Result<()> {
434        self.conn.execute(
435            "INSERT INTO snapshots(version, built_at, train_count, val_count, manifest_hash, published_at)
436             VALUES (?1, ?2, ?3, ?4, ?5, NULL)
437             ON CONFLICT(version) DO UPDATE SET
438               built_at=excluded.built_at,
439               train_count=excluded.train_count,
440               val_count=excluded.val_count,
441               manifest_hash=excluded.manifest_hash",
442            params![
443                version,
444                Utc::now().to_rfc3339(),
445                train_count as i64,
446                val_count as i64,
447                manifest_hash
448            ],
449        )?;
450        Ok(())
451    }
452
453    pub fn mark_snapshot_published(&self, version: &str) -> Result<()> {
454        self.conn.execute(
455            "UPDATE snapshots SET published_at=?2 WHERE version=?1",
456            params![version, Utc::now().to_rfc3339()],
457        )?;
458        Ok(())
459    }
460}