1use anyhow::Context;
2use rusqlite::Connection;
3use std::path::Path;
4
5const MIGRATION_001: &str = r#"
6CREATE TABLE IF NOT EXISTS tasks (
7 task_id TEXT PRIMARY KEY,
8 title TEXT NOT NULL,
9 status TEXT NOT NULL,
10 project_hash TEXT NOT NULL,
11 opened_at TEXT NOT NULL,
12 closed_at TEXT,
13 last_event_at TEXT NOT NULL
14);
15CREATE INDEX IF NOT EXISTS idx_tasks_project ON tasks(project_hash, last_event_at DESC);
16
17CREATE TABLE IF NOT EXISTS events_index (
18 event_id TEXT PRIMARY KEY,
19 task_id TEXT NOT NULL,
20 type TEXT NOT NULL,
21 timestamp TEXT NOT NULL,
22 confidence REAL,
23 status TEXT NOT NULL
24);
25CREATE INDEX IF NOT EXISTS idx_events_task_time ON events_index(task_id, timestamp DESC);
26
27CREATE TABLE IF NOT EXISTS decisions (
28 decision_id TEXT PRIMARY KEY,
29 task_id TEXT NOT NULL,
30 text TEXT NOT NULL,
31 status TEXT NOT NULL,
32 superseded_by TEXT
33);
34
35CREATE TABLE IF NOT EXISTS evidence (
36 evidence_id TEXT PRIMARY KEY,
37 task_id TEXT NOT NULL,
38 text TEXT NOT NULL,
39 strength TEXT NOT NULL,
40 refers_to_decision_id TEXT
41);
42
43CREATE TABLE IF NOT EXISTS task_pack_cache (
44 task_id TEXT NOT NULL,
45 mode TEXT NOT NULL,
46 text TEXT NOT NULL,
47 generated_at TEXT NOT NULL,
48 source_event_count INTEGER NOT NULL,
49 PRIMARY KEY (task_id, mode)
50);
51
52CREATE VIRTUAL TABLE IF NOT EXISTS search_fts USING fts5(
53 task_id UNINDEXED,
54 event_id UNINDEXED,
55 text,
56 type
57);
58"#;
59
60use crate::event::{Event, EventType};
61
62pub fn upsert_task_from_event(
63 conn: &Connection,
64 event: &Event,
65 project_hash: &str,
66) -> anyhow::Result<()> {
67 match event.event_type {
68 EventType::Open => {
69 let title = event
70 .meta
71 .get("title")
72 .and_then(|v| v.as_str())
73 .unwrap_or(&event.text)
74 .to_string();
75 conn.execute(
76 "INSERT INTO tasks(task_id, title, status, project_hash, opened_at, last_event_at)
77 VALUES (?1, ?2, 'open', ?3, ?4, ?4)
78 ON CONFLICT(task_id) DO UPDATE SET last_event_at = ?4",
79 rusqlite::params![event.task_id, title, project_hash, event.timestamp],
80 )?;
81 }
82 EventType::Close => {
83 conn.execute(
84 "UPDATE tasks SET status='closed', closed_at=?2, last_event_at=?2 WHERE task_id=?1",
85 rusqlite::params![event.task_id, event.timestamp],
86 )?;
87 }
88 EventType::Reopen => {
89 conn.execute(
90 "UPDATE tasks SET status='open', closed_at=NULL, last_event_at=?2 WHERE task_id=?1",
91 rusqlite::params![event.task_id, event.timestamp],
92 )?;
93 }
94 _ => {
95 conn.execute(
96 "UPDATE tasks SET last_event_at=?2 WHERE task_id=?1",
97 rusqlite::params![event.task_id, event.timestamp],
98 )?;
99 }
100 }
101 Ok(())
102}
103
104use std::io::BufRead;
105
106pub fn list_all_projects(state_dir: impl AsRef<Path>) -> anyhow::Result<Vec<String>> {
107 let dir = state_dir.as_ref();
108 if !dir.exists() {
109 return Ok(vec![]);
110 }
111 let mut out = Vec::new();
112 for entry in std::fs::read_dir(dir)? {
113 let entry = entry?;
114 let path = entry.path();
115 if path.extension().and_then(|e| e.to_str()) == Some("sqlite") {
116 if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) {
117 out.push(stem.to_string());
118 }
119 }
120 }
121 Ok(out)
122}
123
124pub fn rebuild_state(
125 conn: &Connection,
126 jsonl_path: impl AsRef<Path>,
127 project_hash: &str,
128) -> anyhow::Result<usize> {
129 let f = std::fs::File::open(&jsonl_path)
130 .with_context(|| format!("open {:?}", jsonl_path.as_ref()))?;
131 let reader = std::io::BufReader::new(f);
132
133 let tx = conn.unchecked_transaction()?;
134 let mut count = 0;
135 for (i, line) in reader.lines().enumerate() {
136 let line = line.with_context(|| format!("read line {i}"))?;
137 if line.trim().is_empty() {
138 continue;
139 }
140 let event: Event =
141 serde_json::from_str(&line).with_context(|| format!("parse line {i}"))?;
142 upsert_task_from_event(&tx, &event, project_hash)?;
143 index_event(&tx, &event)?;
144 count += 1;
145 }
146 tx.commit()?;
147 Ok(count)
148}
149
150pub fn index_event(conn: &Connection, event: &Event) -> anyhow::Result<()> {
151 let type_str = serde_json::to_value(event.event_type)?
152 .as_str()
153 .unwrap()
154 .to_string();
155 let status_str = serde_json::to_value(event.status)?
156 .as_str()
157 .unwrap()
158 .to_string();
159 conn.execute(
160 "INSERT OR REPLACE INTO events_index(event_id, task_id, type, timestamp, confidence, status)
161 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
162 rusqlite::params![
163 event.event_id, event.task_id, type_str,
164 event.timestamp, event.confidence, status_str
165 ],
166 )?;
167 conn.execute(
169 "DELETE FROM search_fts WHERE event_id=?1",
170 rusqlite::params![event.event_id],
171 )?;
172 conn.execute(
173 "INSERT INTO search_fts(task_id, event_id, text, type) VALUES (?1, ?2, ?3, ?4)",
174 rusqlite::params![event.task_id, event.event_id, event.text, type_str],
175 )?;
176
177 if event.event_type == EventType::Decision {
178 conn.execute(
179 "INSERT OR REPLACE INTO decisions(decision_id, task_id, text, status)
180 VALUES (?1, ?2, ?3, 'active')",
181 rusqlite::params![event.event_id, event.task_id, event.text],
182 )?;
183 }
184
185 if event.event_type == EventType::Supersede {
186 if let Some(target) = &event.supersedes {
187 conn.execute(
188 "UPDATE decisions SET status='superseded', superseded_by=?1 WHERE decision_id=?2",
189 rusqlite::params![event.event_id, target],
190 )?;
191 }
192 }
193
194 if event.event_type == EventType::Evidence {
195 let strength_str = event
196 .evidence_strength
197 .map(|s| {
198 serde_json::to_value(s)
199 .unwrap()
200 .as_str()
201 .unwrap()
202 .to_string()
203 })
204 .unwrap_or_else(|| "medium".into());
205 conn.execute(
206 "INSERT OR REPLACE INTO evidence(evidence_id, task_id, text, strength)
207 VALUES (?1, ?2, ?3, ?4)",
208 rusqlite::params![event.event_id, event.task_id, event.text, strength_str],
209 )?;
210 }
211
212 conn.execute(
214 "DELETE FROM task_pack_cache WHERE task_id=?1",
215 rusqlite::params![event.task_id],
216 )?;
217
218 Ok(())
219}
220
221pub fn open(path: impl AsRef<Path>) -> anyhow::Result<Connection> {
222 if let Some(parent) = path.as_ref().parent() {
223 std::fs::create_dir_all(parent).with_context(|| format!("create dir {parent:?}"))?;
224 }
225 let conn =
226 Connection::open(&path).with_context(|| format!("open SQLite at {:?}", path.as_ref()))?;
227 conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;")?;
228 conn.execute_batch(MIGRATION_001)
229 .context("apply migration 001")?;
230 Ok(conn)
231}
232
233#[cfg(test)]
234mod tests {
235 use super::*;
236 use tempfile::TempDir;
237
238 #[test]
239 fn open_creates_all_tables() {
240 let d = TempDir::new().unwrap();
241 let p = d.path().join("state.sqlite");
242 let conn = open(&p).unwrap();
243
244 let names: Vec<String> = conn
245 .prepare("SELECT name FROM sqlite_master WHERE type='table' OR type='virtual table' ORDER BY name")
246 .unwrap()
247 .query_map([], |r| r.get::<_, String>(0))
248 .unwrap()
249 .collect::<Result<_, _>>()
250 .unwrap();
251
252 for required in [
253 "decisions",
254 "events_index",
255 "evidence",
256 "task_pack_cache",
257 "tasks",
258 "search_fts",
259 ] {
260 assert!(
261 names.iter().any(|n| n == required),
262 "missing table {required}, have {names:?}"
263 );
264 }
265 }
266
267 #[test]
268 fn open_is_idempotent() {
269 let d = TempDir::new().unwrap();
270 let p = d.path().join("state.sqlite");
271 let _ = open(&p).unwrap();
272 let _ = open(&p).unwrap();
273 }
274
275 #[test]
276 fn index_event_projects_evidence() {
277 let d = TempDir::new().unwrap();
278 let conn = open(d.path().join("s.sqlite")).unwrap();
279 let mut open_e = crate::event::Event::new(
280 "tj-e",
281 crate::event::EventType::Open,
282 crate::event::Author::User,
283 crate::event::Source::Cli,
284 "x".into(),
285 );
286 open_e.meta = serde_json::json!({"title": "T"});
287 upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
288 index_event(&conn, &open_e).unwrap();
289
290 let mut ev = crate::event::Event::new(
291 "tj-e",
292 crate::event::EventType::Evidence,
293 crate::event::Author::Agent,
294 crate::event::Source::Chat,
295 "Hook startup measured at 12ms".into(),
296 );
297 ev.evidence_strength = Some(crate::event::EvidenceStrength::Strong);
298 upsert_task_from_event(&conn, &ev, "feedface").unwrap();
299 index_event(&conn, &ev).unwrap();
300
301 let (text, strength): (String, String) = conn
302 .query_row(
303 "SELECT text, strength FROM evidence WHERE task_id=?1",
304 rusqlite::params!["tj-e"],
305 |r| Ok((r.get(0)?, r.get(1)?)),
306 )
307 .unwrap();
308 assert!(text.contains("12ms"));
309 assert_eq!(strength, "strong");
310 }
311
312 #[test]
313 fn supersede_event_marks_decision_superseded() {
314 let d = TempDir::new().unwrap();
315 let conn = open(d.path().join("s.sqlite")).unwrap();
316 let mut open_e = crate::event::Event::new(
317 "tj-s",
318 crate::event::EventType::Open,
319 crate::event::Author::User,
320 crate::event::Source::Cli,
321 "x".into(),
322 );
323 open_e.meta = serde_json::json!({"title": "T"});
324 upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
325 index_event(&conn, &open_e).unwrap();
326
327 let dec = crate::event::Event::new(
328 "tj-s",
329 crate::event::EventType::Decision,
330 crate::event::Author::Agent,
331 crate::event::Source::Chat,
332 "Use TS".into(),
333 );
334 upsert_task_from_event(&conn, &dec, "feedface").unwrap();
335 index_event(&conn, &dec).unwrap();
336
337 let mut sup = crate::event::Event::new(
338 "tj-s",
339 crate::event::EventType::Supersede,
340 crate::event::Author::Agent,
341 crate::event::Source::Chat,
342 "Replaced by Rust decision".into(),
343 );
344 sup.supersedes = Some(dec.event_id.clone());
345 upsert_task_from_event(&conn, &sup, "feedface").unwrap();
346 index_event(&conn, &sup).unwrap();
347
348 let (status, by): (String, Option<String>) = conn
349 .query_row(
350 "SELECT status, superseded_by FROM decisions WHERE decision_id=?1",
351 rusqlite::params![dec.event_id],
352 |r| Ok((r.get(0)?, r.get(1)?)),
353 )
354 .unwrap();
355 assert_eq!(status, "superseded");
356 assert_eq!(by.as_deref(), Some(sup.event_id.as_str()));
357 }
358
359 #[test]
360 fn index_event_projects_decision_to_decisions_table() {
361 let d = TempDir::new().unwrap();
362 let conn = open(d.path().join("s.sqlite")).unwrap();
363
364 let mut open_e = crate::event::Event::new(
365 "tj-d",
366 crate::event::EventType::Open,
367 crate::event::Author::User,
368 crate::event::Source::Cli,
369 "x".into(),
370 );
371 open_e.meta = serde_json::json!({"title": "T"});
372 upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
373 index_event(&conn, &open_e).unwrap();
374
375 let dec = crate::event::Event::new(
376 "tj-d",
377 crate::event::EventType::Decision,
378 crate::event::Author::Agent,
379 crate::event::Source::Chat,
380 "Adopt Rust".into(),
381 );
382 upsert_task_from_event(&conn, &dec, "feedface").unwrap();
383 index_event(&conn, &dec).unwrap();
384
385 let (id, text, status): (String, String, String) = conn
386 .query_row(
387 "SELECT decision_id, text, status FROM decisions WHERE task_id=?1",
388 rusqlite::params!["tj-d"],
389 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
390 )
391 .unwrap();
392 assert_eq!(id, dec.event_id);
393 assert_eq!(text, "Adopt Rust");
394 assert_eq!(status, "active");
395 }
396
397 #[test]
398 fn index_event_is_idempotent_no_search_fts_duplicates() {
399 let d = TempDir::new().unwrap();
400 let conn = open(d.path().join("s.sqlite")).unwrap();
401 let mut open_e = crate::event::Event::new(
402 "tj-id",
403 crate::event::EventType::Open,
404 crate::event::Author::User,
405 crate::event::Source::Cli,
406 "x".into(),
407 );
408 open_e.meta = serde_json::json!({"title": "Idempotent"});
409 upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
410
411 index_event(&conn, &open_e).unwrap();
413 index_event(&conn, &open_e).unwrap();
414 index_event(&conn, &open_e).unwrap();
415
416 let n: i64 = conn
417 .query_row(
418 "SELECT COUNT(*) FROM search_fts WHERE event_id=?1",
419 rusqlite::params![open_e.event_id],
420 |r| r.get(0),
421 )
422 .unwrap();
423 assert_eq!(n, 1, "search_fts must hold exactly one row per event_id");
424 }
425
426 #[test]
427 fn list_all_projects_returns_hashes_from_state_dir() {
428 use std::fs::File;
429 let d = TempDir::new().unwrap();
430 let state_dir = d.path().join("state");
431 std::fs::create_dir_all(&state_dir).unwrap();
432 File::create(state_dir.join("aaaa1111aaaa1111.sqlite")).unwrap();
433 File::create(state_dir.join("bbbb2222bbbb2222.sqlite")).unwrap();
434 File::create(state_dir.join("not-a-project.txt")).unwrap();
435
436 let mut hashes = list_all_projects(&state_dir).unwrap();
437 hashes.sort();
438 assert_eq!(hashes, vec!["aaaa1111aaaa1111", "bbbb2222bbbb2222"]);
439 }
440
441 #[test]
442 fn rebuild_state_reads_jsonl_and_populates_db() {
443 use std::io::Write;
444 let d = TempDir::new().unwrap();
445 let events_path = d.path().join("events.jsonl");
446 let db_path = d.path().join("s.sqlite");
447
448 let mut f = std::fs::File::create(&events_path).unwrap();
449 let mut e1 = crate::event::Event::new(
450 "tj-9",
451 crate::event::EventType::Open,
452 crate::event::Author::User,
453 crate::event::Source::Cli,
454 "x".into(),
455 );
456 e1.meta = serde_json::json!({"title": "Nine"});
457 let e2 = crate::event::Event::new(
458 "tj-9",
459 crate::event::EventType::Decision,
460 crate::event::Author::Agent,
461 crate::event::Source::Chat,
462 "Adopt Rust".into(),
463 );
464 writeln!(f, "{}", serde_json::to_string(&e1).unwrap()).unwrap();
465 writeln!(f, "{}", serde_json::to_string(&e2).unwrap()).unwrap();
466 drop(f);
467
468 let conn = open(&db_path).unwrap();
469 let n = rebuild_state(&conn, &events_path, "deadbeefdeadbeef").unwrap();
470 assert_eq!(n, 2);
471
472 let n: i64 = conn
473 .query_row("SELECT COUNT(*) FROM tasks", [], |r| r.get(0))
474 .unwrap();
475 assert_eq!(n, 1);
476 let n: i64 = conn
477 .query_row("SELECT COUNT(*) FROM events_index", [], |r| r.get(0))
478 .unwrap();
479 assert_eq!(n, 2);
480 }
481
482 #[test]
483 fn index_event_writes_index_and_fts() {
484 let d = TempDir::new().unwrap();
485 let conn = open(d.path().join("s.sqlite")).unwrap();
486 let mut open_e = crate::event::Event::new(
487 "tj-1",
488 crate::event::EventType::Open,
489 crate::event::Author::User,
490 crate::event::Source::Cli,
491 "Title".into(),
492 );
493 open_e.meta = serde_json::json!({"title": "Title"});
494 upsert_task_from_event(&conn, &open_e, "deadbeefdeadbeef").unwrap();
495 index_event(&conn, &open_e).unwrap();
496
497 let mut decision = crate::event::Event::new(
498 "tj-1",
499 crate::event::EventType::Decision,
500 crate::event::Author::Agent,
501 crate::event::Source::Chat,
502 "Adopt Rust".into(),
503 );
504 decision.confidence = Some(0.92);
505 upsert_task_from_event(&conn, &decision, "deadbeefdeadbeef").unwrap();
506 index_event(&conn, &decision).unwrap();
507
508 let count: i64 = conn
509 .query_row(
510 "SELECT COUNT(*) FROM events_index WHERE task_id=?1",
511 rusqlite::params!["tj-1"],
512 |r| r.get(0),
513 )
514 .unwrap();
515 assert_eq!(count, 2);
516
517 let mut stmt = conn
518 .prepare("SELECT event_id FROM search_fts WHERE search_fts MATCH ?1")
519 .unwrap();
520 let hits: Vec<String> = stmt
521 .query_map(rusqlite::params!["Rust"], |r| {
522 let s: String = r.get(0)?;
523 Ok(s)
524 })
525 .unwrap()
526 .collect::<Result<Vec<_>, _>>()
527 .unwrap();
528 assert_eq!(hits.len(), 1);
529 assert_eq!(hits[0], decision.event_id);
530 }
531
532 #[test]
533 fn upsert_task_from_open_event_inserts_row() {
534 let d = TempDir::new().unwrap();
535 let conn = open(d.path().join("s.sqlite")).unwrap();
536
537 let mut e = crate::event::Event::new(
538 "tj-7f3a",
539 crate::event::EventType::Open,
540 crate::event::Author::User,
541 crate::event::Source::Cli,
542 "Add OAuth".into(),
543 );
544 e.meta = serde_json::json!({ "title": "Add OAuth login" });
545
546 upsert_task_from_event(&conn, &e, "abcd1234abcd1234").unwrap();
547
548 let (id, title, status): (String, String, String) = conn
549 .query_row(
550 "SELECT task_id, title, status FROM tasks WHERE task_id = ?1",
551 ["tj-7f3a"],
552 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
553 )
554 .unwrap();
555
556 assert_eq!(id, "tj-7f3a");
557 assert_eq!(title, "Add OAuth login");
558 assert_eq!(status, "open");
559 }
560}