Skip to main content

tj_core/
db.rs

1use anyhow::Context;
2use rusqlite::Connection;
3use std::collections::HashSet;
4use std::path::Path;
5
/// One forward-only schema migration. Migrations are applied in `version`
/// order; each is recorded in `schema_migrations` so re-running `open()`
/// is idempotent.
struct Migration {
    /// Unique id of this migration; written to `schema_migrations.version`
    /// once it has been applied. Entries in `MIGRATIONS` are listed in
    /// ascending `version` order.
    version: i64,
    /// Statement batch passed to `execute_batch`; may contain several
    /// `;`-separated statements.
    sql: &'static str,
}
13
/// Baseline schema: task rows, the per-event index, decisions, evidence,
/// the rendered-pack cache and the FTS5 search table. Every statement uses
/// `IF NOT EXISTS`, so executing it against an existing database is harmless.
///
/// NOTE(review): `task_id` / `event_id` are `UNINDEXED` in `search_fts`.
/// FTS5 only accelerates `MATCH` and rowid lookups, so equality filters on
/// these columns (e.g. `WHERE event_id = ?`) scan the whole FTS table.
const MIGRATION_001: &str = r#"
CREATE TABLE IF NOT EXISTS tasks (
  task_id        TEXT PRIMARY KEY,
  title          TEXT NOT NULL,
  status         TEXT NOT NULL,
  project_hash   TEXT NOT NULL,
  opened_at      TEXT NOT NULL,
  closed_at      TEXT,
  last_event_at  TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_tasks_project ON tasks(project_hash, last_event_at DESC);

CREATE TABLE IF NOT EXISTS events_index (
  event_id    TEXT PRIMARY KEY,
  task_id     TEXT NOT NULL,
  type        TEXT NOT NULL,
  timestamp   TEXT NOT NULL,
  confidence  REAL,
  status      TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_events_task_time ON events_index(task_id, timestamp DESC);

CREATE TABLE IF NOT EXISTS decisions (
  decision_id    TEXT PRIMARY KEY,
  task_id        TEXT NOT NULL,
  text           TEXT NOT NULL,
  status         TEXT NOT NULL,
  superseded_by  TEXT
);

CREATE TABLE IF NOT EXISTS evidence (
  evidence_id           TEXT PRIMARY KEY,
  task_id               TEXT NOT NULL,
  text                  TEXT NOT NULL,
  strength              TEXT NOT NULL,
  refers_to_decision_id TEXT
);

CREATE TABLE IF NOT EXISTS task_pack_cache (
  task_id             TEXT NOT NULL,
  mode                TEXT NOT NULL,
  text                TEXT NOT NULL,
  generated_at        TEXT NOT NULL,
  source_event_count  INTEGER NOT NULL,
  PRIMARY KEY (task_id, mode)
);

CREATE VIRTUAL TABLE IF NOT EXISTS search_fts USING fts5(
  task_id UNINDEXED,
  event_id UNINDEXED,
  text,
  type
);
"#;
68
/// Tracks how far we've ingested the JSONL log per project so subsequent
/// `ingest_new_events` calls only have to index the tail (events after the
/// marker) instead of re-indexing the whole file. The file itself is still
/// scanned to locate the marker. `last_indexed_event_id` is the `event_id`
/// of the most recent event written to `events_index`; it is written by
/// `record_last_indexed`.
const MIGRATION_002: &str = r#"
CREATE TABLE IF NOT EXISTS index_state (
  project_hash          TEXT PRIMARY KEY,
  last_indexed_event_id TEXT NOT NULL,
  updated_at            TEXT NOT NULL
);
"#;
80
/// v0.4.0 task-as-goal redesign: explicit goal/outcome on tasks +
/// typed artifacts on events. NULLable so existing rows survive
/// without backfill. Wipes the pack cache so old packs (rendered
/// without Goal/Outcome blocks) regenerate on next view.
///
/// Unlike v001, these `ALTER TABLE ... ADD COLUMN` statements are not
/// idempotent: running them a second time fails with "duplicate column
/// name". This batch must therefore run exactly once per database.
const MIGRATION_003: &str = r#"
ALTER TABLE tasks ADD COLUMN goal        TEXT;
ALTER TABLE tasks ADD COLUMN outcome     TEXT;
ALTER TABLE tasks ADD COLUMN outcome_tag TEXT;
ALTER TABLE tasks ADD COLUMN external    TEXT;
ALTER TABLE events_index ADD COLUMN artifacts TEXT;
DELETE FROM task_pack_cache;
"#;
93
/// v0.5.0 Phase B — artifacts auto-extract on ingest. The `artifacts`
/// column was added in v003 but stayed NULL for everyone; v004 only wipes
/// the pack cache so newly extracted artifacts surface in the next pack
/// render. Existing events stay NULL until `reclassify` (Phase B+) or
/// `rebuild-state` is run.
const MIGRATION_004: &str = r#"
DELETE FROM task_pack_cache;
"#;
102
/// v0.12.0 dream Pass A — per-project watermark of the last successful
/// dream run. Sessions modified after this are in scope for the next run.
/// Keyed by `project_hash`, so there is at most one watermark per project.
const MIGRATION_005: &str = r#"
CREATE TABLE IF NOT EXISTS dream_state (
  project_hash    TEXT PRIMARY KEY,
  last_dream_at   TEXT NOT NULL,
  updated_at      TEXT NOT NULL
);
"#;
112
/// v0.12.0 subtask hierarchy — nullable `parent_id` carries the parent
/// task from the `open` event's `meta.parent_id`. Existing flat tasks stay
/// NULL. The index supports `children_of` lookups.
const MIGRATION_006: &str = r#"
ALTER TABLE tasks ADD COLUMN parent_id TEXT;
CREATE INDEX IF NOT EXISTS idx_tasks_parent ON tasks(parent_id);
"#;
120
/// v0.12.0 structured decision alternatives — nullable `alternatives`
/// carries the JSON array from a decision event's `meta.alternatives`
/// (objects like `{option, chosen, rationale}`). Existing decisions stay
/// NULL; the append-only log is untouched. Wipes the pack cache so packs
/// re-render with the alternatives block once events carry it.
const MIGRATION_007: &str = r#"
ALTER TABLE decisions ADD COLUMN alternatives TEXT;
DELETE FROM task_pack_cache;
"#;
130
/// All schema migrations in version order. Append new entries here; never
/// edit a published migration's `sql` — write a new one instead.
///
/// `apply_migrations` walks this slice front to back and skips versions
/// already present in `schema_migrations`. Entries must therefore keep
/// unique versions in ascending order.
const MIGRATIONS: &[Migration] = &[
    Migration {
        version: 1,
        sql: MIGRATION_001,
    },
    Migration {
        version: 2,
        sql: MIGRATION_002,
    },
    Migration {
        version: 3,
        sql: MIGRATION_003,
    },
    Migration {
        version: 4,
        sql: MIGRATION_004,
    },
    Migration {
        version: 5,
        sql: MIGRATION_005,
    },
    Migration {
        version: 6,
        sql: MIGRATION_006,
    },
    Migration {
        version: 7,
        sql: MIGRATION_007,
    },
];
163
164fn apply_migrations(conn: &Connection) -> anyhow::Result<()> {
165    conn.execute_batch(
166        "CREATE TABLE IF NOT EXISTS schema_migrations (
167            version    INTEGER PRIMARY KEY,
168            applied_at TEXT NOT NULL
169        )",
170    )
171    .context("create schema_migrations table")?;
172
173    let applied: HashSet<i64> = {
174        let mut stmt = conn
175            .prepare("SELECT version FROM schema_migrations")
176            .context("select applied versions")?;
177        let rows = stmt
178            .query_map([], |r| r.get::<_, i64>(0))
179            .context("iterate schema_migrations")?;
180        rows.collect::<rusqlite::Result<HashSet<_>>>()
181            .context("collect applied versions")?
182    };
183
184    for migration in MIGRATIONS {
185        if applied.contains(&migration.version) {
186            continue;
187        }
188        conn.execute_batch(migration.sql)
189            .with_context(|| format!("apply schema migration v{:03}", migration.version))?;
190        conn.execute(
191            "INSERT INTO schema_migrations(version, applied_at) VALUES (?1, ?2)",
192            rusqlite::params![
193                migration.version,
194                chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
195            ],
196        )
197        .with_context(|| {
198            format!(
199                "record schema migration v{:03} as applied",
200                migration.version
201            )
202        })?;
203    }
204    Ok(())
205}
206
207use crate::event::{Event, EventType};
208
209pub fn upsert_task_from_event(
210    conn: &Connection,
211    event: &Event,
212    project_hash: &str,
213) -> anyhow::Result<()> {
214    match event.event_type {
215        EventType::Open => {
216            let title = event
217                .meta
218                .get("title")
219                .and_then(|v| v.as_str())
220                .unwrap_or(&event.text)
221                .to_string();
222            let parent_id = event
223                .meta
224                .get("parent_id")
225                .and_then(|v| v.as_str())
226                .map(str::to_string);
227            // ON CONFLICT intentionally does not overwrite parent_id — parent
228            // is set once at creation; re-parenting is a separate future path.
229            conn.execute(
230                "INSERT INTO tasks(task_id, title, status, project_hash, opened_at, last_event_at, parent_id)
231                 VALUES (?1, ?2, 'open', ?3, ?4, ?4, ?5)
232                 ON CONFLICT(task_id) DO UPDATE SET last_event_at = ?4",
233                rusqlite::params![event.task_id, title, project_hash, event.timestamp, parent_id],
234            )?;
235        }
236        EventType::Close => {
237            conn.execute(
238                "UPDATE tasks SET status='closed', closed_at=?2, last_event_at=?2 WHERE task_id=?1",
239                rusqlite::params![event.task_id, event.timestamp],
240            )?;
241        }
242        EventType::Reopen => {
243            conn.execute(
244                "UPDATE tasks SET status='open', closed_at=NULL, last_event_at=?2 WHERE task_id=?1",
245                rusqlite::params![event.task_id, event.timestamp],
246            )?;
247        }
248        _ => {
249            conn.execute(
250                "UPDATE tasks SET last_event_at=?2 WHERE task_id=?1",
251                rusqlite::params![event.task_id, event.timestamp],
252            )?;
253        }
254    }
255    Ok(())
256}
257
258use std::io::BufRead;
259
260pub fn list_all_projects(state_dir: impl AsRef<Path>) -> anyhow::Result<Vec<String>> {
261    let dir = state_dir.as_ref();
262    if !dir.exists() {
263        return Ok(vec![]);
264    }
265    let mut out = Vec::new();
266    for entry in std::fs::read_dir(dir)? {
267        let entry = entry?;
268        let path = entry.path();
269        if path.extension().and_then(|e| e.to_str()) == Some("sqlite") {
270            if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) {
271                out.push(stem.to_string());
272            }
273        }
274    }
275    Ok(out)
276}
277
278pub fn rebuild_state(
279    conn: &Connection,
280    jsonl_path: impl AsRef<Path>,
281    project_hash: &str,
282) -> anyhow::Result<usize> {
283    let f = std::fs::File::open(&jsonl_path)
284        .with_context(|| format!("open {:?}", jsonl_path.as_ref()))?;
285    let reader = std::io::BufReader::new(f);
286
287    let tx = conn.unchecked_transaction()?;
288    let mut count = 0;
289    let mut last_event_id: Option<String> = None;
290    for (i, line) in reader.lines().enumerate() {
291        let line = line.with_context(|| format!("read line {i}"))?;
292        if line.trim().is_empty() {
293            continue;
294        }
295        // Malformed JSONL lines are skipped with a warning so that one bad
296        // event cannot abort an otherwise-recoverable rebuild. SQL errors
297        // still propagate — those indicate schema/integrity problems.
298        let event: Event = match serde_json::from_str(&line) {
299            Ok(e) => e,
300            Err(err) => {
301                tracing::warn!(
302                    line_number = i + 1,
303                    error = %err,
304                    "skipping malformed JSONL line in rebuild_state"
305                );
306                continue;
307            }
308        };
309        upsert_task_from_event(&tx, &event, project_hash)?;
310        index_event(&tx, &event)?;
311        last_event_id = Some(event.event_id.clone());
312        count += 1;
313    }
314    if let Some(eid) = last_event_id.as_deref() {
315        record_last_indexed(&tx, project_hash, eid)?;
316    }
317    tx.commit()?;
318    Ok(count)
319}
320
321/// Returns whether a task with this id has been recorded in the derived
322/// state. Cheap O(1) lookup against the `tasks` primary key. Callers
323/// should run [`ingest_new_events`] first if they want to see the latest
324/// JSONL state.
325pub fn task_exists(conn: &Connection, task_id: &str) -> anyhow::Result<bool> {
326    let count: i64 = conn.query_row(
327        "SELECT COUNT(*) FROM tasks WHERE task_id = ?1",
328        rusqlite::params![task_id],
329        |r| r.get(0),
330    )?;
331    Ok(count > 0)
332}
333
334/// Status string for an existing task (e.g. "open", "closed"). Returns
335/// `None` when the task is unknown — caller decides whether that's a
336/// hard error or a route-to-pending case.
337pub fn task_status(conn: &Connection, task_id: &str) -> anyhow::Result<Option<String>> {
338    let mut stmt = conn.prepare("SELECT status FROM tasks WHERE task_id = ?1")?;
339    let mut rows = stmt.query(rusqlite::params![task_id])?;
340    Ok(rows.next()?.map(|r| r.get::<_, String>(0)).transpose()?)
341}
342
343/// Set or replace `tasks.goal` for an existing task. Caller is
344/// expected to have validated the task exists (via `task_exists`); we
345/// don't error on no-op rows so the upsert pattern is uniform.
346pub fn set_task_goal(conn: &Connection, task_id: &str, goal: &str) -> anyhow::Result<()> {
347    conn.execute(
348        "UPDATE tasks SET goal = ?1 WHERE task_id = ?2",
349        rusqlite::params![goal, task_id],
350    )
351    .with_context(|| format!("set goal for {task_id}"))?;
352    // Pack cache is now stale for this task — drop the entry so the
353    // next render picks up the new goal.
354    conn.execute(
355        "DELETE FROM task_pack_cache WHERE task_id = ?1",
356        rusqlite::params![task_id],
357    )?;
358    Ok(())
359}
360
361/// Set or replace the closure metadata. Pass `None` for `outcome_tag`
362/// to leave it unset; pass `Some("done"|"abandoned"|"superseded")`
363/// for a structured tag. Free-text `outcome` is the primary field.
364pub fn set_task_outcome(
365    conn: &Connection,
366    task_id: &str,
367    outcome: &str,
368    outcome_tag: Option<&str>,
369) -> anyhow::Result<()> {
370    conn.execute(
371        "UPDATE tasks SET outcome = ?1, outcome_tag = ?2 WHERE task_id = ?3",
372        rusqlite::params![outcome, outcome_tag, task_id],
373    )
374    .with_context(|| format!("set outcome for {task_id}"))?;
375    conn.execute(
376        "DELETE FROM task_pack_cache WHERE task_id = ?1",
377        rusqlite::params![task_id],
378    )?;
379    Ok(())
380}
381
382/// Append an external reference to `tasks.external`. The column is
383/// stored as a comma-separated list — small, append-mostly, no
384/// uniqueness constraint. Acceptable shapes (loose, not enforced):
385/// `beads:claude-memory-rsw`, `github:#42`, `jira:PROJ-1234`.
386pub fn add_task_external(conn: &Connection, task_id: &str, reference: &str) -> anyhow::Result<()> {
387    let current: Option<String> = conn
388        .query_row(
389            "SELECT external FROM tasks WHERE task_id = ?1",
390            rusqlite::params![task_id],
391            |r| r.get::<_, Option<String>>(0),
392        )
393        .with_context(|| format!("read external for {task_id}"))?;
394    let next = match current {
395        Some(s) if !s.is_empty() => format!("{s},{reference}"),
396        _ => reference.to_string(),
397    };
398    conn.execute(
399        "UPDATE tasks SET external = ?1 WHERE task_id = ?2",
400        rusqlite::params![next, task_id],
401    )?;
402    conn.execute(
403        "DELETE FROM task_pack_cache WHERE task_id = ?1",
404        rusqlite::params![task_id],
405    )?;
406    Ok(())
407}
408
/// Read-only metadata bundle used by pack rendering (and TUI list
/// teasers in v0.4.0+), as returned by `task_metadata`, which yields
/// `None` for unknown tasks. Every field mirrors a NULLable `tasks`
/// column added in migration v003.
#[derive(Debug, Clone, Default)]
pub struct TaskMetadata {
    /// `tasks.goal`, written by `set_task_goal`.
    pub goal: Option<String>,
    /// `tasks.outcome` — free-text closure summary written by `set_task_outcome`.
    pub outcome: Option<String>,
    /// `tasks.outcome_tag` — optional structured tag (e.g. "done",
    /// "abandoned", "superseded") written by `set_task_outcome`.
    pub outcome_tag: Option<String>,
    /// `tasks.external` — comma-separated external references appended by
    /// `add_task_external`.
    pub external: Option<String>,
}
418
419pub fn task_metadata(conn: &Connection, task_id: &str) -> anyhow::Result<Option<TaskMetadata>> {
420    let mut stmt =
421        conn.prepare("SELECT goal, outcome, outcome_tag, external FROM tasks WHERE task_id = ?1")?;
422    let mut rows = stmt.query(rusqlite::params![task_id])?;
423    Ok(match rows.next()? {
424        Some(r) => Some(TaskMetadata {
425            goal: r.get::<_, Option<String>>(0)?,
426            outcome: r.get::<_, Option<String>>(1)?,
427            outcome_tag: r.get::<_, Option<String>>(2)?,
428            external: r.get::<_, Option<String>>(3)?,
429        }),
430        None => None,
431    })
432}
433
/// One row of the stale-task report produced by `stale_tasks`: an open
/// task whose last event crossed the inactivity threshold.
#[derive(Debug, Clone)]
pub struct StaleTask {
    /// Task identifier (`tasks.task_id`).
    pub task_id: String,
    /// Task title (`tasks.title`).
    pub title: String,
    /// Raw `tasks.last_event_at` string, exactly as stored.
    pub last_event_at: String,
    /// Whole days between `last_event_at` and now (truncated).
    /// `stale_tasks` reports 0 when the stored timestamp is not valid RFC 3339.
    pub days_idle: i64,
}
443
444/// Find open tasks with no event in the last `days` days. Sorted by
445/// idle time descending so the user sees the most ancient first.
446pub fn stale_tasks(conn: &Connection, days: i64) -> anyhow::Result<Vec<StaleTask>> {
447    let cutoff = chrono::Utc::now() - chrono::Duration::days(days);
448    let cutoff_str = cutoff.to_rfc3339();
449    let mut stmt = conn.prepare(
450        "SELECT task_id, title, last_event_at FROM tasks
451         WHERE status = 'open' AND last_event_at < ?1
452         ORDER BY last_event_at ASC",
453    )?;
454    let rows = stmt.query_map(rusqlite::params![cutoff_str], |r| {
455        Ok((
456            r.get::<_, String>(0)?,
457            r.get::<_, String>(1)?,
458            r.get::<_, String>(2)?,
459        ))
460    })?;
461    let now = chrono::Utc::now();
462    let mut out = Vec::new();
463    for row in rows {
464        let (task_id, title, last_at) = row?;
465        let dt = chrono::DateTime::parse_from_rfc3339(&last_at)
466            .map(|d| d.with_timezone(&chrono::Utc))
467            .unwrap_or(now);
468        let days_idle = (now - dt).num_days();
469        out.push(StaleTask {
470            task_id,
471            title,
472            last_event_at: last_at,
473            days_idle,
474        });
475    }
476    Ok(out)
477}
478
/// Score-weighted relationship between a fresh prompt's artifacts and
/// every prior task's artifacts. Higher score = stronger continuation
/// signal. Threshold tuning is the caller's job; v0.6.0 auto-link
/// keeps anything with score > 0.0.
#[derive(Debug, Clone)]
pub struct RelatedTask {
    /// Id of the matching task.
    pub task_id: String,
    /// The task's current `tasks.status` (e.g. "open" / "closed").
    pub status: String,
    /// Sum of the weights of every needle the task matched
    /// (see `find_related_tasks`).
    pub score: f64,
}
489
490/// Find tasks whose events overlap the given artifacts on any
491/// dimension we have a signal for. Weights:
492///   shared linked_issue → +1.0   (strongest, ticket id is unique)
493///   shared commit_hash  → +0.8   (commits are nearly unique)
494///   shared file path    → +0.3   (files churn across tasks)
495///
496/// The scan reads `events_index.artifacts` (JSON) directly with LIKE
497/// substring matches — JSON1 would be cleaner but keeps the codepath
498/// dependency-free. Returns top hits sorted by score desc; ties keep
499/// the most-recent task first.
500pub fn find_related_tasks(
501    conn: &Connection,
502    arts: &crate::artifacts::Artifacts,
503) -> anyhow::Result<Vec<RelatedTask>> {
504    use std::collections::HashMap;
505    if arts.is_empty() {
506        return Ok(Vec::new());
507    }
508    let mut scores: HashMap<String, f64> = HashMap::new();
509    let mut last_seen: HashMap<String, String> = HashMap::new();
510
511    let needles: Vec<(String, f64)> = arts
512        .linked_issues
513        .iter()
514        .map(|s| (s.clone(), 1.0))
515        .chain(arts.commit_hashes.iter().map(|s| (s.clone(), 0.8)))
516        .chain(arts.files.iter().map(|s| (s.clone(), 0.3)))
517        .collect();
518
519    for (needle, weight) in needles {
520        let pattern = format!("%\"{}\"%", needle.replace('%', "\\%"));
521        let mut stmt = conn.prepare(
522            "SELECT DISTINCT task_id, MAX(timestamp) as ts FROM events_index
523             WHERE artifacts LIKE ?1
524             GROUP BY task_id
525             ORDER BY ts DESC",
526        )?;
527        let rows = stmt.query_map(rusqlite::params![pattern], |r| {
528            Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
529        })?;
530        for row in rows {
531            let (id, ts) = row?;
532            *scores.entry(id.clone()).or_insert(0.0) += weight;
533            last_seen.insert(id, ts);
534        }
535    }
536
537    let mut out: Vec<RelatedTask> = Vec::with_capacity(scores.len());
538    for (id, score) in scores {
539        let status: Option<String> = conn
540            .query_row(
541                "SELECT status FROM tasks WHERE task_id = ?1",
542                rusqlite::params![&id],
543                |r| r.get(0),
544            )
545            .ok();
546        if let Some(status) = status {
547            out.push(RelatedTask {
548                task_id: id,
549                status,
550                score,
551            });
552        }
553    }
554    out.sort_by(|a, b| {
555        b.score
556            .partial_cmp(&a.score)
557            .unwrap_or(std::cmp::Ordering::Equal)
558            .then_with(|| {
559                let ts_a = last_seen.get(&a.task_id).cloned().unwrap_or_default();
560                let ts_b = last_seen.get(&b.task_id).cloned().unwrap_or_default();
561                ts_b.cmp(&ts_a)
562            })
563    });
564    Ok(out)
565}
566
567/// Find tasks (open or closed) whose events reference any of the given
568/// issue identifiers (FIN-868, JIRA-123, INC-7…). Looks at the
569/// per-event `artifacts.linked_issues` column populated on ingest.
570/// Returns `(task_id, status)` deduplicated, most-recent first. Used
571/// by the v0.5.0 Phase C auto-link flow to recognise that a fresh
572/// prompt is a continuation of a prior task.
573pub fn find_tasks_by_linked_issues(
574    conn: &Connection,
575    issues: &[String],
576) -> anyhow::Result<Vec<(String, String)>> {
577    if issues.is_empty() {
578        return Ok(Vec::new());
579    }
580    // Stage A: collect candidate task_ids whose events_index.artifacts
581    // contains any of the requested issue strings. JSON1 is overkill
582    // here — a substring LIKE on the raw JSON is correct given the
583    // ticket id format ("FIN-868") never appears outside its own
584    // linked_issues array.
585    let mut candidate_ids: Vec<String> = Vec::new();
586    for issue in issues {
587        let pattern = format!("%\"{}\"%", issue.replace('%', "\\%"));
588        let mut stmt = conn.prepare(
589            "SELECT DISTINCT task_id FROM events_index
590             WHERE artifacts LIKE ?1
591             ORDER BY timestamp DESC",
592        )?;
593        let rows = stmt.query_map(rusqlite::params![pattern], |r| r.get::<_, String>(0))?;
594        for r in rows {
595            let id = r?;
596            if !candidate_ids.contains(&id) {
597                candidate_ids.push(id);
598            }
599        }
600    }
601    // Stage B: hydrate status for each candidate.
602    let mut out = Vec::with_capacity(candidate_ids.len());
603    for id in candidate_ids {
604        let status: Option<String> = conn
605            .query_row(
606                "SELECT status FROM tasks WHERE task_id = ?1",
607                rusqlite::params![&id],
608                |r| r.get(0),
609            )
610            .ok();
611        if let Some(s) = status {
612            out.push((id, s));
613        }
614    }
615    Ok(out)
616}
617
618/// Re-run artifact extraction over every event of a task and write the
619/// result back to `events_index.artifacts`. Used to backfill events
620/// that were ingested before Phase B landed. Returns the number of
621/// events touched. Wipes the pack cache for the task so the next
622/// render reflects the freshly extracted artifacts.
623pub fn reclassify_task_artifacts(conn: &Connection, task_id: &str) -> anyhow::Result<usize> {
624    let mut stmt = conn.prepare(
625        "SELECT ei.event_id, COALESCE(sf.text, '') FROM events_index ei
626         LEFT JOIN search_fts sf ON sf.event_id = ei.event_id
627         WHERE ei.task_id = ?1",
628    )?;
629    let rows: Vec<(String, String)> = stmt
630        .query_map(rusqlite::params![task_id], |r| {
631            Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
632        })?
633        .collect::<Result<_, _>>()?;
634    let count = rows.len();
635    for (event_id, text) in rows {
636        let arts = crate::artifacts::extract(&text);
637        let json = if arts.is_empty() {
638            None
639        } else {
640            Some(serde_json::to_string(&arts)?)
641        };
642        conn.execute(
643            "UPDATE events_index SET artifacts = ?1 WHERE event_id = ?2",
644            rusqlite::params![json, event_id],
645        )?;
646    }
647    invalidate_pack_cascade(conn, task_id)?;
648    Ok(count)
649}
650
651/// Aggregate artifacts (commit hashes, PR URLs, ticket IDs, files,
652/// branches) across every event of a task, deduplicated. Reads the
653/// per-event JSON payload that `ingest_new_events` populated. Skips
654/// events whose `artifacts` column is NULL or unparseable rather than
655/// failing the pack render.
656pub fn task_artifacts(
657    conn: &Connection,
658    task_id: &str,
659) -> anyhow::Result<crate::artifacts::Artifacts> {
660    let mut stmt = conn.prepare(
661        "SELECT artifacts FROM events_index
662         WHERE task_id = ?1 AND artifacts IS NOT NULL
663         ORDER BY timestamp ASC",
664    )?;
665    let rows = stmt.query_map(rusqlite::params![task_id], |r| r.get::<_, String>(0))?;
666    let mut acc = crate::artifacts::Artifacts::default();
667    for row in rows {
668        let json = row?;
669        if let Ok(parsed) = serde_json::from_str::<crate::artifacts::Artifacts>(&json) {
670            acc.merge(parsed);
671        }
672    }
673    Ok(acc)
674}
675
676/// Look up the most recent `event_id` we've ingested for this project.
677/// Returns `None` when the project has never been indexed (first call,
678/// or migration v002 just landed on an existing 0.1.x DB).
679fn last_indexed_event_id(conn: &Connection, project_hash: &str) -> anyhow::Result<Option<String>> {
680    let mut stmt =
681        conn.prepare("SELECT last_indexed_event_id FROM index_state WHERE project_hash = ?1")?;
682    let mut rows = stmt.query(rusqlite::params![project_hash])?;
683    if let Some(row) = rows.next()? {
684        Ok(Some(row.get::<_, String>(0)?))
685    } else {
686        Ok(None)
687    }
688}
689
690fn record_last_indexed(
691    conn: &Connection,
692    project_hash: &str,
693    event_id: &str,
694) -> anyhow::Result<()> {
695    conn.execute(
696        "INSERT INTO index_state(project_hash, last_indexed_event_id, updated_at)
697         VALUES (?1, ?2, ?3)
698         ON CONFLICT(project_hash) DO UPDATE SET
699             last_indexed_event_id = excluded.last_indexed_event_id,
700             updated_at = excluded.updated_at",
701        rusqlite::params![
702            project_hash,
703            event_id,
704            chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
705        ],
706    )?;
707    Ok(())
708}
709
710/// Read only the tail of the JSONL log since the last call. The cheap path
711/// for hot loops (every MCP tool invocation): scan to the marker, ingest
712/// the rest, update the marker.
713///
714/// Falls back to a full [`rebuild_state`] in two cases:
715/// - No marker yet for this project (first call after migration v002 or
716///   on a brand-new install).
717/// - The stored marker is not present in the JSONL (corrupted / truncated
718///   file). A `tracing::warn!` is emitted so the operator notices.
719pub fn ingest_new_events(
720    conn: &Connection,
721    jsonl_path: impl AsRef<Path>,
722    project_hash: &str,
723) -> anyhow::Result<usize> {
724    let marker = match last_indexed_event_id(conn, project_hash)? {
725        Some(id) => id,
726        None => return rebuild_state(conn, jsonl_path, project_hash),
727    };
728
729    let f = std::fs::File::open(&jsonl_path)
730        .with_context(|| format!("open {:?}", jsonl_path.as_ref()))?;
731    let reader = std::io::BufReader::new(f);
732
733    // First pass: confirm the marker still exists in the file. If it does
734    // not, the JSONL has been rewritten under us — we can't trust the
735    // marker, so we fall back to a full rebuild.
736    let tx = conn.unchecked_transaction()?;
737    let mut found_marker = false;
738    let mut count = 0;
739    let mut last_event_id: Option<String> = None;
740    for (i, line) in reader.lines().enumerate() {
741        let line = line.with_context(|| format!("read line {i}"))?;
742        if line.trim().is_empty() {
743            continue;
744        }
745        let event: Event = match serde_json::from_str(&line) {
746            Ok(e) => e,
747            Err(err) => {
748                tracing::warn!(
749                    line_number = i + 1,
750                    error = %err,
751                    "skipping malformed JSONL line in ingest_new_events"
752                );
753                continue;
754            }
755        };
756        if !found_marker {
757            if event.event_id == marker {
758                found_marker = true;
759            }
760            continue;
761        }
762        upsert_task_from_event(&tx, &event, project_hash)?;
763        index_event(&tx, &event)?;
764        last_event_id = Some(event.event_id.clone());
765        count += 1;
766    }
767
768    if !found_marker {
769        // Discard the (empty) tx and rebuild from scratch.
770        drop(tx);
771        tracing::warn!(
772            project_hash = project_hash,
773            marker = marker.as_str(),
774            "last_indexed_event_id not found in JSONL — falling back to full rebuild"
775        );
776        return rebuild_state(conn, jsonl_path, project_hash);
777    }
778
779    if let Some(eid) = last_event_id.as_deref() {
780        record_last_indexed(&tx, project_hash, eid)?;
781    }
782    tx.commit()?;
783    Ok(count)
784}
785
786pub fn index_event(conn: &Connection, event: &Event) -> anyhow::Result<()> {
787    let type_str = serde_json::to_value(event.event_type)?
788        .as_str()
789        .unwrap()
790        .to_string();
791    let status_str = serde_json::to_value(event.status)?
792        .as_str()
793        .unwrap()
794        .to_string();
795    // v0.5.0 Phase B: scrape artifacts (commit hashes, PR URLs, ticket
796    // IDs, file paths, branch names) out of the event text. Storing
797    // per-event so reclassify can recompute without touching foreign
798    // events; pack aggregates and dedupes across events at render time.
799    let artifacts = crate::artifacts::extract(&event.text);
800    let artifacts_json = if artifacts.is_empty() {
801        None
802    } else {
803        Some(serde_json::to_string(&artifacts)?)
804    };
805    conn.execute(
806        "INSERT OR REPLACE INTO events_index(event_id, task_id, type, timestamp, confidence, status, artifacts)
807         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
808        rusqlite::params![
809            event.event_id, event.task_id, type_str,
810            event.timestamp, event.confidence, status_str, artifacts_json
811        ],
812    )?;
813    // search_fts has no PK; clear then insert to keep idempotent across rebuild_state replays.
814    conn.execute(
815        "DELETE FROM search_fts WHERE event_id=?1",
816        rusqlite::params![event.event_id],
817    )?;
818    conn.execute(
819        "INSERT INTO search_fts(task_id, event_id, text, type) VALUES (?1, ?2, ?3, ?4)",
820        rusqlite::params![event.task_id, event.event_id, event.text, type_str],
821    )?;
822
823    if event.event_type == EventType::Decision {
824        // v0.12.0: project structured alternatives (meta.alternatives) into
825        // a dedicated column so pack can render "considered A/B/C, chose X".
826        // Stored as the verbatim JSON of the meta value; NULL when absent.
827        let alternatives_json = match event.meta.get("alternatives") {
828            Some(v) if !v.is_null() => Some(serde_json::to_string(v)?),
829            _ => None,
830        };
831        conn.execute(
832            "INSERT OR REPLACE INTO decisions(decision_id, task_id, text, status, alternatives)
833             VALUES (?1, ?2, ?3, 'active', ?4)",
834            rusqlite::params![event.event_id, event.task_id, event.text, alternatives_json],
835        )?;
836    }
837
838    if event.event_type == EventType::Supersede {
839        if let Some(target) = &event.supersedes {
840            conn.execute(
841                "UPDATE decisions SET status='superseded', superseded_by=?1 WHERE decision_id=?2",
842                rusqlite::params![event.event_id, target],
843            )?;
844        }
845    }
846
847    if event.event_type == EventType::Evidence {
848        let strength_str = event
849            .evidence_strength
850            .map(|s| {
851                serde_json::to_value(s)
852                    .unwrap()
853                    .as_str()
854                    .unwrap()
855                    .to_string()
856            })
857            .unwrap_or_else(|| "medium".into());
858        conn.execute(
859            "INSERT OR REPLACE INTO evidence(evidence_id, task_id, text, strength)
860             VALUES (?1, ?2, ?3, ?4)",
861            rusqlite::params![event.event_id, event.task_id, event.text, strength_str],
862        )?;
863    }
864
865    // Invalidate any cached pack for this task — and its parent, whose
866    // Subtasks roll-up depends on this child.
867    invalidate_pack_cascade(conn, &event.task_id)?;
868
869    Ok(())
870}
871
872pub fn open(path: impl AsRef<Path>) -> anyhow::Result<Connection> {
873    if let Some(parent) = path.as_ref().parent() {
874        std::fs::create_dir_all(parent).with_context(|| format!("create dir {parent:?}"))?;
875    }
876    let conn =
877        Connection::open(&path).with_context(|| format!("open SQLite at {:?}", path.as_ref()))?;
878    conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;")?;
879    apply_migrations(&conn).context("apply schema migrations")?;
880    Ok(conn)
881}
882
/// A single task as shown in the TUI task list.
///
/// Carries everything the list view renders, so it never needs a follow-up
/// query per task: `event_count` is computed in the same query by joining
/// against an aggregate over `events_index`.
#[derive(Clone, Debug)]
pub struct TaskRow {
    /// Task identifier (the `tasks.task_id` primary key).
    pub task_id: String,
    /// Human-readable task title.
    pub title: String,
    /// Task status exactly as stored in `tasks.status` (e.g. `"open"`).
    pub status: String,
    /// Timestamp of the task's most recent event, as stored text.
    pub last_event_at: String,
    /// Number of `events_index` rows recorded for this task.
    pub event_count: usize,
}
894
895/// All tasks for a project, ordered with open ones first (by recency)
896/// then closed ones. The TUI list view binds directly to this — there
897/// is no other consumer, so the shape is tuned for that callsite.
898pub fn list_tasks_by_project(
899    conn: &Connection,
900    project_hash: &str,
901) -> anyhow::Result<Vec<TaskRow>> {
902    let mut stmt = conn.prepare(
903        "SELECT t.task_id, t.title, t.status, t.last_event_at,
904                COALESCE(c.cnt, 0) AS event_count
905         FROM tasks t
906         LEFT JOIN (
907             SELECT task_id, COUNT(*) AS cnt FROM events_index GROUP BY task_id
908         ) c ON c.task_id = t.task_id
909         WHERE t.project_hash = ?1
910         ORDER BY (t.status = 'open') DESC, t.last_event_at DESC",
911    )?;
912    let rows = stmt
913        .query_map(rusqlite::params![project_hash], |r| {
914            Ok(TaskRow {
915                task_id: r.get::<_, String>(0)?,
916                title: r.get::<_, String>(1)?,
917                status: r.get::<_, String>(2)?,
918                last_event_at: r.get::<_, String>(3)?,
919                event_count: r.get::<_, i64>(4)? as usize,
920            })
921        })?
922        .collect::<Result<Vec<_>, _>>()?;
923    Ok(rows)
924}
925
926/// Top-level tasks for a project (those with no parent), ordered like
927/// `list_tasks_by_project` — open first, then by recency. The roots of
928/// the `list --tree` view.
929pub fn top_level_tasks(conn: &Connection, project_hash: &str) -> anyhow::Result<Vec<TaskRow>> {
930    let mut stmt = conn.prepare(
931        "SELECT t.task_id, t.title, t.status, t.last_event_at,
932                COALESCE(c.cnt, 0) AS event_count
933         FROM tasks t
934         LEFT JOIN (
935             SELECT task_id, COUNT(*) AS cnt FROM events_index GROUP BY task_id
936         ) c ON c.task_id = t.task_id
937         WHERE t.project_hash = ?1 AND t.parent_id IS NULL
938         ORDER BY (t.status = 'open') DESC, t.last_event_at DESC",
939    )?;
940    let rows = stmt
941        .query_map(rusqlite::params![project_hash], |r| {
942            Ok(TaskRow {
943                task_id: r.get::<_, String>(0)?,
944                title: r.get::<_, String>(1)?,
945                status: r.get::<_, String>(2)?,
946                last_event_at: r.get::<_, String>(3)?,
947                event_count: r.get::<_, i64>(4)? as usize,
948            })
949        })?
950        .collect::<Result<Vec<_>, _>>()?;
951    Ok(rows)
952}
953
954/// Direct children of a task (one level), newest activity first.
955pub fn children_of(conn: &Connection, task_id: &str) -> anyhow::Result<Vec<TaskRow>> {
956    let mut stmt = conn.prepare(
957        "SELECT t.task_id, t.title, t.status, t.last_event_at,
958                COALESCE(c.cnt, 0) AS event_count
959         FROM tasks t
960         LEFT JOIN (
961             SELECT task_id, COUNT(*) AS cnt FROM events_index GROUP BY task_id
962         ) c ON c.task_id = t.task_id
963         WHERE t.parent_id = ?1
964         ORDER BY (t.status = 'open') DESC, t.last_event_at DESC",
965    )?;
966    let rows = stmt
967        .query_map(rusqlite::params![task_id], |r| {
968            Ok(TaskRow {
969                task_id: r.get::<_, String>(0)?,
970                title: r.get::<_, String>(1)?,
971                status: r.get::<_, String>(2)?,
972                last_event_at: r.get::<_, String>(3)?,
973                event_count: r.get::<_, i64>(4)? as usize,
974            })
975        })?
976        .collect::<Result<Vec<_>, _>>()?;
977    Ok(rows)
978}
979
980/// The stored parent of a task, if any.
981pub fn parent_of(conn: &Connection, task_id: &str) -> anyhow::Result<Option<String>> {
982    let mut stmt = conn.prepare("SELECT parent_id FROM tasks WHERE task_id = ?1")?;
983    let mut rows = stmt.query(rusqlite::params![task_id])?;
984    Ok(match rows.next()? {
985        Some(r) => r.get::<_, Option<String>>(0)?,
986        None => None,
987    })
988}
989
990/// True if setting `new_parent` as the parent of `task_id` would create a
991/// cycle (i.e. `new_parent` is `task_id` itself or a descendant of it).
992/// Walks ancestors of `new_parent`; a depth cap guards against pre-existing
993/// corrupt cycles.
994pub fn would_create_cycle(
995    conn: &Connection,
996    task_id: &str,
997    new_parent: &str,
998) -> anyhow::Result<bool> {
999    if task_id == new_parent {
1000        return Ok(true);
1001    }
1002    let mut cursor = Some(new_parent.to_string());
1003    for _ in 0..64 {
1004        let Some(cur) = cursor else {
1005            return Ok(false);
1006        };
1007        if cur == task_id {
1008            return Ok(true);
1009        }
1010        cursor = parent_of(conn, &cur)?;
1011    }
1012    // Depth cap exceeded — treat as a cycle to be safe.
1013    Ok(true)
1014}
1015
1016/// Number of direct children of `task_id` whose status is still open.
1017pub fn count_open_children(conn: &Connection, task_id: &str) -> anyhow::Result<usize> {
1018    let n: i64 = conn.query_row(
1019        "SELECT COUNT(*) FROM tasks WHERE parent_id = ?1 AND status = 'open'",
1020        rusqlite::params![task_id],
1021        |r| r.get(0),
1022    )?;
1023    Ok(n as usize)
1024}
1025
1026/// Clear the pack cache for a task and its parent (roll-up depends on both).
1027pub fn invalidate_pack_cascade(conn: &Connection, task_id: &str) -> anyhow::Result<()> {
1028    conn.execute(
1029        "DELETE FROM task_pack_cache WHERE task_id = ?1",
1030        rusqlite::params![task_id],
1031    )?;
1032    if let Some(parent) = parent_of(conn, task_id)? {
1033        conn.execute(
1034            "DELETE FROM task_pack_cache WHERE task_id = ?1",
1035            rusqlite::params![parent],
1036        )?;
1037    }
1038    Ok(())
1039}
1040
1041#[cfg(test)]
1042mod tests {
1043    use super::*;
1044    use tempfile::TempDir;
1045
1046    #[test]
1047    fn task_exists_returns_true_for_known_id_false_otherwise() {
1048        let d = TempDir::new().unwrap();
1049        let conn = open(d.path().join("s.sqlite")).unwrap();
1050
1051        assert!(!task_exists(&conn, "tj-nope").unwrap());
1052
1053        let e = make_open_event("tj-yes", "Hello");
1054        upsert_task_from_event(&conn, &e, "feedfacefeedface").unwrap();
1055        index_event(&conn, &e).unwrap();
1056
1057        assert!(task_exists(&conn, "tj-yes").unwrap());
1058        assert!(!task_exists(&conn, "tj-nope").unwrap());
1059    }
1060
1061    #[test]
1062    fn fresh_db_runs_all_migrations() {
1063        let d = TempDir::new().unwrap();
1064        let p = d.path().join("state.sqlite");
1065        let conn = open(&p).unwrap();
1066
1067        let applied: Vec<i64> = conn
1068            .prepare("SELECT version FROM schema_migrations ORDER BY version")
1069            .unwrap()
1070            .query_map([], |r| r.get::<_, i64>(0))
1071            .unwrap()
1072            .collect::<Result<_, _>>()
1073            .unwrap();
1074        assert_eq!(
1075            applied,
1076            (1..=MIGRATIONS.len() as i64).collect::<Vec<_>>(),
1077            "every declared migration must be recorded"
1078        );
1079    }
1080
1081    #[test]
1082    fn apply_migrations_is_idempotent_across_reopens() {
1083        let d = TempDir::new().unwrap();
1084        let p = d.path().join("state.sqlite");
1085        let _ = open(&p).unwrap();
1086        let _ = open(&p).unwrap();
1087
1088        let count: i64 = open(&p)
1089            .unwrap()
1090            .query_row("SELECT COUNT(*) FROM schema_migrations", [], |r| r.get(0))
1091            .unwrap();
1092        assert_eq!(
1093            count,
1094            MIGRATIONS.len() as i64,
1095            "schema_migrations must contain exactly one row per declared migration after repeated opens"
1096        );
1097    }
1098
1099    #[test]
1100    fn open_creates_all_tables() {
1101        let d = TempDir::new().unwrap();
1102        let p = d.path().join("state.sqlite");
1103        let conn = open(&p).unwrap();
1104
1105        let names: Vec<String> = conn
1106            .prepare("SELECT name FROM sqlite_master WHERE type='table' OR type='virtual table' ORDER BY name")
1107            .unwrap()
1108            .query_map([], |r| r.get::<_, String>(0))
1109            .unwrap()
1110            .collect::<Result<_, _>>()
1111            .unwrap();
1112
1113        for required in [
1114            "decisions",
1115            "events_index",
1116            "evidence",
1117            "task_pack_cache",
1118            "tasks",
1119            "search_fts",
1120        ] {
1121            assert!(
1122                names.iter().any(|n| n == required),
1123                "missing table {required}, have {names:?}"
1124            );
1125        }
1126    }
1127
1128    #[test]
1129    fn open_is_idempotent() {
1130        let d = TempDir::new().unwrap();
1131        let p = d.path().join("state.sqlite");
1132        let _ = open(&p).unwrap();
1133        let _ = open(&p).unwrap();
1134    }
1135
1136    #[test]
1137    fn index_event_projects_evidence() {
1138        let d = TempDir::new().unwrap();
1139        let conn = open(d.path().join("s.sqlite")).unwrap();
1140        let mut open_e = crate::event::Event::new(
1141            "tj-e",
1142            crate::event::EventType::Open,
1143            crate::event::Author::User,
1144            crate::event::Source::Cli,
1145            "x".into(),
1146        );
1147        open_e.meta = serde_json::json!({"title": "T"});
1148        upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
1149        index_event(&conn, &open_e).unwrap();
1150
1151        let mut ev = crate::event::Event::new(
1152            "tj-e",
1153            crate::event::EventType::Evidence,
1154            crate::event::Author::Agent,
1155            crate::event::Source::Chat,
1156            "Hook startup measured at 12ms".into(),
1157        );
1158        ev.evidence_strength = Some(crate::event::EvidenceStrength::Strong);
1159        upsert_task_from_event(&conn, &ev, "feedface").unwrap();
1160        index_event(&conn, &ev).unwrap();
1161
1162        let (text, strength): (String, String) = conn
1163            .query_row(
1164                "SELECT text, strength FROM evidence WHERE task_id=?1",
1165                rusqlite::params!["tj-e"],
1166                |r| Ok((r.get(0)?, r.get(1)?)),
1167            )
1168            .unwrap();
1169        assert!(text.contains("12ms"));
1170        assert_eq!(strength, "strong");
1171    }
1172
1173    #[test]
1174    fn supersede_event_marks_decision_superseded() {
1175        let d = TempDir::new().unwrap();
1176        let conn = open(d.path().join("s.sqlite")).unwrap();
1177        let mut open_e = crate::event::Event::new(
1178            "tj-s",
1179            crate::event::EventType::Open,
1180            crate::event::Author::User,
1181            crate::event::Source::Cli,
1182            "x".into(),
1183        );
1184        open_e.meta = serde_json::json!({"title": "T"});
1185        upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
1186        index_event(&conn, &open_e).unwrap();
1187
1188        let dec = crate::event::Event::new(
1189            "tj-s",
1190            crate::event::EventType::Decision,
1191            crate::event::Author::Agent,
1192            crate::event::Source::Chat,
1193            "Use TS".into(),
1194        );
1195        upsert_task_from_event(&conn, &dec, "feedface").unwrap();
1196        index_event(&conn, &dec).unwrap();
1197
1198        let mut sup = crate::event::Event::new(
1199            "tj-s",
1200            crate::event::EventType::Supersede,
1201            crate::event::Author::Agent,
1202            crate::event::Source::Chat,
1203            "Replaced by Rust decision".into(),
1204        );
1205        sup.supersedes = Some(dec.event_id.clone());
1206        upsert_task_from_event(&conn, &sup, "feedface").unwrap();
1207        index_event(&conn, &sup).unwrap();
1208
1209        let (status, by): (String, Option<String>) = conn
1210            .query_row(
1211                "SELECT status, superseded_by FROM decisions WHERE decision_id=?1",
1212                rusqlite::params![dec.event_id],
1213                |r| Ok((r.get(0)?, r.get(1)?)),
1214            )
1215            .unwrap();
1216        assert_eq!(status, "superseded");
1217        assert_eq!(by.as_deref(), Some(sup.event_id.as_str()));
1218    }
1219
1220    #[test]
1221    fn index_event_projects_decision_to_decisions_table() {
1222        let d = TempDir::new().unwrap();
1223        let conn = open(d.path().join("s.sqlite")).unwrap();
1224
1225        let mut open_e = crate::event::Event::new(
1226            "tj-d",
1227            crate::event::EventType::Open,
1228            crate::event::Author::User,
1229            crate::event::Source::Cli,
1230            "x".into(),
1231        );
1232        open_e.meta = serde_json::json!({"title": "T"});
1233        upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
1234        index_event(&conn, &open_e).unwrap();
1235
1236        let dec = crate::event::Event::new(
1237            "tj-d",
1238            crate::event::EventType::Decision,
1239            crate::event::Author::Agent,
1240            crate::event::Source::Chat,
1241            "Adopt Rust".into(),
1242        );
1243        upsert_task_from_event(&conn, &dec, "feedface").unwrap();
1244        index_event(&conn, &dec).unwrap();
1245
1246        let (id, text, status): (String, String, String) = conn
1247            .query_row(
1248                "SELECT decision_id, text, status FROM decisions WHERE task_id=?1",
1249                rusqlite::params!["tj-d"],
1250                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1251            )
1252            .unwrap();
1253        assert_eq!(id, dec.event_id);
1254        assert_eq!(text, "Adopt Rust");
1255        assert_eq!(status, "active");
1256    }
1257
1258    #[test]
1259    fn index_event_projects_decision_alternatives_into_column() {
1260        let d = TempDir::new().unwrap();
1261        let conn = open(d.path().join("s.sqlite")).unwrap();
1262
1263        let mut dec = crate::event::Event::new(
1264            "tj-alt",
1265            crate::event::EventType::Decision,
1266            crate::event::Author::Agent,
1267            crate::event::Source::Chat,
1268            "Use SQLite".into(),
1269        );
1270        dec.meta = serde_json::json!({
1271            "alternatives": [
1272                {"option": "SQLite", "chosen": true, "rationale": "embedded, zero-ops"},
1273                {"option": "Postgres", "chosen": false, "rationale": "too heavy for local tool"}
1274            ]
1275        });
1276        upsert_task_from_event(&conn, &dec, "feedface").unwrap();
1277        index_event(&conn, &dec).unwrap();
1278
1279        let alts: Option<String> = conn
1280            .query_row(
1281                "SELECT alternatives FROM decisions WHERE decision_id=?1",
1282                rusqlite::params![dec.event_id],
1283                |r| r.get(0),
1284            )
1285            .unwrap();
1286        let alts = alts.expect("alternatives column should be populated");
1287        let parsed: serde_json::Value = serde_json::from_str(&alts).unwrap();
1288        assert_eq!(parsed.as_array().unwrap().len(), 2);
1289        assert_eq!(parsed[0]["option"], "SQLite");
1290        assert_eq!(parsed[0]["chosen"], true);
1291    }
1292
1293    #[test]
1294    fn index_event_decision_without_alternatives_leaves_column_null() {
1295        let d = TempDir::new().unwrap();
1296        let conn = open(d.path().join("s.sqlite")).unwrap();
1297
1298        let dec = crate::event::Event::new(
1299            "tj-noalt",
1300            crate::event::EventType::Decision,
1301            crate::event::Author::Agent,
1302            crate::event::Source::Chat,
1303            "Plain decision".into(),
1304        );
1305        upsert_task_from_event(&conn, &dec, "feedface").unwrap();
1306        index_event(&conn, &dec).unwrap();
1307
1308        let alts: Option<String> = conn
1309            .query_row(
1310                "SELECT alternatives FROM decisions WHERE decision_id=?1",
1311                rusqlite::params![dec.event_id],
1312                |r| r.get(0),
1313            )
1314            .unwrap();
1315        assert!(alts.is_none());
1316    }
1317
1318    #[test]
1319    fn index_event_is_idempotent_no_search_fts_duplicates() {
1320        let d = TempDir::new().unwrap();
1321        let conn = open(d.path().join("s.sqlite")).unwrap();
1322        let mut open_e = crate::event::Event::new(
1323            "tj-id",
1324            crate::event::EventType::Open,
1325            crate::event::Author::User,
1326            crate::event::Source::Cli,
1327            "x".into(),
1328        );
1329        open_e.meta = serde_json::json!({"title": "Idempotent"});
1330        upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
1331
1332        // Index three times — simulates rebuild_state replays.
1333        index_event(&conn, &open_e).unwrap();
1334        index_event(&conn, &open_e).unwrap();
1335        index_event(&conn, &open_e).unwrap();
1336
1337        let n: i64 = conn
1338            .query_row(
1339                "SELECT COUNT(*) FROM search_fts WHERE event_id=?1",
1340                rusqlite::params![open_e.event_id],
1341                |r| r.get(0),
1342            )
1343            .unwrap();
1344        assert_eq!(n, 1, "search_fts must hold exactly one row per event_id");
1345    }
1346
1347    #[test]
1348    fn list_all_projects_returns_hashes_from_state_dir() {
1349        use std::fs::File;
1350        let d = TempDir::new().unwrap();
1351        let state_dir = d.path().join("state");
1352        std::fs::create_dir_all(&state_dir).unwrap();
1353        File::create(state_dir.join("aaaa1111aaaa1111.sqlite")).unwrap();
1354        File::create(state_dir.join("bbbb2222bbbb2222.sqlite")).unwrap();
1355        File::create(state_dir.join("not-a-project.txt")).unwrap();
1356
1357        let mut hashes = list_all_projects(&state_dir).unwrap();
1358        hashes.sort();
1359        assert_eq!(hashes, vec!["aaaa1111aaaa1111", "bbbb2222bbbb2222"]);
1360    }
1361
1362    fn write_event_line(f: &mut std::fs::File, e: &crate::event::Event) {
1363        use std::io::Write;
1364        writeln!(f, "{}", serde_json::to_string(e).unwrap()).unwrap();
1365    }
1366
1367    fn make_open_event(task_id: &str, title: &str) -> crate::event::Event {
1368        let mut e = crate::event::Event::new(
1369            task_id,
1370            crate::event::EventType::Open,
1371            crate::event::Author::User,
1372            crate::event::Source::Cli,
1373            "x".into(),
1374        );
1375        e.meta = serde_json::json!({"title": title});
1376        e
1377    }
1378
1379    #[test]
1380    fn ingest_new_events_picks_up_only_new_lines() {
1381        let d = TempDir::new().unwrap();
1382        let jsonl = d.path().join("events.jsonl");
1383        let db = d.path().join("s.sqlite");
1384        let project = "deadbeefdeadbeef";
1385
1386        let e1 = make_open_event("tj-i1", "first");
1387        let e2 = make_open_event("tj-i2", "second");
1388        let e3 = make_open_event("tj-i3", "third");
1389
1390        let mut f = std::fs::File::create(&jsonl).unwrap();
1391        write_event_line(&mut f, &e1);
1392        write_event_line(&mut f, &e2);
1393        write_event_line(&mut f, &e3);
1394        drop(f);
1395
1396        // First pass — no marker yet, falls back to a full rebuild.
1397        let conn = open(&db).unwrap();
1398        let n_first = ingest_new_events(&conn, &jsonl, project).unwrap();
1399        assert_eq!(n_first, 3);
1400
1401        // Append two more events.
1402        let e4 = make_open_event("tj-i4", "fourth");
1403        let e5 = make_open_event("tj-i5", "fifth");
1404        let mut f = std::fs::OpenOptions::new()
1405            .append(true)
1406            .open(&jsonl)
1407            .unwrap();
1408        write_event_line(&mut f, &e4);
1409        write_event_line(&mut f, &e5);
1410        drop(f);
1411
1412        // Second pass — marker = e3, only e4 + e5 must be processed.
1413        let n_second = ingest_new_events(&conn, &jsonl, project).unwrap();
1414        assert_eq!(n_second, 2, "incremental ingest must read only the tail");
1415
1416        let total: i64 = conn
1417            .query_row("SELECT COUNT(*) FROM events_index", [], |r| r.get(0))
1418            .unwrap();
1419        assert_eq!(total, 5);
1420
1421        let marker: String = conn
1422            .query_row(
1423                "SELECT last_indexed_event_id FROM index_state WHERE project_hash=?1",
1424                rusqlite::params![project],
1425                |r| r.get(0),
1426            )
1427            .unwrap();
1428        assert_eq!(marker, e5.event_id);
1429    }
1430
1431    #[test]
1432    fn ingest_new_events_falls_back_to_full_rebuild_when_marker_vanishes() {
1433        let d = TempDir::new().unwrap();
1434        let jsonl = d.path().join("events.jsonl");
1435        let db = d.path().join("s.sqlite");
1436        let project = "feedfacefeedface";
1437
1438        let e1 = make_open_event("tj-r1", "first");
1439        let mut f = std::fs::File::create(&jsonl).unwrap();
1440        write_event_line(&mut f, &e1);
1441        drop(f);
1442
1443        let conn = open(&db).unwrap();
1444        ingest_new_events(&conn, &jsonl, project).unwrap();
1445
1446        // Replace the file entirely so the marker (e1.event_id) no longer
1447        // appears anywhere — simulates corruption / hand-edit.
1448        let e2 = make_open_event("tj-r2", "after-corruption");
1449        let e3 = make_open_event("tj-r3", "after-corruption-2");
1450        let mut f = std::fs::File::create(&jsonl).unwrap();
1451        write_event_line(&mut f, &e2);
1452        write_event_line(&mut f, &e3);
1453        drop(f);
1454
1455        let n = ingest_new_events(&conn, &jsonl, project).unwrap();
1456        assert_eq!(n, 2, "missing marker must trigger full rebuild");
1457    }
1458
1459    #[test]
1460    fn rebuild_state_and_ingest_new_events_produce_same_state() {
1461        let d = TempDir::new().unwrap();
1462        let jsonl_a = d.path().join("a.jsonl");
1463        let jsonl_b = d.path().join("b.jsonl");
1464        let db_a = d.path().join("a.sqlite");
1465        let db_b = d.path().join("b.sqlite");
1466
1467        let events: Vec<_> = (0..5)
1468            .map(|i| make_open_event(&format!("tj-eq{i}"), &format!("title {i}")))
1469            .collect();
1470        for path in [&jsonl_a, &jsonl_b] {
1471            let mut f = std::fs::File::create(path).unwrap();
1472            for e in &events {
1473                write_event_line(&mut f, e);
1474            }
1475        }
1476
1477        let conn_a = open(&db_a).unwrap();
1478        let n_a = rebuild_state(&conn_a, &jsonl_a, "abcd1234abcd1234").unwrap();
1479
1480        let conn_b = open(&db_b).unwrap();
1481        let n_b = ingest_new_events(&conn_b, &jsonl_b, "abcd1234abcd1234").unwrap();
1482
1483        assert_eq!(n_a, n_b);
1484        assert_eq!(n_a, 5);
1485
1486        for table in ["tasks", "events_index"] {
1487            let q = format!("SELECT COUNT(*) FROM {table}");
1488            let cnt_a: i64 = conn_a.query_row(&q, [], |r| r.get(0)).unwrap();
1489            let cnt_b: i64 = conn_b.query_row(&q, [], |r| r.get(0)).unwrap();
1490            assert_eq!(cnt_a, cnt_b, "row count mismatch in {table}");
1491        }
1492    }
1493
1494    #[test]
1495    fn rebuild_state_skips_malformed_jsonl_lines() {
1496        use std::io::Write;
1497        let d = TempDir::new().unwrap();
1498        let events_path = d.path().join("events.jsonl");
1499        let db_path = d.path().join("s.sqlite");
1500
1501        let mut f = std::fs::File::create(&events_path).unwrap();
1502
1503        let mut e1 = crate::event::Event::new(
1504            "tj-skip",
1505            crate::event::EventType::Open,
1506            crate::event::Author::User,
1507            crate::event::Source::Cli,
1508            "x".into(),
1509        );
1510        e1.meta = serde_json::json!({"title": "Skip test"});
1511        writeln!(f, "{}", serde_json::to_string(&e1).unwrap()).unwrap();
1512
1513        // Garbage that is not even JSON.
1514        writeln!(f, "this is not a json event line").unwrap();
1515
1516        // Valid JSON but not a valid Event (missing required fields).
1517        writeln!(f, "{{\"foo\": 1}}").unwrap();
1518
1519        let e3 = crate::event::Event::new(
1520            "tj-skip",
1521            crate::event::EventType::Decision,
1522            crate::event::Author::Agent,
1523            crate::event::Source::Chat,
1524            "Adopt Rust".into(),
1525        );
1526        writeln!(f, "{}", serde_json::to_string(&e3).unwrap()).unwrap();
1527        drop(f);
1528
1529        let conn = open(&db_path).unwrap();
1530        let n = rebuild_state(&conn, &events_path, "deadbeefdeadbeef")
1531            .expect("rebuild_state must succeed despite malformed lines");
1532        assert_eq!(
1533            n, 2,
1534            "expected 2 valid events indexed (2 malformed skipped)"
1535        );
1536
1537        let indexed: i64 = conn
1538            .query_row("SELECT COUNT(*) FROM events_index", [], |r| r.get(0))
1539            .unwrap();
1540        assert_eq!(indexed, 2);
1541    }
1542
1543    #[test]
1544    fn rebuild_state_reads_jsonl_and_populates_db() {
1545        use std::io::Write;
1546        let d = TempDir::new().unwrap();
1547        let events_path = d.path().join("events.jsonl");
1548        let db_path = d.path().join("s.sqlite");
1549
1550        let mut f = std::fs::File::create(&events_path).unwrap();
1551        let mut e1 = crate::event::Event::new(
1552            "tj-9",
1553            crate::event::EventType::Open,
1554            crate::event::Author::User,
1555            crate::event::Source::Cli,
1556            "x".into(),
1557        );
1558        e1.meta = serde_json::json!({"title": "Nine"});
1559        let e2 = crate::event::Event::new(
1560            "tj-9",
1561            crate::event::EventType::Decision,
1562            crate::event::Author::Agent,
1563            crate::event::Source::Chat,
1564            "Adopt Rust".into(),
1565        );
1566        writeln!(f, "{}", serde_json::to_string(&e1).unwrap()).unwrap();
1567        writeln!(f, "{}", serde_json::to_string(&e2).unwrap()).unwrap();
1568        drop(f);
1569
1570        let conn = open(&db_path).unwrap();
1571        let n = rebuild_state(&conn, &events_path, "deadbeefdeadbeef").unwrap();
1572        assert_eq!(n, 2);
1573
1574        let n: i64 = conn
1575            .query_row("SELECT COUNT(*) FROM tasks", [], |r| r.get(0))
1576            .unwrap();
1577        assert_eq!(n, 1);
1578        let n: i64 = conn
1579            .query_row("SELECT COUNT(*) FROM events_index", [], |r| r.get(0))
1580            .unwrap();
1581        assert_eq!(n, 2);
1582    }
1583
1584    #[test]
1585    fn index_event_writes_index_and_fts() {
1586        let d = TempDir::new().unwrap();
1587        let conn = open(d.path().join("s.sqlite")).unwrap();
1588        let mut open_e = crate::event::Event::new(
1589            "tj-1",
1590            crate::event::EventType::Open,
1591            crate::event::Author::User,
1592            crate::event::Source::Cli,
1593            "Title".into(),
1594        );
1595        open_e.meta = serde_json::json!({"title": "Title"});
1596        upsert_task_from_event(&conn, &open_e, "deadbeefdeadbeef").unwrap();
1597        index_event(&conn, &open_e).unwrap();
1598
1599        let mut decision = crate::event::Event::new(
1600            "tj-1",
1601            crate::event::EventType::Decision,
1602            crate::event::Author::Agent,
1603            crate::event::Source::Chat,
1604            "Adopt Rust".into(),
1605        );
1606        decision.confidence = Some(0.92);
1607        upsert_task_from_event(&conn, &decision, "deadbeefdeadbeef").unwrap();
1608        index_event(&conn, &decision).unwrap();
1609
1610        let count: i64 = conn
1611            .query_row(
1612                "SELECT COUNT(*) FROM events_index WHERE task_id=?1",
1613                rusqlite::params!["tj-1"],
1614                |r| r.get(0),
1615            )
1616            .unwrap();
1617        assert_eq!(count, 2);
1618
1619        let mut stmt = conn
1620            .prepare("SELECT event_id FROM search_fts WHERE search_fts MATCH ?1")
1621            .unwrap();
1622        let hits: Vec<String> = stmt
1623            .query_map(rusqlite::params!["Rust"], |r| {
1624                let s: String = r.get(0)?;
1625                Ok(s)
1626            })
1627            .unwrap()
1628            .collect::<Result<Vec<_>, _>>()
1629            .unwrap();
1630        assert_eq!(hits.len(), 1);
1631        assert_eq!(hits[0], decision.event_id);
1632    }
1633
1634    #[test]
1635    fn upsert_task_from_open_event_inserts_row() {
1636        let d = TempDir::new().unwrap();
1637        let conn = open(d.path().join("s.sqlite")).unwrap();
1638
1639        let mut e = crate::event::Event::new(
1640            "tj-7f3a",
1641            crate::event::EventType::Open,
1642            crate::event::Author::User,
1643            crate::event::Source::Cli,
1644            "Add OAuth".into(),
1645        );
1646        e.meta = serde_json::json!({ "title": "Add OAuth login" });
1647
1648        upsert_task_from_event(&conn, &e, "abcd1234abcd1234").unwrap();
1649
1650        let (id, title, status): (String, String, String) = conn
1651            .query_row(
1652                "SELECT task_id, title, status FROM tasks WHERE task_id = ?1",
1653                ["tj-7f3a"],
1654                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1655            )
1656            .unwrap();
1657
1658        assert_eq!(id, "tj-7f3a");
1659        assert_eq!(title, "Add OAuth login");
1660        assert_eq!(status, "open");
1661    }
1662
1663    #[test]
1664    fn migration_adds_parent_id_column_nullable() {
1665        let d = tempfile::TempDir::new().unwrap();
1666        let conn = open(d.path().join("s.sqlite")).unwrap();
1667
1668        // Seed a task via an open event (no parent).
1669        let e = make_open_event("tj-a", "Top");
1670        upsert_task_from_event(&conn, &e, "ph").unwrap();
1671
1672        let parent: Option<String> = conn
1673            .query_row(
1674                "SELECT parent_id FROM tasks WHERE task_id = ?1",
1675                rusqlite::params!["tj-a"],
1676                |r| r.get(0),
1677            )
1678            .unwrap();
1679        assert_eq!(parent, None);
1680    }
1681
1682    #[test]
1683    fn open_event_meta_parent_id_is_persisted() {
1684        let d = tempfile::TempDir::new().unwrap();
1685        let conn = open(d.path().join("s.sqlite")).unwrap();
1686
1687        // Parent first.
1688        upsert_task_from_event(&conn, &make_open_event("tj-parent", "Parent"), "ph").unwrap();
1689
1690        // Child carries meta.parent_id.
1691        let mut child = make_open_event("tj-child", "Child");
1692        child.meta = serde_json::json!({"title": "Child", "parent_id": "tj-parent"});
1693        upsert_task_from_event(&conn, &child, "ph").unwrap();
1694
1695        let parent: Option<String> = conn
1696            .query_row(
1697                "SELECT parent_id FROM tasks WHERE task_id = ?1",
1698                rusqlite::params!["tj-child"],
1699                |r| r.get(0),
1700            )
1701            .unwrap();
1702        assert_eq!(parent.as_deref(), Some("tj-parent"));
1703    }
1704
1705    #[test]
1706    fn children_of_and_parent_of_work() {
1707        let d = tempfile::TempDir::new().unwrap();
1708        let conn = open(d.path().join("s.sqlite")).unwrap();
1709        upsert_task_from_event(&conn, &make_open_event("p", "Parent"), "ph").unwrap();
1710
1711        let mut c1 = make_open_event("c1", "Child1");
1712        c1.meta = serde_json::json!({"title": "Child1", "parent_id": "p"});
1713        upsert_task_from_event(&conn, &c1, "ph").unwrap();
1714        let mut c2 = make_open_event("c2", "Child2");
1715        c2.meta = serde_json::json!({"title": "Child2", "parent_id": "p"});
1716        upsert_task_from_event(&conn, &c2, "ph").unwrap();
1717
1718        let kids = children_of(&conn, "p").unwrap();
1719        let ids: Vec<&str> = kids.iter().map(|t| t.task_id.as_str()).collect();
1720        assert!(ids.contains(&"c1") && ids.contains(&"c2"));
1721        assert_eq!(kids.len(), 2);
1722
1723        assert_eq!(parent_of(&conn, "c1").unwrap().as_deref(), Some("p"));
1724        assert_eq!(parent_of(&conn, "p").unwrap(), None);
1725    }
1726
1727    #[test]
1728    fn cycle_guard_rejects_self_and_ancestor() {
1729        let d = tempfile::TempDir::new().unwrap();
1730        let conn = open(d.path().join("s.sqlite")).unwrap();
1731        upsert_task_from_event(&conn, &make_open_event("a", "A"), "ph").unwrap();
1732        let mut b = make_open_event("b", "B");
1733        b.meta = serde_json::json!({"title": "B", "parent_id": "a"});
1734        upsert_task_from_event(&conn, &b, "ph").unwrap();
1735
1736        // a is b's ancestor → making a a child of b is a cycle.
1737        assert!(would_create_cycle(&conn, "a", "b").unwrap());
1738        // self-parent is a cycle.
1739        assert!(would_create_cycle(&conn, "a", "a").unwrap());
1740        // unrelated parent is fine.
1741        upsert_task_from_event(&conn, &make_open_event("x", "X"), "ph").unwrap();
1742        assert!(!would_create_cycle(&conn, "x", "a").unwrap());
1743    }
1744
1745    #[test]
1746    fn invalidate_cascade_clears_parent_pack() {
1747        let d = tempfile::TempDir::new().unwrap();
1748        let conn = open(d.path().join("s.sqlite")).unwrap();
1749        upsert_task_from_event(&conn, &make_open_event("p", "P"), "ph").unwrap();
1750        let mut c = make_open_event("c", "C");
1751        c.meta = serde_json::json!({"title": "C", "parent_id": "p"});
1752        upsert_task_from_event(&conn, &c, "ph").unwrap();
1753
1754        // Seed pack cache rows for both.
1755        for id in ["p", "c"] {
1756            conn.execute(
1757                "INSERT INTO task_pack_cache(task_id, mode, text, generated_at, source_event_count)
1758                 VALUES (?1, 'compact', 'x', '2026-01-01T00:00:00Z', 1)",
1759                rusqlite::params![id],
1760            )
1761            .unwrap();
1762        }
1763
1764        invalidate_pack_cascade(&conn, "c").unwrap();
1765
1766        let remaining: i64 = conn
1767            .query_row("SELECT COUNT(*) FROM task_pack_cache", [], |r| r.get(0))
1768            .unwrap();
1769        assert_eq!(remaining, 0, "both child and parent pack caches cleared");
1770    }
1771
1772    #[test]
1773    fn count_open_children_counts_only_open() {
1774        let d = tempfile::TempDir::new().unwrap();
1775        let conn = open(d.path().join("s.sqlite")).unwrap();
1776        upsert_task_from_event(&conn, &make_open_event("p", "P"), "ph").unwrap();
1777        let mut c1 = make_open_event("c1", "C1");
1778        c1.meta = serde_json::json!({"title": "C1", "parent_id": "p"});
1779        upsert_task_from_event(&conn, &c1, "ph").unwrap();
1780        // Close c1.
1781        let mut close = crate::event::Event::new(
1782            "c1",
1783            crate::event::EventType::Close,
1784            crate::event::Author::User,
1785            crate::event::Source::Cli,
1786            "done".into(),
1787        );
1788        close.timestamp = "2026-01-02T00:00:00Z".into();
1789        upsert_task_from_event(&conn, &close, "ph").unwrap();
1790        let mut c2 = make_open_event("c2", "C2");
1791        c2.meta = serde_json::json!({"title": "C2", "parent_id": "p"});
1792        upsert_task_from_event(&conn, &c2, "ph").unwrap();
1793
1794        assert_eq!(count_open_children(&conn, "p").unwrap(), 1); // only c2
1795    }
1796}