Skip to main content

tj_core/
db.rs

1use anyhow::Context;
2use rusqlite::Connection;
3use std::collections::HashSet;
4use std::path::Path;
5
/// One forward-only schema migration.
///
/// `apply_migrations` walks [`MIGRATIONS`] in slice order (not sorted by
/// `version`), so that slice must be kept in ascending version order. Each
/// applied migration is recorded in `schema_migrations`, so re-running
/// `open()` skips it and is idempotent.
struct Migration {
    /// Key stored in `schema_migrations.version`; shown as `v{:03}` in
    /// error context.
    version: i64,
    /// One or more SQL statements, run with `Connection::execute_batch`.
    sql: &'static str,
}

/// v001: the initial schema. It creates `tasks` (with a per-project
/// recency index), `events_index` (one row per event, indexed by task and
/// time), the `decisions` / `evidence` projections, `task_pack_cache`, and
/// the `search_fts` FTS5 full-text table.
///
/// Every statement uses `IF NOT EXISTS`, so running it against a database
/// that already has these objects is harmless.
///
/// NOTE: `search_fts.task_id` / `event_id` are declared `UNINDEXED`. FTS5
/// stores them but does not put them in the full-text index, so equality
/// filters such as `WHERE event_id = ?` cannot use an index and scan the
/// table.
const MIGRATION_001: &str = r#"
CREATE TABLE IF NOT EXISTS tasks (
  task_id        TEXT PRIMARY KEY,
  title          TEXT NOT NULL,
  status         TEXT NOT NULL,
  project_hash   TEXT NOT NULL,
  opened_at      TEXT NOT NULL,
  closed_at      TEXT,
  last_event_at  TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_tasks_project ON tasks(project_hash, last_event_at DESC);

CREATE TABLE IF NOT EXISTS events_index (
  event_id    TEXT PRIMARY KEY,
  task_id     TEXT NOT NULL,
  type        TEXT NOT NULL,
  timestamp   TEXT NOT NULL,
  confidence  REAL,
  status      TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_events_task_time ON events_index(task_id, timestamp DESC);

CREATE TABLE IF NOT EXISTS decisions (
  decision_id    TEXT PRIMARY KEY,
  task_id        TEXT NOT NULL,
  text           TEXT NOT NULL,
  status         TEXT NOT NULL,
  superseded_by  TEXT
);

CREATE TABLE IF NOT EXISTS evidence (
  evidence_id           TEXT PRIMARY KEY,
  task_id               TEXT NOT NULL,
  text                  TEXT NOT NULL,
  strength              TEXT NOT NULL,
  refers_to_decision_id TEXT
);

CREATE TABLE IF NOT EXISTS task_pack_cache (
  task_id             TEXT NOT NULL,
  mode                TEXT NOT NULL,
  text                TEXT NOT NULL,
  generated_at        TEXT NOT NULL,
  source_event_count  INTEGER NOT NULL,
  PRIMARY KEY (task_id, mode)
);

CREATE VIRTUAL TABLE IF NOT EXISTS search_fts USING fts5(
  task_id UNINDEXED,
  event_id UNINDEXED,
  text,
  type
);
"#;

/// v002: per-project ingest marker.
///
/// Tracks how far we've ingested the JSONL log per project, so later
/// `ingest_new_events` calls only apply the tail instead of replaying the
/// whole file. `last_indexed_event_id` is the `event_id` of the most
/// recent event written to `events_index`. Databases created before this
/// migration get the table on their next `open()`.
const MIGRATION_002: &str = r#"
CREATE TABLE IF NOT EXISTS index_state (
  project_hash          TEXT PRIMARY KEY,
  last_indexed_event_id TEXT NOT NULL,
  updated_at            TEXT NOT NULL
);
"#;

/// All schema migrations, in ascending version order. `apply_migrations`
/// applies them in exactly this slice order. Append new entries here; never
/// edit a published migration's `sql`; write a new one instead. The tests
/// expect the versions to be exactly `1..=MIGRATIONS.len()`.
const MIGRATIONS: &[Migration] = &[
    // v001: initial schema.
    Migration {
        version: 1,
        sql: MIGRATION_001,
    },
    // v002: incremental-ingest marker table.
    Migration {
        version: 2,
        sql: MIGRATION_002,
    },
];
93
94fn apply_migrations(conn: &Connection) -> anyhow::Result<()> {
95    conn.execute_batch(
96        "CREATE TABLE IF NOT EXISTS schema_migrations (
97            version    INTEGER PRIMARY KEY,
98            applied_at TEXT NOT NULL
99        )",
100    )
101    .context("create schema_migrations table")?;
102
103    let applied: HashSet<i64> = {
104        let mut stmt = conn
105            .prepare("SELECT version FROM schema_migrations")
106            .context("select applied versions")?;
107        let rows = stmt
108            .query_map([], |r| r.get::<_, i64>(0))
109            .context("iterate schema_migrations")?;
110        rows.collect::<rusqlite::Result<HashSet<_>>>()
111            .context("collect applied versions")?
112    };
113
114    for migration in MIGRATIONS {
115        if applied.contains(&migration.version) {
116            continue;
117        }
118        conn.execute_batch(migration.sql)
119            .with_context(|| format!("apply schema migration v{:03}", migration.version))?;
120        conn.execute(
121            "INSERT INTO schema_migrations(version, applied_at) VALUES (?1, ?2)",
122            rusqlite::params![
123                migration.version,
124                chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
125            ],
126        )
127        .with_context(|| {
128            format!(
129                "record schema migration v{:03} as applied",
130                migration.version
131            )
132        })?;
133    }
134    Ok(())
135}
136
137use crate::event::{Event, EventType};
138
139pub fn upsert_task_from_event(
140    conn: &Connection,
141    event: &Event,
142    project_hash: &str,
143) -> anyhow::Result<()> {
144    match event.event_type {
145        EventType::Open => {
146            let title = event
147                .meta
148                .get("title")
149                .and_then(|v| v.as_str())
150                .unwrap_or(&event.text)
151                .to_string();
152            conn.execute(
153                "INSERT INTO tasks(task_id, title, status, project_hash, opened_at, last_event_at)
154                 VALUES (?1, ?2, 'open', ?3, ?4, ?4)
155                 ON CONFLICT(task_id) DO UPDATE SET last_event_at = ?4",
156                rusqlite::params![event.task_id, title, project_hash, event.timestamp],
157            )?;
158        }
159        EventType::Close => {
160            conn.execute(
161                "UPDATE tasks SET status='closed', closed_at=?2, last_event_at=?2 WHERE task_id=?1",
162                rusqlite::params![event.task_id, event.timestamp],
163            )?;
164        }
165        EventType::Reopen => {
166            conn.execute(
167                "UPDATE tasks SET status='open', closed_at=NULL, last_event_at=?2 WHERE task_id=?1",
168                rusqlite::params![event.task_id, event.timestamp],
169            )?;
170        }
171        _ => {
172            conn.execute(
173                "UPDATE tasks SET last_event_at=?2 WHERE task_id=?1",
174                rusqlite::params![event.task_id, event.timestamp],
175            )?;
176        }
177    }
178    Ok(())
179}
180
181use std::io::BufRead;
182
183pub fn list_all_projects(state_dir: impl AsRef<Path>) -> anyhow::Result<Vec<String>> {
184    let dir = state_dir.as_ref();
185    if !dir.exists() {
186        return Ok(vec![]);
187    }
188    let mut out = Vec::new();
189    for entry in std::fs::read_dir(dir)? {
190        let entry = entry?;
191        let path = entry.path();
192        if path.extension().and_then(|e| e.to_str()) == Some("sqlite") {
193            if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) {
194                out.push(stem.to_string());
195            }
196        }
197    }
198    Ok(out)
199}
200
201pub fn rebuild_state(
202    conn: &Connection,
203    jsonl_path: impl AsRef<Path>,
204    project_hash: &str,
205) -> anyhow::Result<usize> {
206    let f = std::fs::File::open(&jsonl_path)
207        .with_context(|| format!("open {:?}", jsonl_path.as_ref()))?;
208    let reader = std::io::BufReader::new(f);
209
210    let tx = conn.unchecked_transaction()?;
211    let mut count = 0;
212    let mut last_event_id: Option<String> = None;
213    for (i, line) in reader.lines().enumerate() {
214        let line = line.with_context(|| format!("read line {i}"))?;
215        if line.trim().is_empty() {
216            continue;
217        }
218        // Malformed JSONL lines are skipped with a warning so that one bad
219        // event cannot abort an otherwise-recoverable rebuild. SQL errors
220        // still propagate — those indicate schema/integrity problems.
221        let event: Event = match serde_json::from_str(&line) {
222            Ok(e) => e,
223            Err(err) => {
224                tracing::warn!(
225                    line_number = i + 1,
226                    error = %err,
227                    "skipping malformed JSONL line in rebuild_state"
228                );
229                continue;
230            }
231        };
232        upsert_task_from_event(&tx, &event, project_hash)?;
233        index_event(&tx, &event)?;
234        last_event_id = Some(event.event_id.clone());
235        count += 1;
236    }
237    if let Some(eid) = last_event_id.as_deref() {
238        record_last_indexed(&tx, project_hash, eid)?;
239    }
240    tx.commit()?;
241    Ok(count)
242}
243
244/// Returns whether a task with this id has been recorded in the derived
245/// state. Cheap O(1) lookup against the `tasks` primary key. Callers
246/// should run [`ingest_new_events`] first if they want to see the latest
247/// JSONL state.
248pub fn task_exists(conn: &Connection, task_id: &str) -> anyhow::Result<bool> {
249    let count: i64 = conn.query_row(
250        "SELECT COUNT(*) FROM tasks WHERE task_id = ?1",
251        rusqlite::params![task_id],
252        |r| r.get(0),
253    )?;
254    Ok(count > 0)
255}
256
257/// Status string for an existing task (e.g. "open", "closed"). Returns
258/// `None` when the task is unknown — caller decides whether that's a
259/// hard error or a route-to-pending case.
260pub fn task_status(conn: &Connection, task_id: &str) -> anyhow::Result<Option<String>> {
261    let mut stmt = conn.prepare("SELECT status FROM tasks WHERE task_id = ?1")?;
262    let mut rows = stmt.query(rusqlite::params![task_id])?;
263    Ok(rows.next()?.map(|r| r.get::<_, String>(0)).transpose()?)
264}
265
266/// Look up the most recent `event_id` we've ingested for this project.
267/// Returns `None` when the project has never been indexed (first call,
268/// or migration v002 just landed on an existing 0.1.x DB).
269fn last_indexed_event_id(conn: &Connection, project_hash: &str) -> anyhow::Result<Option<String>> {
270    let mut stmt =
271        conn.prepare("SELECT last_indexed_event_id FROM index_state WHERE project_hash = ?1")?;
272    let mut rows = stmt.query(rusqlite::params![project_hash])?;
273    if let Some(row) = rows.next()? {
274        Ok(Some(row.get::<_, String>(0)?))
275    } else {
276        Ok(None)
277    }
278}
279
280fn record_last_indexed(
281    conn: &Connection,
282    project_hash: &str,
283    event_id: &str,
284) -> anyhow::Result<()> {
285    conn.execute(
286        "INSERT INTO index_state(project_hash, last_indexed_event_id, updated_at)
287         VALUES (?1, ?2, ?3)
288         ON CONFLICT(project_hash) DO UPDATE SET
289             last_indexed_event_id = excluded.last_indexed_event_id,
290             updated_at = excluded.updated_at",
291        rusqlite::params![
292            project_hash,
293            event_id,
294            chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
295        ],
296    )?;
297    Ok(())
298}
299
300/// Read only the tail of the JSONL log since the last call. The cheap path
301/// for hot loops (every MCP tool invocation): scan to the marker, ingest
302/// the rest, update the marker.
303///
304/// Falls back to a full [`rebuild_state`] in two cases:
305/// - No marker yet for this project (first call after migration v002 or
306///   on a brand-new install).
307/// - The stored marker is not present in the JSONL (corrupted / truncated
308///   file). A `tracing::warn!` is emitted so the operator notices.
309pub fn ingest_new_events(
310    conn: &Connection,
311    jsonl_path: impl AsRef<Path>,
312    project_hash: &str,
313) -> anyhow::Result<usize> {
314    let marker = match last_indexed_event_id(conn, project_hash)? {
315        Some(id) => id,
316        None => return rebuild_state(conn, jsonl_path, project_hash),
317    };
318
319    let f = std::fs::File::open(&jsonl_path)
320        .with_context(|| format!("open {:?}", jsonl_path.as_ref()))?;
321    let reader = std::io::BufReader::new(f);
322
323    // First pass: confirm the marker still exists in the file. If it does
324    // not, the JSONL has been rewritten under us — we can't trust the
325    // marker, so we fall back to a full rebuild.
326    let tx = conn.unchecked_transaction()?;
327    let mut found_marker = false;
328    let mut count = 0;
329    let mut last_event_id: Option<String> = None;
330    for (i, line) in reader.lines().enumerate() {
331        let line = line.with_context(|| format!("read line {i}"))?;
332        if line.trim().is_empty() {
333            continue;
334        }
335        let event: Event = match serde_json::from_str(&line) {
336            Ok(e) => e,
337            Err(err) => {
338                tracing::warn!(
339                    line_number = i + 1,
340                    error = %err,
341                    "skipping malformed JSONL line in ingest_new_events"
342                );
343                continue;
344            }
345        };
346        if !found_marker {
347            if event.event_id == marker {
348                found_marker = true;
349            }
350            continue;
351        }
352        upsert_task_from_event(&tx, &event, project_hash)?;
353        index_event(&tx, &event)?;
354        last_event_id = Some(event.event_id.clone());
355        count += 1;
356    }
357
358    if !found_marker {
359        // Discard the (empty) tx and rebuild from scratch.
360        drop(tx);
361        tracing::warn!(
362            project_hash = project_hash,
363            marker = marker.as_str(),
364            "last_indexed_event_id not found in JSONL — falling back to full rebuild"
365        );
366        return rebuild_state(conn, jsonl_path, project_hash);
367    }
368
369    if let Some(eid) = last_event_id.as_deref() {
370        record_last_indexed(&tx, project_hash, eid)?;
371    }
372    tx.commit()?;
373    Ok(count)
374}
375
376pub fn index_event(conn: &Connection, event: &Event) -> anyhow::Result<()> {
377    let type_str = serde_json::to_value(event.event_type)?
378        .as_str()
379        .unwrap()
380        .to_string();
381    let status_str = serde_json::to_value(event.status)?
382        .as_str()
383        .unwrap()
384        .to_string();
385    conn.execute(
386        "INSERT OR REPLACE INTO events_index(event_id, task_id, type, timestamp, confidence, status)
387         VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
388        rusqlite::params![
389            event.event_id, event.task_id, type_str,
390            event.timestamp, event.confidence, status_str
391        ],
392    )?;
393    // search_fts has no PK; clear then insert to keep idempotent across rebuild_state replays.
394    conn.execute(
395        "DELETE FROM search_fts WHERE event_id=?1",
396        rusqlite::params![event.event_id],
397    )?;
398    conn.execute(
399        "INSERT INTO search_fts(task_id, event_id, text, type) VALUES (?1, ?2, ?3, ?4)",
400        rusqlite::params![event.task_id, event.event_id, event.text, type_str],
401    )?;
402
403    if event.event_type == EventType::Decision {
404        conn.execute(
405            "INSERT OR REPLACE INTO decisions(decision_id, task_id, text, status)
406             VALUES (?1, ?2, ?3, 'active')",
407            rusqlite::params![event.event_id, event.task_id, event.text],
408        )?;
409    }
410
411    if event.event_type == EventType::Supersede {
412        if let Some(target) = &event.supersedes {
413            conn.execute(
414                "UPDATE decisions SET status='superseded', superseded_by=?1 WHERE decision_id=?2",
415                rusqlite::params![event.event_id, target],
416            )?;
417        }
418    }
419
420    if event.event_type == EventType::Evidence {
421        let strength_str = event
422            .evidence_strength
423            .map(|s| {
424                serde_json::to_value(s)
425                    .unwrap()
426                    .as_str()
427                    .unwrap()
428                    .to_string()
429            })
430            .unwrap_or_else(|| "medium".into());
431        conn.execute(
432            "INSERT OR REPLACE INTO evidence(evidence_id, task_id, text, strength)
433             VALUES (?1, ?2, ?3, ?4)",
434            rusqlite::params![event.event_id, event.task_id, event.text, strength_str],
435        )?;
436    }
437
438    // Invalidate any cached pack for this task.
439    conn.execute(
440        "DELETE FROM task_pack_cache WHERE task_id=?1",
441        rusqlite::params![event.task_id],
442    )?;
443
444    Ok(())
445}
446
447pub fn open(path: impl AsRef<Path>) -> anyhow::Result<Connection> {
448    if let Some(parent) = path.as_ref().parent() {
449        std::fs::create_dir_all(parent).with_context(|| format!("create dir {parent:?}"))?;
450    }
451    let conn =
452        Connection::open(&path).with_context(|| format!("open SQLite at {:?}", path.as_ref()))?;
453    conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;")?;
454    apply_migrations(&conn).context("apply schema migrations")?;
455    Ok(conn)
456}
457
458/// One row of the task list rendered by the TUI: enough to render the
459/// list view without round-tripping for each task. `event_count` joins
460/// `events_index` so we don't need a second query per row.
461#[derive(Debug, Clone)]
462pub struct TaskRow {
463    pub task_id: String,
464    pub title: String,
465    pub status: String,
466    pub last_event_at: String,
467    pub event_count: usize,
468}
469
470/// All tasks for a project, ordered with open ones first (by recency)
471/// then closed ones. The TUI list view binds directly to this — there
472/// is no other consumer, so the shape is tuned for that callsite.
473pub fn list_tasks_by_project(
474    conn: &Connection,
475    project_hash: &str,
476) -> anyhow::Result<Vec<TaskRow>> {
477    let mut stmt = conn.prepare(
478        "SELECT t.task_id, t.title, t.status, t.last_event_at,
479                COALESCE(c.cnt, 0) AS event_count
480         FROM tasks t
481         LEFT JOIN (
482             SELECT task_id, COUNT(*) AS cnt FROM events_index GROUP BY task_id
483         ) c ON c.task_id = t.task_id
484         WHERE t.project_hash = ?1
485         ORDER BY (t.status = 'open') DESC, t.last_event_at DESC",
486    )?;
487    let rows = stmt
488        .query_map(rusqlite::params![project_hash], |r| {
489            Ok(TaskRow {
490                task_id: r.get::<_, String>(0)?,
491                title: r.get::<_, String>(1)?,
492                status: r.get::<_, String>(2)?,
493                last_event_at: r.get::<_, String>(3)?,
494                event_count: r.get::<_, i64>(4)? as usize,
495            })
496        })?
497        .collect::<Result<Vec<_>, _>>()?;
498    Ok(rows)
499}
500
501#[cfg(test)]
502mod tests {
503    use super::*;
504    use tempfile::TempDir;
505
506    #[test]
507    fn task_exists_returns_true_for_known_id_false_otherwise() {
508        let d = TempDir::new().unwrap();
509        let conn = open(d.path().join("s.sqlite")).unwrap();
510
511        assert!(!task_exists(&conn, "tj-nope").unwrap());
512
513        let e = make_open_event("tj-yes", "Hello");
514        upsert_task_from_event(&conn, &e, "feedfacefeedface").unwrap();
515        index_event(&conn, &e).unwrap();
516
517        assert!(task_exists(&conn, "tj-yes").unwrap());
518        assert!(!task_exists(&conn, "tj-nope").unwrap());
519    }
520
521    #[test]
522    fn fresh_db_runs_all_migrations() {
523        let d = TempDir::new().unwrap();
524        let p = d.path().join("state.sqlite");
525        let conn = open(&p).unwrap();
526
527        let applied: Vec<i64> = conn
528            .prepare("SELECT version FROM schema_migrations ORDER BY version")
529            .unwrap()
530            .query_map([], |r| r.get::<_, i64>(0))
531            .unwrap()
532            .collect::<Result<_, _>>()
533            .unwrap();
534        assert_eq!(
535            applied,
536            (1..=MIGRATIONS.len() as i64).collect::<Vec<_>>(),
537            "every declared migration must be recorded"
538        );
539    }
540
541    #[test]
542    fn apply_migrations_is_idempotent_across_reopens() {
543        let d = TempDir::new().unwrap();
544        let p = d.path().join("state.sqlite");
545        let _ = open(&p).unwrap();
546        let _ = open(&p).unwrap();
547
548        let count: i64 = open(&p)
549            .unwrap()
550            .query_row("SELECT COUNT(*) FROM schema_migrations", [], |r| r.get(0))
551            .unwrap();
552        assert_eq!(
553            count,
554            MIGRATIONS.len() as i64,
555            "schema_migrations must contain exactly one row per declared migration after repeated opens"
556        );
557    }
558
559    #[test]
560    fn open_creates_all_tables() {
561        let d = TempDir::new().unwrap();
562        let p = d.path().join("state.sqlite");
563        let conn = open(&p).unwrap();
564
565        let names: Vec<String> = conn
566            .prepare("SELECT name FROM sqlite_master WHERE type='table' OR type='virtual table' ORDER BY name")
567            .unwrap()
568            .query_map([], |r| r.get::<_, String>(0))
569            .unwrap()
570            .collect::<Result<_, _>>()
571            .unwrap();
572
573        for required in [
574            "decisions",
575            "events_index",
576            "evidence",
577            "task_pack_cache",
578            "tasks",
579            "search_fts",
580        ] {
581            assert!(
582                names.iter().any(|n| n == required),
583                "missing table {required}, have {names:?}"
584            );
585        }
586    }
587
588    #[test]
589    fn open_is_idempotent() {
590        let d = TempDir::new().unwrap();
591        let p = d.path().join("state.sqlite");
592        let _ = open(&p).unwrap();
593        let _ = open(&p).unwrap();
594    }
595
596    #[test]
597    fn index_event_projects_evidence() {
598        let d = TempDir::new().unwrap();
599        let conn = open(d.path().join("s.sqlite")).unwrap();
600        let mut open_e = crate::event::Event::new(
601            "tj-e",
602            crate::event::EventType::Open,
603            crate::event::Author::User,
604            crate::event::Source::Cli,
605            "x".into(),
606        );
607        open_e.meta = serde_json::json!({"title": "T"});
608        upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
609        index_event(&conn, &open_e).unwrap();
610
611        let mut ev = crate::event::Event::new(
612            "tj-e",
613            crate::event::EventType::Evidence,
614            crate::event::Author::Agent,
615            crate::event::Source::Chat,
616            "Hook startup measured at 12ms".into(),
617        );
618        ev.evidence_strength = Some(crate::event::EvidenceStrength::Strong);
619        upsert_task_from_event(&conn, &ev, "feedface").unwrap();
620        index_event(&conn, &ev).unwrap();
621
622        let (text, strength): (String, String) = conn
623            .query_row(
624                "SELECT text, strength FROM evidence WHERE task_id=?1",
625                rusqlite::params!["tj-e"],
626                |r| Ok((r.get(0)?, r.get(1)?)),
627            )
628            .unwrap();
629        assert!(text.contains("12ms"));
630        assert_eq!(strength, "strong");
631    }
632
633    #[test]
634    fn supersede_event_marks_decision_superseded() {
635        let d = TempDir::new().unwrap();
636        let conn = open(d.path().join("s.sqlite")).unwrap();
637        let mut open_e = crate::event::Event::new(
638            "tj-s",
639            crate::event::EventType::Open,
640            crate::event::Author::User,
641            crate::event::Source::Cli,
642            "x".into(),
643        );
644        open_e.meta = serde_json::json!({"title": "T"});
645        upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
646        index_event(&conn, &open_e).unwrap();
647
648        let dec = crate::event::Event::new(
649            "tj-s",
650            crate::event::EventType::Decision,
651            crate::event::Author::Agent,
652            crate::event::Source::Chat,
653            "Use TS".into(),
654        );
655        upsert_task_from_event(&conn, &dec, "feedface").unwrap();
656        index_event(&conn, &dec).unwrap();
657
658        let mut sup = crate::event::Event::new(
659            "tj-s",
660            crate::event::EventType::Supersede,
661            crate::event::Author::Agent,
662            crate::event::Source::Chat,
663            "Replaced by Rust decision".into(),
664        );
665        sup.supersedes = Some(dec.event_id.clone());
666        upsert_task_from_event(&conn, &sup, "feedface").unwrap();
667        index_event(&conn, &sup).unwrap();
668
669        let (status, by): (String, Option<String>) = conn
670            .query_row(
671                "SELECT status, superseded_by FROM decisions WHERE decision_id=?1",
672                rusqlite::params![dec.event_id],
673                |r| Ok((r.get(0)?, r.get(1)?)),
674            )
675            .unwrap();
676        assert_eq!(status, "superseded");
677        assert_eq!(by.as_deref(), Some(sup.event_id.as_str()));
678    }
679
680    #[test]
681    fn index_event_projects_decision_to_decisions_table() {
682        let d = TempDir::new().unwrap();
683        let conn = open(d.path().join("s.sqlite")).unwrap();
684
685        let mut open_e = crate::event::Event::new(
686            "tj-d",
687            crate::event::EventType::Open,
688            crate::event::Author::User,
689            crate::event::Source::Cli,
690            "x".into(),
691        );
692        open_e.meta = serde_json::json!({"title": "T"});
693        upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
694        index_event(&conn, &open_e).unwrap();
695
696        let dec = crate::event::Event::new(
697            "tj-d",
698            crate::event::EventType::Decision,
699            crate::event::Author::Agent,
700            crate::event::Source::Chat,
701            "Adopt Rust".into(),
702        );
703        upsert_task_from_event(&conn, &dec, "feedface").unwrap();
704        index_event(&conn, &dec).unwrap();
705
706        let (id, text, status): (String, String, String) = conn
707            .query_row(
708                "SELECT decision_id, text, status FROM decisions WHERE task_id=?1",
709                rusqlite::params!["tj-d"],
710                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
711            )
712            .unwrap();
713        assert_eq!(id, dec.event_id);
714        assert_eq!(text, "Adopt Rust");
715        assert_eq!(status, "active");
716    }
717
718    #[test]
719    fn index_event_is_idempotent_no_search_fts_duplicates() {
720        let d = TempDir::new().unwrap();
721        let conn = open(d.path().join("s.sqlite")).unwrap();
722        let mut open_e = crate::event::Event::new(
723            "tj-id",
724            crate::event::EventType::Open,
725            crate::event::Author::User,
726            crate::event::Source::Cli,
727            "x".into(),
728        );
729        open_e.meta = serde_json::json!({"title": "Idempotent"});
730        upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
731
732        // Index three times — simulates rebuild_state replays.
733        index_event(&conn, &open_e).unwrap();
734        index_event(&conn, &open_e).unwrap();
735        index_event(&conn, &open_e).unwrap();
736
737        let n: i64 = conn
738            .query_row(
739                "SELECT COUNT(*) FROM search_fts WHERE event_id=?1",
740                rusqlite::params![open_e.event_id],
741                |r| r.get(0),
742            )
743            .unwrap();
744        assert_eq!(n, 1, "search_fts must hold exactly one row per event_id");
745    }
746
747    #[test]
748    fn list_all_projects_returns_hashes_from_state_dir() {
749        use std::fs::File;
750        let d = TempDir::new().unwrap();
751        let state_dir = d.path().join("state");
752        std::fs::create_dir_all(&state_dir).unwrap();
753        File::create(state_dir.join("aaaa1111aaaa1111.sqlite")).unwrap();
754        File::create(state_dir.join("bbbb2222bbbb2222.sqlite")).unwrap();
755        File::create(state_dir.join("not-a-project.txt")).unwrap();
756
757        let mut hashes = list_all_projects(&state_dir).unwrap();
758        hashes.sort();
759        assert_eq!(hashes, vec!["aaaa1111aaaa1111", "bbbb2222bbbb2222"]);
760    }
761
762    fn write_event_line(f: &mut std::fs::File, e: &crate::event::Event) {
763        use std::io::Write;
764        writeln!(f, "{}", serde_json::to_string(e).unwrap()).unwrap();
765    }
766
767    fn make_open_event(task_id: &str, title: &str) -> crate::event::Event {
768        let mut e = crate::event::Event::new(
769            task_id,
770            crate::event::EventType::Open,
771            crate::event::Author::User,
772            crate::event::Source::Cli,
773            "x".into(),
774        );
775        e.meta = serde_json::json!({"title": title});
776        e
777    }
778
779    #[test]
780    fn ingest_new_events_picks_up_only_new_lines() {
781        let d = TempDir::new().unwrap();
782        let jsonl = d.path().join("events.jsonl");
783        let db = d.path().join("s.sqlite");
784        let project = "deadbeefdeadbeef";
785
786        let e1 = make_open_event("tj-i1", "first");
787        let e2 = make_open_event("tj-i2", "second");
788        let e3 = make_open_event("tj-i3", "third");
789
790        let mut f = std::fs::File::create(&jsonl).unwrap();
791        write_event_line(&mut f, &e1);
792        write_event_line(&mut f, &e2);
793        write_event_line(&mut f, &e3);
794        drop(f);
795
796        // First pass — no marker yet, falls back to a full rebuild.
797        let conn = open(&db).unwrap();
798        let n_first = ingest_new_events(&conn, &jsonl, project).unwrap();
799        assert_eq!(n_first, 3);
800
801        // Append two more events.
802        let e4 = make_open_event("tj-i4", "fourth");
803        let e5 = make_open_event("tj-i5", "fifth");
804        let mut f = std::fs::OpenOptions::new()
805            .append(true)
806            .open(&jsonl)
807            .unwrap();
808        write_event_line(&mut f, &e4);
809        write_event_line(&mut f, &e5);
810        drop(f);
811
812        // Second pass — marker = e3, only e4 + e5 must be processed.
813        let n_second = ingest_new_events(&conn, &jsonl, project).unwrap();
814        assert_eq!(n_second, 2, "incremental ingest must read only the tail");
815
816        let total: i64 = conn
817            .query_row("SELECT COUNT(*) FROM events_index", [], |r| r.get(0))
818            .unwrap();
819        assert_eq!(total, 5);
820
821        let marker: String = conn
822            .query_row(
823                "SELECT last_indexed_event_id FROM index_state WHERE project_hash=?1",
824                rusqlite::params![project],
825                |r| r.get(0),
826            )
827            .unwrap();
828        assert_eq!(marker, e5.event_id);
829    }
830
831    #[test]
832    fn ingest_new_events_falls_back_to_full_rebuild_when_marker_vanishes() {
833        let d = TempDir::new().unwrap();
834        let jsonl = d.path().join("events.jsonl");
835        let db = d.path().join("s.sqlite");
836        let project = "feedfacefeedface";
837
838        let e1 = make_open_event("tj-r1", "first");
839        let mut f = std::fs::File::create(&jsonl).unwrap();
840        write_event_line(&mut f, &e1);
841        drop(f);
842
843        let conn = open(&db).unwrap();
844        ingest_new_events(&conn, &jsonl, project).unwrap();
845
846        // Replace the file entirely so the marker (e1.event_id) no longer
847        // appears anywhere — simulates corruption / hand-edit.
848        let e2 = make_open_event("tj-r2", "after-corruption");
849        let e3 = make_open_event("tj-r3", "after-corruption-2");
850        let mut f = std::fs::File::create(&jsonl).unwrap();
851        write_event_line(&mut f, &e2);
852        write_event_line(&mut f, &e3);
853        drop(f);
854
855        let n = ingest_new_events(&conn, &jsonl, project).unwrap();
856        assert_eq!(n, 2, "missing marker must trigger full rebuild");
857    }
858
859    #[test]
860    fn rebuild_state_and_ingest_new_events_produce_same_state() {
861        let d = TempDir::new().unwrap();
862        let jsonl_a = d.path().join("a.jsonl");
863        let jsonl_b = d.path().join("b.jsonl");
864        let db_a = d.path().join("a.sqlite");
865        let db_b = d.path().join("b.sqlite");
866
867        let events: Vec<_> = (0..5)
868            .map(|i| make_open_event(&format!("tj-eq{i}"), &format!("title {i}")))
869            .collect();
870        for path in [&jsonl_a, &jsonl_b] {
871            let mut f = std::fs::File::create(path).unwrap();
872            for e in &events {
873                write_event_line(&mut f, e);
874            }
875        }
876
877        let conn_a = open(&db_a).unwrap();
878        let n_a = rebuild_state(&conn_a, &jsonl_a, "abcd1234abcd1234").unwrap();
879
880        let conn_b = open(&db_b).unwrap();
881        let n_b = ingest_new_events(&conn_b, &jsonl_b, "abcd1234abcd1234").unwrap();
882
883        assert_eq!(n_a, n_b);
884        assert_eq!(n_a, 5);
885
886        for table in ["tasks", "events_index"] {
887            let q = format!("SELECT COUNT(*) FROM {table}");
888            let cnt_a: i64 = conn_a.query_row(&q, [], |r| r.get(0)).unwrap();
889            let cnt_b: i64 = conn_b.query_row(&q, [], |r| r.get(0)).unwrap();
890            assert_eq!(cnt_a, cnt_b, "row count mismatch in {table}");
891        }
892    }
893
894    #[test]
895    fn rebuild_state_skips_malformed_jsonl_lines() {
896        use std::io::Write;
897        let d = TempDir::new().unwrap();
898        let events_path = d.path().join("events.jsonl");
899        let db_path = d.path().join("s.sqlite");
900
901        let mut f = std::fs::File::create(&events_path).unwrap();
902
903        let mut e1 = crate::event::Event::new(
904            "tj-skip",
905            crate::event::EventType::Open,
906            crate::event::Author::User,
907            crate::event::Source::Cli,
908            "x".into(),
909        );
910        e1.meta = serde_json::json!({"title": "Skip test"});
911        writeln!(f, "{}", serde_json::to_string(&e1).unwrap()).unwrap();
912
913        // Garbage that is not even JSON.
914        writeln!(f, "this is not a json event line").unwrap();
915
916        // Valid JSON but not a valid Event (missing required fields).
917        writeln!(f, "{{\"foo\": 1}}").unwrap();
918
919        let e3 = crate::event::Event::new(
920            "tj-skip",
921            crate::event::EventType::Decision,
922            crate::event::Author::Agent,
923            crate::event::Source::Chat,
924            "Adopt Rust".into(),
925        );
926        writeln!(f, "{}", serde_json::to_string(&e3).unwrap()).unwrap();
927        drop(f);
928
929        let conn = open(&db_path).unwrap();
930        let n = rebuild_state(&conn, &events_path, "deadbeefdeadbeef")
931            .expect("rebuild_state must succeed despite malformed lines");
932        assert_eq!(
933            n, 2,
934            "expected 2 valid events indexed (2 malformed skipped)"
935        );
936
937        let indexed: i64 = conn
938            .query_row("SELECT COUNT(*) FROM events_index", [], |r| r.get(0))
939            .unwrap();
940        assert_eq!(indexed, 2);
941    }
942
943    #[test]
944    fn rebuild_state_reads_jsonl_and_populates_db() {
945        use std::io::Write;
946        let d = TempDir::new().unwrap();
947        let events_path = d.path().join("events.jsonl");
948        let db_path = d.path().join("s.sqlite");
949
950        let mut f = std::fs::File::create(&events_path).unwrap();
951        let mut e1 = crate::event::Event::new(
952            "tj-9",
953            crate::event::EventType::Open,
954            crate::event::Author::User,
955            crate::event::Source::Cli,
956            "x".into(),
957        );
958        e1.meta = serde_json::json!({"title": "Nine"});
959        let e2 = crate::event::Event::new(
960            "tj-9",
961            crate::event::EventType::Decision,
962            crate::event::Author::Agent,
963            crate::event::Source::Chat,
964            "Adopt Rust".into(),
965        );
966        writeln!(f, "{}", serde_json::to_string(&e1).unwrap()).unwrap();
967        writeln!(f, "{}", serde_json::to_string(&e2).unwrap()).unwrap();
968        drop(f);
969
970        let conn = open(&db_path).unwrap();
971        let n = rebuild_state(&conn, &events_path, "deadbeefdeadbeef").unwrap();
972        assert_eq!(n, 2);
973
974        let n: i64 = conn
975            .query_row("SELECT COUNT(*) FROM tasks", [], |r| r.get(0))
976            .unwrap();
977        assert_eq!(n, 1);
978        let n: i64 = conn
979            .query_row("SELECT COUNT(*) FROM events_index", [], |r| r.get(0))
980            .unwrap();
981        assert_eq!(n, 2);
982    }
983
984    #[test]
985    fn index_event_writes_index_and_fts() {
986        let d = TempDir::new().unwrap();
987        let conn = open(d.path().join("s.sqlite")).unwrap();
988        let mut open_e = crate::event::Event::new(
989            "tj-1",
990            crate::event::EventType::Open,
991            crate::event::Author::User,
992            crate::event::Source::Cli,
993            "Title".into(),
994        );
995        open_e.meta = serde_json::json!({"title": "Title"});
996        upsert_task_from_event(&conn, &open_e, "deadbeefdeadbeef").unwrap();
997        index_event(&conn, &open_e).unwrap();
998
999        let mut decision = crate::event::Event::new(
1000            "tj-1",
1001            crate::event::EventType::Decision,
1002            crate::event::Author::Agent,
1003            crate::event::Source::Chat,
1004            "Adopt Rust".into(),
1005        );
1006        decision.confidence = Some(0.92);
1007        upsert_task_from_event(&conn, &decision, "deadbeefdeadbeef").unwrap();
1008        index_event(&conn, &decision).unwrap();
1009
1010        let count: i64 = conn
1011            .query_row(
1012                "SELECT COUNT(*) FROM events_index WHERE task_id=?1",
1013                rusqlite::params!["tj-1"],
1014                |r| r.get(0),
1015            )
1016            .unwrap();
1017        assert_eq!(count, 2);
1018
1019        let mut stmt = conn
1020            .prepare("SELECT event_id FROM search_fts WHERE search_fts MATCH ?1")
1021            .unwrap();
1022        let hits: Vec<String> = stmt
1023            .query_map(rusqlite::params!["Rust"], |r| {
1024                let s: String = r.get(0)?;
1025                Ok(s)
1026            })
1027            .unwrap()
1028            .collect::<Result<Vec<_>, _>>()
1029            .unwrap();
1030        assert_eq!(hits.len(), 1);
1031        assert_eq!(hits[0], decision.event_id);
1032    }
1033
1034    #[test]
1035    fn upsert_task_from_open_event_inserts_row() {
1036        let d = TempDir::new().unwrap();
1037        let conn = open(d.path().join("s.sqlite")).unwrap();
1038
1039        let mut e = crate::event::Event::new(
1040            "tj-7f3a",
1041            crate::event::EventType::Open,
1042            crate::event::Author::User,
1043            crate::event::Source::Cli,
1044            "Add OAuth".into(),
1045        );
1046        e.meta = serde_json::json!({ "title": "Add OAuth login" });
1047
1048        upsert_task_from_event(&conn, &e, "abcd1234abcd1234").unwrap();
1049
1050        let (id, title, status): (String, String, String) = conn
1051            .query_row(
1052                "SELECT task_id, title, status FROM tasks WHERE task_id = ?1",
1053                ["tj-7f3a"],
1054                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1055            )
1056            .unwrap();
1057
1058        assert_eq!(id, "tj-7f3a");
1059        assert_eq!(title, "Add OAuth login");
1060        assert_eq!(status, "open");
1061    }
1062}