Skip to main content

tj_core/
db.rs

1use anyhow::Context;
2use rusqlite::Connection;
3use std::collections::HashSet;
4use std::path::Path;
5
/// A single forward-only schema change.
///
/// `apply_migrations` walks `MIGRATIONS` in declaration order (ascending
/// `version`), skips versions already recorded in `schema_migrations`, and
/// records each one it applies — so calling `open()` repeatedly never
/// re-applies a migration.
struct Migration {
    /// Version number; also the primary key stored in `schema_migrations`.
    version: i64,
    /// SQL batch handed verbatim to `execute_batch`.
    sql: &'static str,
}
13
/// Initial schema (v001): the derived `tasks`, `events_index`, `decisions`,
/// `evidence` and `task_pack_cache` tables, plus the `search_fts` FTS5
/// full-text index over event text.
///
/// Every statement uses `IF NOT EXISTS`, so re-running this batch against a
/// database that already has these objects is a no-op. `search_fts` has no
/// primary key; `index_event` keeps it duplicate-free by deleting an event's
/// row before re-inserting it.
///
/// Published migration — never edit; add a new migration instead.
const MIGRATION_001: &str = r#"
CREATE TABLE IF NOT EXISTS tasks (
  task_id        TEXT PRIMARY KEY,
  title          TEXT NOT NULL,
  status         TEXT NOT NULL,
  project_hash   TEXT NOT NULL,
  opened_at      TEXT NOT NULL,
  closed_at      TEXT,
  last_event_at  TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_tasks_project ON tasks(project_hash, last_event_at DESC);

CREATE TABLE IF NOT EXISTS events_index (
  event_id    TEXT PRIMARY KEY,
  task_id     TEXT NOT NULL,
  type        TEXT NOT NULL,
  timestamp   TEXT NOT NULL,
  confidence  REAL,
  status      TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_events_task_time ON events_index(task_id, timestamp DESC);

CREATE TABLE IF NOT EXISTS decisions (
  decision_id    TEXT PRIMARY KEY,
  task_id        TEXT NOT NULL,
  text           TEXT NOT NULL,
  status         TEXT NOT NULL,
  superseded_by  TEXT
);

CREATE TABLE IF NOT EXISTS evidence (
  evidence_id           TEXT PRIMARY KEY,
  task_id               TEXT NOT NULL,
  text                  TEXT NOT NULL,
  strength              TEXT NOT NULL,
  refers_to_decision_id TEXT
);

CREATE TABLE IF NOT EXISTS task_pack_cache (
  task_id             TEXT NOT NULL,
  mode                TEXT NOT NULL,
  text                TEXT NOT NULL,
  generated_at        TEXT NOT NULL,
  source_event_count  INTEGER NOT NULL,
  PRIMARY KEY (task_id, mode)
);

CREATE VIRTUAL TABLE IF NOT EXISTS search_fts USING fts5(
  task_id UNINDEXED,
  event_id UNINDEXED,
  text,
  type
);
"#;
68
/// Incremental-ingest bookkeeping (v002): one row per project holding the
/// `event_id` of the most recent event written to `events_index`
/// (`last_indexed_event_id`).
///
/// `ingest_new_events` still scans the JSONL log to locate that marker, but
/// only indexes the events after it instead of replaying the whole file.
const MIGRATION_002: &str = r#"
CREATE TABLE IF NOT EXISTS index_state (
  project_hash          TEXT PRIMARY KEY,
  last_indexed_event_id TEXT NOT NULL,
  updated_at            TEXT NOT NULL
);
"#;
80
/// v0.4.0 task-as-goal redesign (v003): explicit goal/outcome columns on
/// `tasks` plus typed artifacts on `events_index`.
///
/// The new columns are NULLable so existing rows survive without backfill.
/// The batch also empties the pack cache, so packs rendered before this
/// change (without Goal/Outcome blocks) regenerate on next view.
///
/// Unlike v001/v002, these statements have no `IF NOT EXISTS` guard, so the
/// batch must run exactly once. `schema_migrations` bookkeeping is what
/// prevents a second run.
const MIGRATION_003: &str = r#"
ALTER TABLE tasks ADD COLUMN goal        TEXT;
ALTER TABLE tasks ADD COLUMN outcome     TEXT;
ALTER TABLE tasks ADD COLUMN outcome_tag TEXT;
ALTER TABLE tasks ADD COLUMN external    TEXT;
ALTER TABLE events_index ADD COLUMN artifacts TEXT;
DELETE FROM task_pack_cache;
"#;
93
94/// All schema migrations in version order. Append new entries here; never
95/// edit a published migration's `sql` — write a new one instead.
96const MIGRATIONS: &[Migration] = &[
97    Migration {
98        version: 1,
99        sql: MIGRATION_001,
100    },
101    Migration {
102        version: 2,
103        sql: MIGRATION_002,
104    },
105    Migration {
106        version: 3,
107        sql: MIGRATION_003,
108    },
109];
110
111fn apply_migrations(conn: &Connection) -> anyhow::Result<()> {
112    conn.execute_batch(
113        "CREATE TABLE IF NOT EXISTS schema_migrations (
114            version    INTEGER PRIMARY KEY,
115            applied_at TEXT NOT NULL
116        )",
117    )
118    .context("create schema_migrations table")?;
119
120    let applied: HashSet<i64> = {
121        let mut stmt = conn
122            .prepare("SELECT version FROM schema_migrations")
123            .context("select applied versions")?;
124        let rows = stmt
125            .query_map([], |r| r.get::<_, i64>(0))
126            .context("iterate schema_migrations")?;
127        rows.collect::<rusqlite::Result<HashSet<_>>>()
128            .context("collect applied versions")?
129    };
130
131    for migration in MIGRATIONS {
132        if applied.contains(&migration.version) {
133            continue;
134        }
135        conn.execute_batch(migration.sql)
136            .with_context(|| format!("apply schema migration v{:03}", migration.version))?;
137        conn.execute(
138            "INSERT INTO schema_migrations(version, applied_at) VALUES (?1, ?2)",
139            rusqlite::params![
140                migration.version,
141                chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
142            ],
143        )
144        .with_context(|| {
145            format!(
146                "record schema migration v{:03} as applied",
147                migration.version
148            )
149        })?;
150    }
151    Ok(())
152}
153
154use crate::event::{Event, EventType};
155
156pub fn upsert_task_from_event(
157    conn: &Connection,
158    event: &Event,
159    project_hash: &str,
160) -> anyhow::Result<()> {
161    match event.event_type {
162        EventType::Open => {
163            let title = event
164                .meta
165                .get("title")
166                .and_then(|v| v.as_str())
167                .unwrap_or(&event.text)
168                .to_string();
169            conn.execute(
170                "INSERT INTO tasks(task_id, title, status, project_hash, opened_at, last_event_at)
171                 VALUES (?1, ?2, 'open', ?3, ?4, ?4)
172                 ON CONFLICT(task_id) DO UPDATE SET last_event_at = ?4",
173                rusqlite::params![event.task_id, title, project_hash, event.timestamp],
174            )?;
175        }
176        EventType::Close => {
177            conn.execute(
178                "UPDATE tasks SET status='closed', closed_at=?2, last_event_at=?2 WHERE task_id=?1",
179                rusqlite::params![event.task_id, event.timestamp],
180            )?;
181        }
182        EventType::Reopen => {
183            conn.execute(
184                "UPDATE tasks SET status='open', closed_at=NULL, last_event_at=?2 WHERE task_id=?1",
185                rusqlite::params![event.task_id, event.timestamp],
186            )?;
187        }
188        _ => {
189            conn.execute(
190                "UPDATE tasks SET last_event_at=?2 WHERE task_id=?1",
191                rusqlite::params![event.task_id, event.timestamp],
192            )?;
193        }
194    }
195    Ok(())
196}
197
198use std::io::BufRead;
199
200pub fn list_all_projects(state_dir: impl AsRef<Path>) -> anyhow::Result<Vec<String>> {
201    let dir = state_dir.as_ref();
202    if !dir.exists() {
203        return Ok(vec![]);
204    }
205    let mut out = Vec::new();
206    for entry in std::fs::read_dir(dir)? {
207        let entry = entry?;
208        let path = entry.path();
209        if path.extension().and_then(|e| e.to_str()) == Some("sqlite") {
210            if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) {
211                out.push(stem.to_string());
212            }
213        }
214    }
215    Ok(out)
216}
217
218pub fn rebuild_state(
219    conn: &Connection,
220    jsonl_path: impl AsRef<Path>,
221    project_hash: &str,
222) -> anyhow::Result<usize> {
223    let f = std::fs::File::open(&jsonl_path)
224        .with_context(|| format!("open {:?}", jsonl_path.as_ref()))?;
225    let reader = std::io::BufReader::new(f);
226
227    let tx = conn.unchecked_transaction()?;
228    let mut count = 0;
229    let mut last_event_id: Option<String> = None;
230    for (i, line) in reader.lines().enumerate() {
231        let line = line.with_context(|| format!("read line {i}"))?;
232        if line.trim().is_empty() {
233            continue;
234        }
235        // Malformed JSONL lines are skipped with a warning so that one bad
236        // event cannot abort an otherwise-recoverable rebuild. SQL errors
237        // still propagate — those indicate schema/integrity problems.
238        let event: Event = match serde_json::from_str(&line) {
239            Ok(e) => e,
240            Err(err) => {
241                tracing::warn!(
242                    line_number = i + 1,
243                    error = %err,
244                    "skipping malformed JSONL line in rebuild_state"
245                );
246                continue;
247            }
248        };
249        upsert_task_from_event(&tx, &event, project_hash)?;
250        index_event(&tx, &event)?;
251        last_event_id = Some(event.event_id.clone());
252        count += 1;
253    }
254    if let Some(eid) = last_event_id.as_deref() {
255        record_last_indexed(&tx, project_hash, eid)?;
256    }
257    tx.commit()?;
258    Ok(count)
259}
260
261/// Returns whether a task with this id has been recorded in the derived
262/// state. Cheap O(1) lookup against the `tasks` primary key. Callers
263/// should run [`ingest_new_events`] first if they want to see the latest
264/// JSONL state.
265pub fn task_exists(conn: &Connection, task_id: &str) -> anyhow::Result<bool> {
266    let count: i64 = conn.query_row(
267        "SELECT COUNT(*) FROM tasks WHERE task_id = ?1",
268        rusqlite::params![task_id],
269        |r| r.get(0),
270    )?;
271    Ok(count > 0)
272}
273
274/// Status string for an existing task (e.g. "open", "closed"). Returns
275/// `None` when the task is unknown — caller decides whether that's a
276/// hard error or a route-to-pending case.
277pub fn task_status(conn: &Connection, task_id: &str) -> anyhow::Result<Option<String>> {
278    let mut stmt = conn.prepare("SELECT status FROM tasks WHERE task_id = ?1")?;
279    let mut rows = stmt.query(rusqlite::params![task_id])?;
280    Ok(rows.next()?.map(|r| r.get::<_, String>(0)).transpose()?)
281}
282
283/// Set or replace `tasks.goal` for an existing task. Caller is
284/// expected to have validated the task exists (via `task_exists`); we
285/// don't error on no-op rows so the upsert pattern is uniform.
286pub fn set_task_goal(conn: &Connection, task_id: &str, goal: &str) -> anyhow::Result<()> {
287    conn.execute(
288        "UPDATE tasks SET goal = ?1 WHERE task_id = ?2",
289        rusqlite::params![goal, task_id],
290    )
291    .with_context(|| format!("set goal for {task_id}"))?;
292    // Pack cache is now stale for this task — drop the entry so the
293    // next render picks up the new goal.
294    conn.execute(
295        "DELETE FROM task_pack_cache WHERE task_id = ?1",
296        rusqlite::params![task_id],
297    )?;
298    Ok(())
299}
300
301/// Set or replace the closure metadata. Pass `None` for `outcome_tag`
302/// to leave it unset; pass `Some("done"|"abandoned"|"superseded")`
303/// for a structured tag. Free-text `outcome` is the primary field.
304pub fn set_task_outcome(
305    conn: &Connection,
306    task_id: &str,
307    outcome: &str,
308    outcome_tag: Option<&str>,
309) -> anyhow::Result<()> {
310    conn.execute(
311        "UPDATE tasks SET outcome = ?1, outcome_tag = ?2 WHERE task_id = ?3",
312        rusqlite::params![outcome, outcome_tag, task_id],
313    )
314    .with_context(|| format!("set outcome for {task_id}"))?;
315    conn.execute(
316        "DELETE FROM task_pack_cache WHERE task_id = ?1",
317        rusqlite::params![task_id],
318    )?;
319    Ok(())
320}
321
322/// Append an external reference to `tasks.external`. The column is
323/// stored as a comma-separated list — small, append-mostly, no
324/// uniqueness constraint. Acceptable shapes (loose, not enforced):
325/// `beads:claude-memory-rsw`, `github:#42`, `jira:PROJ-1234`.
326pub fn add_task_external(conn: &Connection, task_id: &str, reference: &str) -> anyhow::Result<()> {
327    let current: Option<String> = conn
328        .query_row(
329            "SELECT external FROM tasks WHERE task_id = ?1",
330            rusqlite::params![task_id],
331            |r| r.get::<_, Option<String>>(0),
332        )
333        .with_context(|| format!("read external for {task_id}"))?;
334    let next = match current {
335        Some(s) if !s.is_empty() => format!("{s},{reference}"),
336        _ => reference.to_string(),
337    };
338    conn.execute(
339        "UPDATE tasks SET external = ?1 WHERE task_id = ?2",
340        rusqlite::params![next, task_id],
341    )?;
342    conn.execute(
343        "DELETE FROM task_pack_cache WHERE task_id = ?1",
344        rusqlite::params![task_id],
345    )?;
346    Ok(())
347}
348
/// Goal/outcome/external metadata for one task, as read by `task_metadata`.
///
/// Used by pack rendering (and TUI list teasers in v0.4.0+). Every field is
/// optional because the underlying columns are NULLable. `task_metadata`
/// returns `None` rather than an empty bundle for unknown tasks.
#[derive(Debug, Clone, Default)]
pub struct TaskMetadata {
    /// `tasks.goal`.
    pub goal: Option<String>,
    /// `tasks.outcome` — free-text closure summary.
    pub outcome: Option<String>,
    /// `tasks.outcome_tag` — optional structured closure tag.
    pub outcome_tag: Option<String>,
    /// `tasks.external` — comma-separated external references.
    pub external: Option<String>,
}
358
359pub fn task_metadata(conn: &Connection, task_id: &str) -> anyhow::Result<Option<TaskMetadata>> {
360    let mut stmt =
361        conn.prepare("SELECT goal, outcome, outcome_tag, external FROM tasks WHERE task_id = ?1")?;
362    let mut rows = stmt.query(rusqlite::params![task_id])?;
363    Ok(match rows.next()? {
364        Some(r) => Some(TaskMetadata {
365            goal: r.get::<_, Option<String>>(0)?,
366            outcome: r.get::<_, Option<String>>(1)?,
367            outcome_tag: r.get::<_, Option<String>>(2)?,
368            external: r.get::<_, Option<String>>(3)?,
369        }),
370        None => None,
371    })
372}
373
374/// Look up the most recent `event_id` we've ingested for this project.
375/// Returns `None` when the project has never been indexed (first call,
376/// or migration v002 just landed on an existing 0.1.x DB).
377fn last_indexed_event_id(conn: &Connection, project_hash: &str) -> anyhow::Result<Option<String>> {
378    let mut stmt =
379        conn.prepare("SELECT last_indexed_event_id FROM index_state WHERE project_hash = ?1")?;
380    let mut rows = stmt.query(rusqlite::params![project_hash])?;
381    if let Some(row) = rows.next()? {
382        Ok(Some(row.get::<_, String>(0)?))
383    } else {
384        Ok(None)
385    }
386}
387
388fn record_last_indexed(
389    conn: &Connection,
390    project_hash: &str,
391    event_id: &str,
392) -> anyhow::Result<()> {
393    conn.execute(
394        "INSERT INTO index_state(project_hash, last_indexed_event_id, updated_at)
395         VALUES (?1, ?2, ?3)
396         ON CONFLICT(project_hash) DO UPDATE SET
397             last_indexed_event_id = excluded.last_indexed_event_id,
398             updated_at = excluded.updated_at",
399        rusqlite::params![
400            project_hash,
401            event_id,
402            chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
403        ],
404    )?;
405    Ok(())
406}
407
408/// Read only the tail of the JSONL log since the last call. The cheap path
409/// for hot loops (every MCP tool invocation): scan to the marker, ingest
410/// the rest, update the marker.
411///
412/// Falls back to a full [`rebuild_state`] in two cases:
413/// - No marker yet for this project (first call after migration v002 or
414///   on a brand-new install).
415/// - The stored marker is not present in the JSONL (corrupted / truncated
416///   file). A `tracing::warn!` is emitted so the operator notices.
417pub fn ingest_new_events(
418    conn: &Connection,
419    jsonl_path: impl AsRef<Path>,
420    project_hash: &str,
421) -> anyhow::Result<usize> {
422    let marker = match last_indexed_event_id(conn, project_hash)? {
423        Some(id) => id,
424        None => return rebuild_state(conn, jsonl_path, project_hash),
425    };
426
427    let f = std::fs::File::open(&jsonl_path)
428        .with_context(|| format!("open {:?}", jsonl_path.as_ref()))?;
429    let reader = std::io::BufReader::new(f);
430
431    // First pass: confirm the marker still exists in the file. If it does
432    // not, the JSONL has been rewritten under us — we can't trust the
433    // marker, so we fall back to a full rebuild.
434    let tx = conn.unchecked_transaction()?;
435    let mut found_marker = false;
436    let mut count = 0;
437    let mut last_event_id: Option<String> = None;
438    for (i, line) in reader.lines().enumerate() {
439        let line = line.with_context(|| format!("read line {i}"))?;
440        if line.trim().is_empty() {
441            continue;
442        }
443        let event: Event = match serde_json::from_str(&line) {
444            Ok(e) => e,
445            Err(err) => {
446                tracing::warn!(
447                    line_number = i + 1,
448                    error = %err,
449                    "skipping malformed JSONL line in ingest_new_events"
450                );
451                continue;
452            }
453        };
454        if !found_marker {
455            if event.event_id == marker {
456                found_marker = true;
457            }
458            continue;
459        }
460        upsert_task_from_event(&tx, &event, project_hash)?;
461        index_event(&tx, &event)?;
462        last_event_id = Some(event.event_id.clone());
463        count += 1;
464    }
465
466    if !found_marker {
467        // Discard the (empty) tx and rebuild from scratch.
468        drop(tx);
469        tracing::warn!(
470            project_hash = project_hash,
471            marker = marker.as_str(),
472            "last_indexed_event_id not found in JSONL — falling back to full rebuild"
473        );
474        return rebuild_state(conn, jsonl_path, project_hash);
475    }
476
477    if let Some(eid) = last_event_id.as_deref() {
478        record_last_indexed(&tx, project_hash, eid)?;
479    }
480    tx.commit()?;
481    Ok(count)
482}
483
484pub fn index_event(conn: &Connection, event: &Event) -> anyhow::Result<()> {
485    let type_str = serde_json::to_value(event.event_type)?
486        .as_str()
487        .unwrap()
488        .to_string();
489    let status_str = serde_json::to_value(event.status)?
490        .as_str()
491        .unwrap()
492        .to_string();
493    conn.execute(
494        "INSERT OR REPLACE INTO events_index(event_id, task_id, type, timestamp, confidence, status)
495         VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
496        rusqlite::params![
497            event.event_id, event.task_id, type_str,
498            event.timestamp, event.confidence, status_str
499        ],
500    )?;
501    // search_fts has no PK; clear then insert to keep idempotent across rebuild_state replays.
502    conn.execute(
503        "DELETE FROM search_fts WHERE event_id=?1",
504        rusqlite::params![event.event_id],
505    )?;
506    conn.execute(
507        "INSERT INTO search_fts(task_id, event_id, text, type) VALUES (?1, ?2, ?3, ?4)",
508        rusqlite::params![event.task_id, event.event_id, event.text, type_str],
509    )?;
510
511    if event.event_type == EventType::Decision {
512        conn.execute(
513            "INSERT OR REPLACE INTO decisions(decision_id, task_id, text, status)
514             VALUES (?1, ?2, ?3, 'active')",
515            rusqlite::params![event.event_id, event.task_id, event.text],
516        )?;
517    }
518
519    if event.event_type == EventType::Supersede {
520        if let Some(target) = &event.supersedes {
521            conn.execute(
522                "UPDATE decisions SET status='superseded', superseded_by=?1 WHERE decision_id=?2",
523                rusqlite::params![event.event_id, target],
524            )?;
525        }
526    }
527
528    if event.event_type == EventType::Evidence {
529        let strength_str = event
530            .evidence_strength
531            .map(|s| {
532                serde_json::to_value(s)
533                    .unwrap()
534                    .as_str()
535                    .unwrap()
536                    .to_string()
537            })
538            .unwrap_or_else(|| "medium".into());
539        conn.execute(
540            "INSERT OR REPLACE INTO evidence(evidence_id, task_id, text, strength)
541             VALUES (?1, ?2, ?3, ?4)",
542            rusqlite::params![event.event_id, event.task_id, event.text, strength_str],
543        )?;
544    }
545
546    // Invalidate any cached pack for this task.
547    conn.execute(
548        "DELETE FROM task_pack_cache WHERE task_id=?1",
549        rusqlite::params![event.task_id],
550    )?;
551
552    Ok(())
553}
554
555pub fn open(path: impl AsRef<Path>) -> anyhow::Result<Connection> {
556    if let Some(parent) = path.as_ref().parent() {
557        std::fs::create_dir_all(parent).with_context(|| format!("create dir {parent:?}"))?;
558    }
559    let conn =
560        Connection::open(&path).with_context(|| format!("open SQLite at {:?}", path.as_ref()))?;
561    conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;")?;
562    apply_migrations(&conn).context("apply schema migrations")?;
563    Ok(conn)
564}
565
/// A task-list entry for the TUI.
///
/// Each row carries everything the list view needs, including the task's
/// event count from `events_index`, so rendering the list never requires a
/// per-task follow-up query.
#[derive(Debug, Clone)]
pub struct TaskRow {
    /// Task identifier (primary key of `tasks`).
    pub task_id: String,
    /// Display title.
    pub title: String,
    /// Lifecycle status, e.g. `"open"` or `"closed"`.
    pub status: String,
    /// Timestamp of the task's most recent event.
    pub last_event_at: String,
    /// Number of indexed events recorded for the task.
    pub event_count: usize,
}
577
578/// All tasks for a project, ordered with open ones first (by recency)
579/// then closed ones. The TUI list view binds directly to this — there
580/// is no other consumer, so the shape is tuned for that callsite.
581pub fn list_tasks_by_project(
582    conn: &Connection,
583    project_hash: &str,
584) -> anyhow::Result<Vec<TaskRow>> {
585    let mut stmt = conn.prepare(
586        "SELECT t.task_id, t.title, t.status, t.last_event_at,
587                COALESCE(c.cnt, 0) AS event_count
588         FROM tasks t
589         LEFT JOIN (
590             SELECT task_id, COUNT(*) AS cnt FROM events_index GROUP BY task_id
591         ) c ON c.task_id = t.task_id
592         WHERE t.project_hash = ?1
593         ORDER BY (t.status = 'open') DESC, t.last_event_at DESC",
594    )?;
595    let rows = stmt
596        .query_map(rusqlite::params![project_hash], |r| {
597            Ok(TaskRow {
598                task_id: r.get::<_, String>(0)?,
599                title: r.get::<_, String>(1)?,
600                status: r.get::<_, String>(2)?,
601                last_event_at: r.get::<_, String>(3)?,
602                event_count: r.get::<_, i64>(4)? as usize,
603            })
604        })?
605        .collect::<Result<Vec<_>, _>>()?;
606    Ok(rows)
607}
608
609#[cfg(test)]
610mod tests {
611    use super::*;
612    use tempfile::TempDir;
613
614    #[test]
615    fn task_exists_returns_true_for_known_id_false_otherwise() {
616        let d = TempDir::new().unwrap();
617        let conn = open(d.path().join("s.sqlite")).unwrap();
618
619        assert!(!task_exists(&conn, "tj-nope").unwrap());
620
621        let e = make_open_event("tj-yes", "Hello");
622        upsert_task_from_event(&conn, &e, "feedfacefeedface").unwrap();
623        index_event(&conn, &e).unwrap();
624
625        assert!(task_exists(&conn, "tj-yes").unwrap());
626        assert!(!task_exists(&conn, "tj-nope").unwrap());
627    }
628
629    #[test]
630    fn fresh_db_runs_all_migrations() {
631        let d = TempDir::new().unwrap();
632        let p = d.path().join("state.sqlite");
633        let conn = open(&p).unwrap();
634
635        let applied: Vec<i64> = conn
636            .prepare("SELECT version FROM schema_migrations ORDER BY version")
637            .unwrap()
638            .query_map([], |r| r.get::<_, i64>(0))
639            .unwrap()
640            .collect::<Result<_, _>>()
641            .unwrap();
642        assert_eq!(
643            applied,
644            (1..=MIGRATIONS.len() as i64).collect::<Vec<_>>(),
645            "every declared migration must be recorded"
646        );
647    }
648
649    #[test]
650    fn apply_migrations_is_idempotent_across_reopens() {
651        let d = TempDir::new().unwrap();
652        let p = d.path().join("state.sqlite");
653        let _ = open(&p).unwrap();
654        let _ = open(&p).unwrap();
655
656        let count: i64 = open(&p)
657            .unwrap()
658            .query_row("SELECT COUNT(*) FROM schema_migrations", [], |r| r.get(0))
659            .unwrap();
660        assert_eq!(
661            count,
662            MIGRATIONS.len() as i64,
663            "schema_migrations must contain exactly one row per declared migration after repeated opens"
664        );
665    }
666
667    #[test]
668    fn open_creates_all_tables() {
669        let d = TempDir::new().unwrap();
670        let p = d.path().join("state.sqlite");
671        let conn = open(&p).unwrap();
672
673        let names: Vec<String> = conn
674            .prepare("SELECT name FROM sqlite_master WHERE type='table' OR type='virtual table' ORDER BY name")
675            .unwrap()
676            .query_map([], |r| r.get::<_, String>(0))
677            .unwrap()
678            .collect::<Result<_, _>>()
679            .unwrap();
680
681        for required in [
682            "decisions",
683            "events_index",
684            "evidence",
685            "task_pack_cache",
686            "tasks",
687            "search_fts",
688        ] {
689            assert!(
690                names.iter().any(|n| n == required),
691                "missing table {required}, have {names:?}"
692            );
693        }
694    }
695
696    #[test]
697    fn open_is_idempotent() {
698        let d = TempDir::new().unwrap();
699        let p = d.path().join("state.sqlite");
700        let _ = open(&p).unwrap();
701        let _ = open(&p).unwrap();
702    }
703
704    #[test]
705    fn index_event_projects_evidence() {
706        let d = TempDir::new().unwrap();
707        let conn = open(d.path().join("s.sqlite")).unwrap();
708        let mut open_e = crate::event::Event::new(
709            "tj-e",
710            crate::event::EventType::Open,
711            crate::event::Author::User,
712            crate::event::Source::Cli,
713            "x".into(),
714        );
715        open_e.meta = serde_json::json!({"title": "T"});
716        upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
717        index_event(&conn, &open_e).unwrap();
718
719        let mut ev = crate::event::Event::new(
720            "tj-e",
721            crate::event::EventType::Evidence,
722            crate::event::Author::Agent,
723            crate::event::Source::Chat,
724            "Hook startup measured at 12ms".into(),
725        );
726        ev.evidence_strength = Some(crate::event::EvidenceStrength::Strong);
727        upsert_task_from_event(&conn, &ev, "feedface").unwrap();
728        index_event(&conn, &ev).unwrap();
729
730        let (text, strength): (String, String) = conn
731            .query_row(
732                "SELECT text, strength FROM evidence WHERE task_id=?1",
733                rusqlite::params!["tj-e"],
734                |r| Ok((r.get(0)?, r.get(1)?)),
735            )
736            .unwrap();
737        assert!(text.contains("12ms"));
738        assert_eq!(strength, "strong");
739    }
740
741    #[test]
742    fn supersede_event_marks_decision_superseded() {
743        let d = TempDir::new().unwrap();
744        let conn = open(d.path().join("s.sqlite")).unwrap();
745        let mut open_e = crate::event::Event::new(
746            "tj-s",
747            crate::event::EventType::Open,
748            crate::event::Author::User,
749            crate::event::Source::Cli,
750            "x".into(),
751        );
752        open_e.meta = serde_json::json!({"title": "T"});
753        upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
754        index_event(&conn, &open_e).unwrap();
755
756        let dec = crate::event::Event::new(
757            "tj-s",
758            crate::event::EventType::Decision,
759            crate::event::Author::Agent,
760            crate::event::Source::Chat,
761            "Use TS".into(),
762        );
763        upsert_task_from_event(&conn, &dec, "feedface").unwrap();
764        index_event(&conn, &dec).unwrap();
765
766        let mut sup = crate::event::Event::new(
767            "tj-s",
768            crate::event::EventType::Supersede,
769            crate::event::Author::Agent,
770            crate::event::Source::Chat,
771            "Replaced by Rust decision".into(),
772        );
773        sup.supersedes = Some(dec.event_id.clone());
774        upsert_task_from_event(&conn, &sup, "feedface").unwrap();
775        index_event(&conn, &sup).unwrap();
776
777        let (status, by): (String, Option<String>) = conn
778            .query_row(
779                "SELECT status, superseded_by FROM decisions WHERE decision_id=?1",
780                rusqlite::params![dec.event_id],
781                |r| Ok((r.get(0)?, r.get(1)?)),
782            )
783            .unwrap();
784        assert_eq!(status, "superseded");
785        assert_eq!(by.as_deref(), Some(sup.event_id.as_str()));
786    }
787
788    #[test]
789    fn index_event_projects_decision_to_decisions_table() {
790        let d = TempDir::new().unwrap();
791        let conn = open(d.path().join("s.sqlite")).unwrap();
792
793        let mut open_e = crate::event::Event::new(
794            "tj-d",
795            crate::event::EventType::Open,
796            crate::event::Author::User,
797            crate::event::Source::Cli,
798            "x".into(),
799        );
800        open_e.meta = serde_json::json!({"title": "T"});
801        upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
802        index_event(&conn, &open_e).unwrap();
803
804        let dec = crate::event::Event::new(
805            "tj-d",
806            crate::event::EventType::Decision,
807            crate::event::Author::Agent,
808            crate::event::Source::Chat,
809            "Adopt Rust".into(),
810        );
811        upsert_task_from_event(&conn, &dec, "feedface").unwrap();
812        index_event(&conn, &dec).unwrap();
813
814        let (id, text, status): (String, String, String) = conn
815            .query_row(
816                "SELECT decision_id, text, status FROM decisions WHERE task_id=?1",
817                rusqlite::params!["tj-d"],
818                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
819            )
820            .unwrap();
821        assert_eq!(id, dec.event_id);
822        assert_eq!(text, "Adopt Rust");
823        assert_eq!(status, "active");
824    }
825
826    #[test]
827    fn index_event_is_idempotent_no_search_fts_duplicates() {
828        let d = TempDir::new().unwrap();
829        let conn = open(d.path().join("s.sqlite")).unwrap();
830        let mut open_e = crate::event::Event::new(
831            "tj-id",
832            crate::event::EventType::Open,
833            crate::event::Author::User,
834            crate::event::Source::Cli,
835            "x".into(),
836        );
837        open_e.meta = serde_json::json!({"title": "Idempotent"});
838        upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
839
840        // Index three times — simulates rebuild_state replays.
841        index_event(&conn, &open_e).unwrap();
842        index_event(&conn, &open_e).unwrap();
843        index_event(&conn, &open_e).unwrap();
844
845        let n: i64 = conn
846            .query_row(
847                "SELECT COUNT(*) FROM search_fts WHERE event_id=?1",
848                rusqlite::params![open_e.event_id],
849                |r| r.get(0),
850            )
851            .unwrap();
852        assert_eq!(n, 1, "search_fts must hold exactly one row per event_id");
853    }
854
855    #[test]
856    fn list_all_projects_returns_hashes_from_state_dir() {
857        use std::fs::File;
858        let d = TempDir::new().unwrap();
859        let state_dir = d.path().join("state");
860        std::fs::create_dir_all(&state_dir).unwrap();
861        File::create(state_dir.join("aaaa1111aaaa1111.sqlite")).unwrap();
862        File::create(state_dir.join("bbbb2222bbbb2222.sqlite")).unwrap();
863        File::create(state_dir.join("not-a-project.txt")).unwrap();
864
865        let mut hashes = list_all_projects(&state_dir).unwrap();
866        hashes.sort();
867        assert_eq!(hashes, vec!["aaaa1111aaaa1111", "bbbb2222bbbb2222"]);
868    }
869
870    fn write_event_line(f: &mut std::fs::File, e: &crate::event::Event) {
871        use std::io::Write;
872        writeln!(f, "{}", serde_json::to_string(e).unwrap()).unwrap();
873    }
874
875    fn make_open_event(task_id: &str, title: &str) -> crate::event::Event {
876        let mut e = crate::event::Event::new(
877            task_id,
878            crate::event::EventType::Open,
879            crate::event::Author::User,
880            crate::event::Source::Cli,
881            "x".into(),
882        );
883        e.meta = serde_json::json!({"title": title});
884        e
885    }
886
887    #[test]
888    fn ingest_new_events_picks_up_only_new_lines() {
889        let d = TempDir::new().unwrap();
890        let jsonl = d.path().join("events.jsonl");
891        let db = d.path().join("s.sqlite");
892        let project = "deadbeefdeadbeef";
893
894        let e1 = make_open_event("tj-i1", "first");
895        let e2 = make_open_event("tj-i2", "second");
896        let e3 = make_open_event("tj-i3", "third");
897
898        let mut f = std::fs::File::create(&jsonl).unwrap();
899        write_event_line(&mut f, &e1);
900        write_event_line(&mut f, &e2);
901        write_event_line(&mut f, &e3);
902        drop(f);
903
904        // First pass — no marker yet, falls back to a full rebuild.
905        let conn = open(&db).unwrap();
906        let n_first = ingest_new_events(&conn, &jsonl, project).unwrap();
907        assert_eq!(n_first, 3);
908
909        // Append two more events.
910        let e4 = make_open_event("tj-i4", "fourth");
911        let e5 = make_open_event("tj-i5", "fifth");
912        let mut f = std::fs::OpenOptions::new()
913            .append(true)
914            .open(&jsonl)
915            .unwrap();
916        write_event_line(&mut f, &e4);
917        write_event_line(&mut f, &e5);
918        drop(f);
919
920        // Second pass — marker = e3, only e4 + e5 must be processed.
921        let n_second = ingest_new_events(&conn, &jsonl, project).unwrap();
922        assert_eq!(n_second, 2, "incremental ingest must read only the tail");
923
924        let total: i64 = conn
925            .query_row("SELECT COUNT(*) FROM events_index", [], |r| r.get(0))
926            .unwrap();
927        assert_eq!(total, 5);
928
929        let marker: String = conn
930            .query_row(
931                "SELECT last_indexed_event_id FROM index_state WHERE project_hash=?1",
932                rusqlite::params![project],
933                |r| r.get(0),
934            )
935            .unwrap();
936        assert_eq!(marker, e5.event_id);
937    }
938
939    #[test]
940    fn ingest_new_events_falls_back_to_full_rebuild_when_marker_vanishes() {
941        let d = TempDir::new().unwrap();
942        let jsonl = d.path().join("events.jsonl");
943        let db = d.path().join("s.sqlite");
944        let project = "feedfacefeedface";
945
946        let e1 = make_open_event("tj-r1", "first");
947        let mut f = std::fs::File::create(&jsonl).unwrap();
948        write_event_line(&mut f, &e1);
949        drop(f);
950
951        let conn = open(&db).unwrap();
952        ingest_new_events(&conn, &jsonl, project).unwrap();
953
954        // Replace the file entirely so the marker (e1.event_id) no longer
955        // appears anywhere — simulates corruption / hand-edit.
956        let e2 = make_open_event("tj-r2", "after-corruption");
957        let e3 = make_open_event("tj-r3", "after-corruption-2");
958        let mut f = std::fs::File::create(&jsonl).unwrap();
959        write_event_line(&mut f, &e2);
960        write_event_line(&mut f, &e3);
961        drop(f);
962
963        let n = ingest_new_events(&conn, &jsonl, project).unwrap();
964        assert_eq!(n, 2, "missing marker must trigger full rebuild");
965    }
966
967    #[test]
968    fn rebuild_state_and_ingest_new_events_produce_same_state() {
969        let d = TempDir::new().unwrap();
970        let jsonl_a = d.path().join("a.jsonl");
971        let jsonl_b = d.path().join("b.jsonl");
972        let db_a = d.path().join("a.sqlite");
973        let db_b = d.path().join("b.sqlite");
974
975        let events: Vec<_> = (0..5)
976            .map(|i| make_open_event(&format!("tj-eq{i}"), &format!("title {i}")))
977            .collect();
978        for path in [&jsonl_a, &jsonl_b] {
979            let mut f = std::fs::File::create(path).unwrap();
980            for e in &events {
981                write_event_line(&mut f, e);
982            }
983        }
984
985        let conn_a = open(&db_a).unwrap();
986        let n_a = rebuild_state(&conn_a, &jsonl_a, "abcd1234abcd1234").unwrap();
987
988        let conn_b = open(&db_b).unwrap();
989        let n_b = ingest_new_events(&conn_b, &jsonl_b, "abcd1234abcd1234").unwrap();
990
991        assert_eq!(n_a, n_b);
992        assert_eq!(n_a, 5);
993
994        for table in ["tasks", "events_index"] {
995            let q = format!("SELECT COUNT(*) FROM {table}");
996            let cnt_a: i64 = conn_a.query_row(&q, [], |r| r.get(0)).unwrap();
997            let cnt_b: i64 = conn_b.query_row(&q, [], |r| r.get(0)).unwrap();
998            assert_eq!(cnt_a, cnt_b, "row count mismatch in {table}");
999        }
1000    }
1001
1002    #[test]
1003    fn rebuild_state_skips_malformed_jsonl_lines() {
1004        use std::io::Write;
1005        let d = TempDir::new().unwrap();
1006        let events_path = d.path().join("events.jsonl");
1007        let db_path = d.path().join("s.sqlite");
1008
1009        let mut f = std::fs::File::create(&events_path).unwrap();
1010
1011        let mut e1 = crate::event::Event::new(
1012            "tj-skip",
1013            crate::event::EventType::Open,
1014            crate::event::Author::User,
1015            crate::event::Source::Cli,
1016            "x".into(),
1017        );
1018        e1.meta = serde_json::json!({"title": "Skip test"});
1019        writeln!(f, "{}", serde_json::to_string(&e1).unwrap()).unwrap();
1020
1021        // Garbage that is not even JSON.
1022        writeln!(f, "this is not a json event line").unwrap();
1023
1024        // Valid JSON but not a valid Event (missing required fields).
1025        writeln!(f, "{{\"foo\": 1}}").unwrap();
1026
1027        let e3 = crate::event::Event::new(
1028            "tj-skip",
1029            crate::event::EventType::Decision,
1030            crate::event::Author::Agent,
1031            crate::event::Source::Chat,
1032            "Adopt Rust".into(),
1033        );
1034        writeln!(f, "{}", serde_json::to_string(&e3).unwrap()).unwrap();
1035        drop(f);
1036
1037        let conn = open(&db_path).unwrap();
1038        let n = rebuild_state(&conn, &events_path, "deadbeefdeadbeef")
1039            .expect("rebuild_state must succeed despite malformed lines");
1040        assert_eq!(
1041            n, 2,
1042            "expected 2 valid events indexed (2 malformed skipped)"
1043        );
1044
1045        let indexed: i64 = conn
1046            .query_row("SELECT COUNT(*) FROM events_index", [], |r| r.get(0))
1047            .unwrap();
1048        assert_eq!(indexed, 2);
1049    }
1050
1051    #[test]
1052    fn rebuild_state_reads_jsonl_and_populates_db() {
1053        use std::io::Write;
1054        let d = TempDir::new().unwrap();
1055        let events_path = d.path().join("events.jsonl");
1056        let db_path = d.path().join("s.sqlite");
1057
1058        let mut f = std::fs::File::create(&events_path).unwrap();
1059        let mut e1 = crate::event::Event::new(
1060            "tj-9",
1061            crate::event::EventType::Open,
1062            crate::event::Author::User,
1063            crate::event::Source::Cli,
1064            "x".into(),
1065        );
1066        e1.meta = serde_json::json!({"title": "Nine"});
1067        let e2 = crate::event::Event::new(
1068            "tj-9",
1069            crate::event::EventType::Decision,
1070            crate::event::Author::Agent,
1071            crate::event::Source::Chat,
1072            "Adopt Rust".into(),
1073        );
1074        writeln!(f, "{}", serde_json::to_string(&e1).unwrap()).unwrap();
1075        writeln!(f, "{}", serde_json::to_string(&e2).unwrap()).unwrap();
1076        drop(f);
1077
1078        let conn = open(&db_path).unwrap();
1079        let n = rebuild_state(&conn, &events_path, "deadbeefdeadbeef").unwrap();
1080        assert_eq!(n, 2);
1081
1082        let n: i64 = conn
1083            .query_row("SELECT COUNT(*) FROM tasks", [], |r| r.get(0))
1084            .unwrap();
1085        assert_eq!(n, 1);
1086        let n: i64 = conn
1087            .query_row("SELECT COUNT(*) FROM events_index", [], |r| r.get(0))
1088            .unwrap();
1089        assert_eq!(n, 2);
1090    }
1091
1092    #[test]
1093    fn index_event_writes_index_and_fts() {
1094        let d = TempDir::new().unwrap();
1095        let conn = open(d.path().join("s.sqlite")).unwrap();
1096        let mut open_e = crate::event::Event::new(
1097            "tj-1",
1098            crate::event::EventType::Open,
1099            crate::event::Author::User,
1100            crate::event::Source::Cli,
1101            "Title".into(),
1102        );
1103        open_e.meta = serde_json::json!({"title": "Title"});
1104        upsert_task_from_event(&conn, &open_e, "deadbeefdeadbeef").unwrap();
1105        index_event(&conn, &open_e).unwrap();
1106
1107        let mut decision = crate::event::Event::new(
1108            "tj-1",
1109            crate::event::EventType::Decision,
1110            crate::event::Author::Agent,
1111            crate::event::Source::Chat,
1112            "Adopt Rust".into(),
1113        );
1114        decision.confidence = Some(0.92);
1115        upsert_task_from_event(&conn, &decision, "deadbeefdeadbeef").unwrap();
1116        index_event(&conn, &decision).unwrap();
1117
1118        let count: i64 = conn
1119            .query_row(
1120                "SELECT COUNT(*) FROM events_index WHERE task_id=?1",
1121                rusqlite::params!["tj-1"],
1122                |r| r.get(0),
1123            )
1124            .unwrap();
1125        assert_eq!(count, 2);
1126
1127        let mut stmt = conn
1128            .prepare("SELECT event_id FROM search_fts WHERE search_fts MATCH ?1")
1129            .unwrap();
1130        let hits: Vec<String> = stmt
1131            .query_map(rusqlite::params!["Rust"], |r| {
1132                let s: String = r.get(0)?;
1133                Ok(s)
1134            })
1135            .unwrap()
1136            .collect::<Result<Vec<_>, _>>()
1137            .unwrap();
1138        assert_eq!(hits.len(), 1);
1139        assert_eq!(hits[0], decision.event_id);
1140    }
1141
1142    #[test]
1143    fn upsert_task_from_open_event_inserts_row() {
1144        let d = TempDir::new().unwrap();
1145        let conn = open(d.path().join("s.sqlite")).unwrap();
1146
1147        let mut e = crate::event::Event::new(
1148            "tj-7f3a",
1149            crate::event::EventType::Open,
1150            crate::event::Author::User,
1151            crate::event::Source::Cli,
1152            "Add OAuth".into(),
1153        );
1154        e.meta = serde_json::json!({ "title": "Add OAuth login" });
1155
1156        upsert_task_from_event(&conn, &e, "abcd1234abcd1234").unwrap();
1157
1158        let (id, title, status): (String, String, String) = conn
1159            .query_row(
1160                "SELECT task_id, title, status FROM tasks WHERE task_id = ?1",
1161                ["tj-7f3a"],
1162                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1163            )
1164            .unwrap();
1165
1166        assert_eq!(id, "tj-7f3a");
1167        assert_eq!(title, "Add OAuth login");
1168        assert_eq!(status, "open");
1169    }
1170}