1use anyhow::Result;
2use chrono::Utc;
3use rusqlite::{Connection, params};
4use std::{fs, path::PathBuf};
5
6use crate::config::data_dir;
7use crate::consent::ConsentState;
8
/// Handle to the SQLite-backed state database.
///
/// Wraps a single `rusqlite::Connection`; every method on this type issues its
/// SQL through that connection.
pub struct StateStore {
    /// Open connection to the state database.
    conn: Connection,
}
12
/// Counters for one run, written to the `runs` table by `StateStore::finish_run`.
#[derive(Debug, Clone)]
pub struct RunStats {
    /// Key of the `runs` row these counters are stored on.
    pub run_id: String,
    /// Value stored in `runs.scanned_files`.
    pub scanned_files: usize,
    /// Value stored in `runs.produced_docs`.
    pub produced_docs: usize,
    /// Value stored in `runs.uploaded_docs`.
    pub uploaded_docs: usize,
    /// Value stored in `runs.redactions`.
    pub redactions: usize,
    /// Value stored in `runs.errors`.
    pub errors: usize,
}
22
/// One row of the `revocations` table, as returned by
/// `StateStore::pending_revocations`.
#[derive(Debug, Clone)]
pub struct RevocationRecord {
    /// Primary key of the revocation row.
    pub episode_id: String,
    /// Optional free-text reason; `None` when the column is NULL.
    pub reason: Option<String>,
    /// Revocation timestamp exactly as stored; the format is whatever the
    /// caller of `StateStore::upsert_revocation` supplied.
    pub revoked_at: String,
}
29
30impl StateStore {
31 pub fn open_default() -> Result<Self> {
32 let dir = data_dir()?;
33 fs::create_dir_all(&dir)?;
34 let path = dir.join("state.sqlite");
35 Self::open(path)
36 }
37
38 pub fn open(path: PathBuf) -> Result<Self> {
39 let conn = Connection::open(path)?;
40 let store = Self { conn };
41 store.init_schema()?;
42 Ok(store)
43 }
44
45 fn init_schema(&self) -> Result<()> {
46 self.conn.execute_batch(
47 "
48 CREATE TABLE IF NOT EXISTS sources (
49 source_name TEXT PRIMARY KEY,
50 last_scan_ts TEXT,
51 cursor_json TEXT
52 );
53
54 CREATE TABLE IF NOT EXISTS files (
55 path TEXT PRIMARY KEY,
56 fingerprint TEXT,
57 last_seen_ts TEXT
58 );
59
60 CREATE TABLE IF NOT EXISTS uploads (
61 doc_id TEXT PRIMARY KEY,
62 content_hash TEXT NOT NULL,
63 source_name TEXT NOT NULL,
64 session_id TEXT,
65 ts_start TEXT,
66 ts_end TEXT,
67 uploaded_at TEXT NOT NULL
68 );
69
70 CREATE TABLE IF NOT EXISTS runs (
71 run_id TEXT PRIMARY KEY,
72 started_at TEXT,
73 finished_at TEXT,
74 scanned_files INT,
75 produced_docs INT,
76 uploaded_docs INT,
77 redactions INT,
78 errors INT
79 );
80
81 CREATE TABLE IF NOT EXISTS consent_state (
82 id INTEGER PRIMARY KEY CHECK(id=1),
83 accepted_at TEXT,
84 consent_version TEXT,
85 license TEXT,
86 public_searchable INTEGER,
87 trainable INTEGER,
88 ack_sanitization INTEGER,
89 ack_public_search INTEGER,
90 ack_training_release INTEGER
91 );
92
93 CREATE TABLE IF NOT EXISTS episodes (
94 id TEXT PRIMARY KEY,
95 content_hash TEXT NOT NULL,
96 source_tool TEXT NOT NULL,
97 session_id_hash TEXT NOT NULL,
98 r2_object_key TEXT,
99 indexed_at TEXT,
100 uploaded_at TEXT,
101 consent_version TEXT,
102 license TEXT
103 );
104
105 CREATE TABLE IF NOT EXISTS revocations (
106 episode_id TEXT PRIMARY KEY,
107 reason TEXT,
108 revoked_at TEXT,
109 pushed_at TEXT,
110 push_status TEXT
111 );
112
113 CREATE TABLE IF NOT EXISTS snapshots (
114 version TEXT PRIMARY KEY,
115 built_at TEXT,
116 train_count INT,
117 val_count INT,
118 manifest_hash TEXT,
119 published_at TEXT
120 );
121 ",
122 )?;
123 Ok(())
124 }
125
126 pub fn has_upload(&self, doc_id: &str) -> Result<bool> {
127 let mut stmt = self
128 .conn
129 .prepare("SELECT 1 FROM uploads WHERE doc_id = ?1 LIMIT 1")?;
130 let mut rows = stmt.query(params![doc_id])?;
131 Ok(rows.next()?.is_some())
132 }
133
134 pub fn insert_upload(
135 &self,
136 doc_id: &str,
137 content_hash: &str,
138 source_name: &str,
139 session_id: &str,
140 ts_start: &str,
141 ts_end: &str,
142 ) -> Result<()> {
143 self.conn.execute(
144 "INSERT OR IGNORE INTO uploads (doc_id, content_hash, source_name, session_id, ts_start, ts_end, uploaded_at)
145 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
146 params![
147 doc_id,
148 content_hash,
149 source_name,
150 session_id,
151 ts_start,
152 ts_end,
153 Utc::now().to_rfc3339(),
154 ],
155 )?;
156 Ok(())
157 }
158
159 pub fn upsert_source_cursor(&self, source_name: &str, cursor_json: &str) -> Result<()> {
160 self.conn.execute(
161 "INSERT INTO sources (source_name, last_scan_ts, cursor_json)
162 VALUES (?1, ?2, ?3)
163 ON CONFLICT(source_name) DO UPDATE SET
164 last_scan_ts=excluded.last_scan_ts,
165 cursor_json=excluded.cursor_json",
166 params![source_name, Utc::now().to_rfc3339(), cursor_json],
167 )?;
168 Ok(())
169 }
170
171 pub fn source_cursor(&self, source_name: &str) -> Result<Option<String>> {
172 let mut stmt = self
173 .conn
174 .prepare("SELECT cursor_json FROM sources WHERE source_name = ?1")?;
175 let mut rows = stmt.query(params![source_name])?;
176 if let Some(row) = rows.next()? {
177 let c: Option<String> = row.get(0)?;
178 Ok(c)
179 } else {
180 Ok(None)
181 }
182 }
183
184 pub fn upsert_file_fingerprint(&self, path: &str, fingerprint: &str) -> Result<()> {
185 self.conn.execute(
186 "INSERT INTO files (path, fingerprint, last_seen_ts)
187 VALUES (?1, ?2, ?3)
188 ON CONFLICT(path) DO UPDATE SET
189 fingerprint=excluded.fingerprint,
190 last_seen_ts=excluded.last_seen_ts",
191 params![path, fingerprint, Utc::now().to_rfc3339()],
192 )?;
193 Ok(())
194 }
195
196 pub fn file_fingerprint(&self, path: &str) -> Result<Option<String>> {
197 let mut stmt = self
198 .conn
199 .prepare("SELECT fingerprint FROM files WHERE path = ?1")?;
200 let mut rows = stmt.query(params![path])?;
201 if let Some(row) = rows.next()? {
202 let fp: String = row.get(0)?;
203 Ok(Some(fp))
204 } else {
205 Ok(None)
206 }
207 }
208
209 pub fn start_run(&self, run_id: &str) -> Result<()> {
210 self.conn.execute(
211 "INSERT INTO runs (run_id, started_at) VALUES (?1, ?2)",
212 params![run_id, Utc::now().to_rfc3339()],
213 )?;
214 Ok(())
215 }
216
217 pub fn finish_run(&self, stats: &RunStats) -> Result<()> {
218 self.conn.execute(
219 "UPDATE runs SET
220 finished_at=?2,
221 scanned_files=?3,
222 produced_docs=?4,
223 uploaded_docs=?5,
224 redactions=?6,
225 errors=?7
226 WHERE run_id=?1",
227 params![
228 stats.run_id,
229 Utc::now().to_rfc3339(),
230 stats.scanned_files as i64,
231 stats.produced_docs as i64,
232 stats.uploaded_docs as i64,
233 stats.redactions as i64,
234 stats.errors as i64,
235 ],
236 )?;
237 Ok(())
238 }
239
240 pub fn totals_by_source(&self) -> Result<Vec<(String, i64)>> {
241 let mut stmt = self.conn.prepare(
242 "SELECT source_name, COUNT(*) as c FROM uploads GROUP BY source_name ORDER BY source_name",
243 )?;
244 let rows = stmt.query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?;
245 Ok(rows.flatten().collect())
246 }
247
248 pub fn episode_totals_by_source(&self) -> Result<Vec<(String, i64)>> {
249 let mut stmt = self.conn.prepare(
250 "SELECT source_tool, COUNT(*) as c FROM episodes GROUP BY source_tool ORDER BY source_tool",
251 )?;
252 let rows = stmt.query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?;
253 Ok(rows.flatten().collect())
254 }
255
256 pub fn reset_all(&self) -> Result<()> {
257 self.conn.execute("DELETE FROM sources", [])?;
258 self.conn.execute("DELETE FROM files", [])?;
259 self.conn.execute("DELETE FROM uploads", [])?;
260 self.conn.execute("DELETE FROM consent_state", [])?;
261 self.conn.execute("DELETE FROM episodes", [])?;
262 self.conn.execute("DELETE FROM revocations", [])?;
263 self.conn.execute("DELETE FROM snapshots", [])?;
264 Ok(())
265 }
266
267 pub fn reset_source(&self, source_name: &str) -> Result<()> {
268 self.conn.execute(
269 "DELETE FROM sources WHERE source_name=?1",
270 params![source_name],
271 )?;
272 self.conn.execute(
273 "DELETE FROM uploads WHERE source_name=?1",
274 params![source_name],
275 )?;
276 Ok(())
277 }
278
279 pub fn upsert_consent_state(&self, state: &ConsentState) -> Result<()> {
280 self.conn.execute(
281 "INSERT INTO consent_state (
282 id, accepted_at, consent_version, license, public_searchable, trainable,
283 ack_sanitization, ack_public_search, ack_training_release
284 ) VALUES (1, ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
285 ON CONFLICT(id) DO UPDATE SET
286 accepted_at=excluded.accepted_at,
287 consent_version=excluded.consent_version,
288 license=excluded.license,
289 public_searchable=excluded.public_searchable,
290 trainable=excluded.trainable,
291 ack_sanitization=excluded.ack_sanitization,
292 ack_public_search=excluded.ack_public_search,
293 ack_training_release=excluded.ack_training_release",
294 params![
295 state.accepted_at,
296 state.consent_version,
297 state.license,
298 state.public_searchable as i32,
299 state.trainable as i32,
300 state.ack_sanitization as i32,
301 state.ack_public_search as i32,
302 state.ack_training_release as i32,
303 ],
304 )?;
305 Ok(())
306 }
307
308 pub fn consent_state(&self) -> Result<Option<ConsentState>> {
309 let mut stmt = self.conn.prepare(
310 "SELECT accepted_at, consent_version, license, public_searchable, trainable,
311 ack_sanitization, ack_public_search, ack_training_release
312 FROM consent_state WHERE id=1",
313 )?;
314 let mut rows = stmt.query([])?;
315 if let Some(row) = rows.next()? {
316 Ok(Some(ConsentState {
317 accepted_at: row.get(0)?,
318 consent_version: row.get(1)?,
319 license: row.get(2)?,
320 public_searchable: row.get::<_, i64>(3)? != 0,
321 trainable: row.get::<_, i64>(4)? != 0,
322 ack_sanitization: row.get::<_, i64>(5)? != 0,
323 ack_public_search: row.get::<_, i64>(6)? != 0,
324 ack_training_release: row.get::<_, i64>(7)? != 0,
325 }))
326 } else {
327 Ok(None)
328 }
329 }
330
331 #[allow(clippy::too_many_arguments)]
332 pub fn upsert_episode_upload(
333 &self,
334 id: &str,
335 content_hash: &str,
336 source_tool: &str,
337 session_id_hash: &str,
338 r2_object_key: &str,
339 consent_version: &str,
340 license: &str,
341 ) -> Result<()> {
342 self.conn.execute(
343 "INSERT INTO episodes (
344 id, content_hash, source_tool, session_id_hash, r2_object_key,
345 indexed_at, uploaded_at, consent_version, license
346 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?6, ?7, ?8)
347 ON CONFLICT(id) DO UPDATE SET
348 r2_object_key=excluded.r2_object_key,
349 indexed_at=excluded.indexed_at,
350 uploaded_at=excluded.uploaded_at,
351 consent_version=excluded.consent_version,
352 license=excluded.license",
353 params![
354 id,
355 content_hash,
356 source_tool,
357 session_id_hash,
358 r2_object_key,
359 Utc::now().to_rfc3339(),
360 consent_version,
361 license,
362 ],
363 )?;
364 Ok(())
365 }
366
367 pub fn has_episode_upload(&self, id: &str) -> Result<bool> {
368 let mut stmt = self
369 .conn
370 .prepare("SELECT 1 FROM episodes WHERE id=?1 LIMIT 1")?;
371 let mut rows = stmt.query(params![id])?;
372 Ok(rows.next()?.is_some())
373 }
374
375 pub fn upsert_revocation(
376 &self,
377 episode_id: &str,
378 reason: Option<&str>,
379 revoked_at: &str,
380 push_status: &str,
381 ) -> Result<()> {
382 self.conn.execute(
383 "INSERT INTO revocations (episode_id, reason, revoked_at, pushed_at, push_status)
384 VALUES (?1, ?2, ?3, NULL, ?4)
385 ON CONFLICT(episode_id) DO UPDATE SET
386 reason=excluded.reason,
387 revoked_at=excluded.revoked_at,
388 push_status=excluded.push_status",
389 params![episode_id, reason, revoked_at, push_status],
390 )?;
391 Ok(())
392 }
393
394 pub fn pending_revocations(&self) -> Result<Vec<RevocationRecord>> {
395 let mut stmt = self.conn.prepare(
396 "SELECT episode_id, reason, revoked_at
397 FROM revocations
398 WHERE push_status IS NULL OR push_status != 'pushed'
399 ORDER BY revoked_at ASC",
400 )?;
401 let rows = stmt.query_map([], |row| {
402 Ok(RevocationRecord {
403 episode_id: row.get(0)?,
404 reason: row.get(1)?,
405 revoked_at: row.get(2)?,
406 })
407 })?;
408 Ok(rows.flatten().collect())
409 }
410
411 pub fn mark_revocation_pushed(&self, episode_id: &str) -> Result<()> {
412 self.conn.execute(
413 "UPDATE revocations SET push_status='pushed', pushed_at=?2 WHERE episode_id=?1",
414 params![episode_id, Utc::now().to_rfc3339()],
415 )?;
416 Ok(())
417 }
418
419 pub fn all_revoked_ids(&self) -> Result<Vec<String>> {
420 let mut stmt = self
421 .conn
422 .prepare("SELECT episode_id FROM revocations ORDER BY revoked_at ASC")?;
423 let rows = stmt.query_map([], |row| row.get(0))?;
424 Ok(rows.flatten().collect())
425 }
426
427 pub fn record_snapshot(
428 &self,
429 version: &str,
430 train_count: usize,
431 val_count: usize,
432 manifest_hash: &str,
433 ) -> Result<()> {
434 self.conn.execute(
435 "INSERT INTO snapshots(version, built_at, train_count, val_count, manifest_hash, published_at)
436 VALUES (?1, ?2, ?3, ?4, ?5, NULL)
437 ON CONFLICT(version) DO UPDATE SET
438 built_at=excluded.built_at,
439 train_count=excluded.train_count,
440 val_count=excluded.val_count,
441 manifest_hash=excluded.manifest_hash",
442 params![
443 version,
444 Utc::now().to_rfc3339(),
445 train_count as i64,
446 val_count as i64,
447 manifest_hash
448 ],
449 )?;
450 Ok(())
451 }
452
453 pub fn mark_snapshot_published(&self, version: &str) -> Result<()> {
454 self.conn.execute(
455 "UPDATE snapshots SET published_at=?2 WHERE version=?1",
456 params![version, Utc::now().to_rfc3339()],
457 )?;
458 Ok(())
459 }
460}