Skip to main content

tj_core/
db.rs

1use anyhow::Context;
2use rusqlite::Connection;
3use std::path::Path;
4
/// Schema bootstrap batch executed by [`open`] each time a database is opened.
///
/// Every statement is guarded with `IF NOT EXISTS`, so running the batch
/// against an already-initialized database leaves it unchanged.
///
/// Objects created:
/// - `tasks` — one row per `task_id`, with title, status, project hash and
///   open/close/last-event timestamps; indexed by
///   `(project_hash, last_event_at DESC)`.
/// - `events_index` — one row per `event_id`; indexed by
///   `(task_id, timestamp DESC)`.
/// - `decisions` — keyed by `decision_id`, with a nullable `superseded_by`.
/// - `evidence` — keyed by `evidence_id`, with a nullable
///   `refers_to_decision_id`.
/// - `task_pack_cache` — cached text keyed by `(task_id, mode)`.
/// - `search_fts` — FTS5 virtual table; `task_id` and `event_id` are stored
///   `UNINDEXED`, while `text` and `type` are searchable.
///
/// Note: the `task_id` columns are plain `TEXT`; no `FOREIGN KEY` constraints
/// are declared between these tables.
const MIGRATION_001: &str = r#"
CREATE TABLE IF NOT EXISTS tasks (
  task_id        TEXT PRIMARY KEY,
  title          TEXT NOT NULL,
  status         TEXT NOT NULL,
  project_hash   TEXT NOT NULL,
  opened_at      TEXT NOT NULL,
  closed_at      TEXT,
  last_event_at  TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_tasks_project ON tasks(project_hash, last_event_at DESC);

CREATE TABLE IF NOT EXISTS events_index (
  event_id    TEXT PRIMARY KEY,
  task_id     TEXT NOT NULL,
  type        TEXT NOT NULL,
  timestamp   TEXT NOT NULL,
  confidence  REAL,
  status      TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_events_task_time ON events_index(task_id, timestamp DESC);

CREATE TABLE IF NOT EXISTS decisions (
  decision_id    TEXT PRIMARY KEY,
  task_id        TEXT NOT NULL,
  text           TEXT NOT NULL,
  status         TEXT NOT NULL,
  superseded_by  TEXT
);

CREATE TABLE IF NOT EXISTS evidence (
  evidence_id           TEXT PRIMARY KEY,
  task_id               TEXT NOT NULL,
  text                  TEXT NOT NULL,
  strength              TEXT NOT NULL,
  refers_to_decision_id TEXT
);

CREATE TABLE IF NOT EXISTS task_pack_cache (
  task_id             TEXT NOT NULL,
  mode                TEXT NOT NULL,
  text                TEXT NOT NULL,
  generated_at        TEXT NOT NULL,
  source_event_count  INTEGER NOT NULL,
  PRIMARY KEY (task_id, mode)
);

CREATE VIRTUAL TABLE IF NOT EXISTS search_fts USING fts5(
  task_id UNINDEXED,
  event_id UNINDEXED,
  text,
  type
);
"#;
59
60use crate::event::{Event, EventType};
61
62pub fn upsert_task_from_event(
63    conn: &Connection,
64    event: &Event,
65    project_hash: &str,
66) -> anyhow::Result<()> {
67    match event.event_type {
68        EventType::Open => {
69            let title = event
70                .meta
71                .get("title")
72                .and_then(|v| v.as_str())
73                .unwrap_or(&event.text)
74                .to_string();
75            conn.execute(
76                "INSERT INTO tasks(task_id, title, status, project_hash, opened_at, last_event_at)
77                 VALUES (?1, ?2, 'open', ?3, ?4, ?4)
78                 ON CONFLICT(task_id) DO UPDATE SET last_event_at = ?4",
79                rusqlite::params![event.task_id, title, project_hash, event.timestamp],
80            )?;
81        }
82        EventType::Close => {
83            conn.execute(
84                "UPDATE tasks SET status='closed', closed_at=?2, last_event_at=?2 WHERE task_id=?1",
85                rusqlite::params![event.task_id, event.timestamp],
86            )?;
87        }
88        EventType::Reopen => {
89            conn.execute(
90                "UPDATE tasks SET status='open', closed_at=NULL, last_event_at=?2 WHERE task_id=?1",
91                rusqlite::params![event.task_id, event.timestamp],
92            )?;
93        }
94        _ => {
95            conn.execute(
96                "UPDATE tasks SET last_event_at=?2 WHERE task_id=?1",
97                rusqlite::params![event.task_id, event.timestamp],
98            )?;
99        }
100    }
101    Ok(())
102}
103
104use std::io::BufRead;
105
106pub fn list_all_projects(state_dir: impl AsRef<Path>) -> anyhow::Result<Vec<String>> {
107    let dir = state_dir.as_ref();
108    if !dir.exists() {
109        return Ok(vec![]);
110    }
111    let mut out = Vec::new();
112    for entry in std::fs::read_dir(dir)? {
113        let entry = entry?;
114        let path = entry.path();
115        if path.extension().and_then(|e| e.to_str()) == Some("sqlite") {
116            if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) {
117                out.push(stem.to_string());
118            }
119        }
120    }
121    Ok(out)
122}
123
124pub fn rebuild_state(
125    conn: &Connection,
126    jsonl_path: impl AsRef<Path>,
127    project_hash: &str,
128) -> anyhow::Result<usize> {
129    let f = std::fs::File::open(&jsonl_path)
130        .with_context(|| format!("open {:?}", jsonl_path.as_ref()))?;
131    let reader = std::io::BufReader::new(f);
132
133    let tx = conn.unchecked_transaction()?;
134    let mut count = 0;
135    for (i, line) in reader.lines().enumerate() {
136        let line = line.with_context(|| format!("read line {i}"))?;
137        if line.trim().is_empty() {
138            continue;
139        }
140        let event: Event =
141            serde_json::from_str(&line).with_context(|| format!("parse line {i}"))?;
142        upsert_task_from_event(&tx, &event, project_hash)?;
143        index_event(&tx, &event)?;
144        count += 1;
145    }
146    tx.commit()?;
147    Ok(count)
148}
149
150pub fn index_event(conn: &Connection, event: &Event) -> anyhow::Result<()> {
151    let type_str = serde_json::to_value(event.event_type)?
152        .as_str()
153        .unwrap()
154        .to_string();
155    let status_str = serde_json::to_value(event.status)?
156        .as_str()
157        .unwrap()
158        .to_string();
159    conn.execute(
160        "INSERT OR REPLACE INTO events_index(event_id, task_id, type, timestamp, confidence, status)
161         VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
162        rusqlite::params![
163            event.event_id, event.task_id, type_str,
164            event.timestamp, event.confidence, status_str
165        ],
166    )?;
167    // search_fts has no PK; clear then insert to keep idempotent across rebuild_state replays.
168    conn.execute(
169        "DELETE FROM search_fts WHERE event_id=?1",
170        rusqlite::params![event.event_id],
171    )?;
172    conn.execute(
173        "INSERT INTO search_fts(task_id, event_id, text, type) VALUES (?1, ?2, ?3, ?4)",
174        rusqlite::params![event.task_id, event.event_id, event.text, type_str],
175    )?;
176
177    if event.event_type == EventType::Decision {
178        conn.execute(
179            "INSERT OR REPLACE INTO decisions(decision_id, task_id, text, status)
180             VALUES (?1, ?2, ?3, 'active')",
181            rusqlite::params![event.event_id, event.task_id, event.text],
182        )?;
183    }
184
185    if event.event_type == EventType::Supersede {
186        if let Some(target) = &event.supersedes {
187            conn.execute(
188                "UPDATE decisions SET status='superseded', superseded_by=?1 WHERE decision_id=?2",
189                rusqlite::params![event.event_id, target],
190            )?;
191        }
192    }
193
194    if event.event_type == EventType::Evidence {
195        let strength_str = event
196            .evidence_strength
197            .map(|s| {
198                serde_json::to_value(s)
199                    .unwrap()
200                    .as_str()
201                    .unwrap()
202                    .to_string()
203            })
204            .unwrap_or_else(|| "medium".into());
205        conn.execute(
206            "INSERT OR REPLACE INTO evidence(evidence_id, task_id, text, strength)
207             VALUES (?1, ?2, ?3, ?4)",
208            rusqlite::params![event.event_id, event.task_id, event.text, strength_str],
209        )?;
210    }
211
212    // Invalidate any cached pack for this task.
213    conn.execute(
214        "DELETE FROM task_pack_cache WHERE task_id=?1",
215        rusqlite::params![event.task_id],
216    )?;
217
218    Ok(())
219}
220
221pub fn open(path: impl AsRef<Path>) -> anyhow::Result<Connection> {
222    if let Some(parent) = path.as_ref().parent() {
223        std::fs::create_dir_all(parent).with_context(|| format!("create dir {parent:?}"))?;
224    }
225    let conn =
226        Connection::open(&path).with_context(|| format!("open SQLite at {:?}", path.as_ref()))?;
227    conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;")?;
228    conn.execute_batch(MIGRATION_001)
229        .context("apply migration 001")?;
230    Ok(conn)
231}
232
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::{Author, Event, EventType, EvidenceStrength, Source};
    use tempfile::TempDir;

    /// Opens a fresh database inside a new temp dir.
    ///
    /// The returned `TempDir` guard must be kept alive for as long as the
    /// connection is used; dropping it removes the directory.
    #[track_caller]
    fn temp_db() -> (TempDir, rusqlite::Connection) {
        let dir = TempDir::new().unwrap();
        let conn = open(dir.path().join("s.sqlite")).unwrap();
        (dir, conn)
    }

    /// Builds a user-authored CLI `Open` event with `title` stored in its metadata.
    fn open_event(task_id: &str, text: &str, title: &str) -> Event {
        let mut event = Event::new(
            task_id,
            EventType::Open,
            Author::User,
            Source::Cli,
            text.into(),
        );
        event.meta = serde_json::json!({ "title": title });
        event
    }

    /// Builds an agent-authored chat event of the given kind.
    fn agent_event(task_id: &str, kind: EventType, text: &str) -> Event {
        Event::new(task_id, kind, Author::Agent, Source::Chat, text.into())
    }

    /// Runs both projections for `event`, the same pair `rebuild_state` applies per line.
    #[track_caller]
    fn apply(conn: &rusqlite::Connection, event: &Event, project_hash: &str) {
        upsert_task_from_event(conn, event, project_hash).unwrap();
        index_event(conn, event).unwrap();
    }

    #[test]
    fn open_creates_all_tables() {
        let d = TempDir::new().unwrap();
        let p = d.path().join("state.sqlite");
        let conn = open(&p).unwrap();

        // sqlite_master lists virtual tables with type='table' as well, so a
        // single predicate covers the FTS table too.
        let names: Vec<String> = conn
            .prepare("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
            .unwrap()
            .query_map([], |r| r.get::<_, String>(0))
            .unwrap()
            .collect::<Result<_, _>>()
            .unwrap();

        for required in [
            "decisions",
            "events_index",
            "evidence",
            "task_pack_cache",
            "tasks",
            "search_fts",
        ] {
            assert!(
                names.iter().any(|n| n == required),
                "missing table {required}, have {names:?}"
            );
        }
    }

    #[test]
    fn open_is_idempotent() {
        let d = TempDir::new().unwrap();
        let p = d.path().join("state.sqlite");
        let _ = open(&p).unwrap();
        let _ = open(&p).unwrap();
    }

    #[test]
    fn index_event_projects_evidence() {
        let (_dir, conn) = temp_db();
        apply(&conn, &open_event("tj-e", "x", "T"), "feedface");

        let mut ev = agent_event(
            "tj-e",
            EventType::Evidence,
            "Hook startup measured at 12ms",
        );
        ev.evidence_strength = Some(EvidenceStrength::Strong);
        apply(&conn, &ev, "feedface");

        let (text, strength): (String, String) = conn
            .query_row(
                "SELECT text, strength FROM evidence WHERE task_id=?1",
                rusqlite::params!["tj-e"],
                |r| Ok((r.get(0)?, r.get(1)?)),
            )
            .unwrap();
        assert!(text.contains("12ms"));
        assert_eq!(strength, "strong");
    }

    #[test]
    fn supersede_event_marks_decision_superseded() {
        let (_dir, conn) = temp_db();
        apply(&conn, &open_event("tj-s", "x", "T"), "feedface");

        let dec = agent_event("tj-s", EventType::Decision, "Use TS");
        apply(&conn, &dec, "feedface");

        let mut sup = agent_event("tj-s", EventType::Supersede, "Replaced by Rust decision");
        sup.supersedes = Some(dec.event_id.clone());
        apply(&conn, &sup, "feedface");

        let (status, by): (String, Option<String>) = conn
            .query_row(
                "SELECT status, superseded_by FROM decisions WHERE decision_id=?1",
                rusqlite::params![dec.event_id],
                |r| Ok((r.get(0)?, r.get(1)?)),
            )
            .unwrap();
        assert_eq!(status, "superseded");
        assert_eq!(by.as_deref(), Some(sup.event_id.as_str()));
    }

    #[test]
    fn index_event_projects_decision_to_decisions_table() {
        let (_dir, conn) = temp_db();
        apply(&conn, &open_event("tj-d", "x", "T"), "feedface");

        let dec = agent_event("tj-d", EventType::Decision, "Adopt Rust");
        apply(&conn, &dec, "feedface");

        let (id, text, status): (String, String, String) = conn
            .query_row(
                "SELECT decision_id, text, status FROM decisions WHERE task_id=?1",
                rusqlite::params!["tj-d"],
                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
            )
            .unwrap();
        assert_eq!(id, dec.event_id);
        assert_eq!(text, "Adopt Rust");
        assert_eq!(status, "active");
    }

    #[test]
    fn index_event_is_idempotent_no_search_fts_duplicates() {
        let (_dir, conn) = temp_db();
        let open_e = open_event("tj-id", "x", "Idempotent");
        upsert_task_from_event(&conn, &open_e, "feedface").unwrap();

        // Index three times — simulates rebuild_state replays.
        for _ in 0..3 {
            index_event(&conn, &open_e).unwrap();
        }

        let n: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM search_fts WHERE event_id=?1",
                rusqlite::params![open_e.event_id],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(n, 1, "search_fts must hold exactly one row per event_id");
    }

    #[test]
    fn list_all_projects_returns_hashes_from_state_dir() {
        use std::fs::File;
        let d = TempDir::new().unwrap();
        let state_dir = d.path().join("state");
        std::fs::create_dir_all(&state_dir).unwrap();
        File::create(state_dir.join("aaaa1111aaaa1111.sqlite")).unwrap();
        File::create(state_dir.join("bbbb2222bbbb2222.sqlite")).unwrap();
        File::create(state_dir.join("not-a-project.txt")).unwrap();

        // Directory iteration order is unspecified, so sort before comparing.
        let mut hashes = list_all_projects(&state_dir).unwrap();
        hashes.sort();
        assert_eq!(hashes, vec!["aaaa1111aaaa1111", "bbbb2222bbbb2222"]);
    }

    #[test]
    fn rebuild_state_reads_jsonl_and_populates_db() {
        use std::io::Write;
        let d = TempDir::new().unwrap();
        let events_path = d.path().join("events.jsonl");
        let db_path = d.path().join("s.sqlite");

        let e1 = open_event("tj-9", "x", "Nine");
        let e2 = agent_event("tj-9", EventType::Decision, "Adopt Rust");

        let mut f = std::fs::File::create(&events_path).unwrap();
        writeln!(f, "{}", serde_json::to_string(&e1).unwrap()).unwrap();
        writeln!(f, "{}", serde_json::to_string(&e2).unwrap()).unwrap();
        drop(f);

        let conn = open(&db_path).unwrap();
        let n = rebuild_state(&conn, &events_path, "deadbeefdeadbeef").unwrap();
        assert_eq!(n, 2);

        let n: i64 = conn
            .query_row("SELECT COUNT(*) FROM tasks", [], |r| r.get(0))
            .unwrap();
        assert_eq!(n, 1);
        let n: i64 = conn
            .query_row("SELECT COUNT(*) FROM events_index", [], |r| r.get(0))
            .unwrap();
        assert_eq!(n, 2);
    }

    #[test]
    fn index_event_writes_index_and_fts() {
        let (_dir, conn) = temp_db();
        apply(
            &conn,
            &open_event("tj-1", "Title", "Title"),
            "deadbeefdeadbeef",
        );

        let mut decision = agent_event("tj-1", EventType::Decision, "Adopt Rust");
        decision.confidence = Some(0.92);
        apply(&conn, &decision, "deadbeefdeadbeef");

        let count: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM events_index WHERE task_id=?1",
                rusqlite::params!["tj-1"],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(count, 2);

        let mut stmt = conn
            .prepare("SELECT event_id FROM search_fts WHERE search_fts MATCH ?1")
            .unwrap();
        let hits: Vec<String> = stmt
            .query_map(rusqlite::params!["Rust"], |r| r.get::<_, String>(0))
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0], decision.event_id);
    }

    #[test]
    fn upsert_task_from_open_event_inserts_row() {
        let (_dir, conn) = temp_db();
        let e = open_event("tj-7f3a", "Add OAuth", "Add OAuth login");

        upsert_task_from_event(&conn, &e, "abcd1234abcd1234").unwrap();

        let (id, title, status): (String, String, String) = conn
            .query_row(
                "SELECT task_id, title, status FROM tasks WHERE task_id = ?1",
                ["tj-7f3a"],
                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
            )
            .unwrap();

        assert_eq!(id, "tj-7f3a");
        assert_eq!(title, "Add OAuth login");
        assert_eq!(status, "open");
    }
}