Skip to main content

tj_core/
db.rs

1use anyhow::Context;
2use rusqlite::Connection;
3use std::collections::HashSet;
4use std::path::Path;
5
/// One forward-only schema migration. Migrations are applied in `version`
/// order; each is recorded in `schema_migrations` so re-running `open()`
/// is idempotent.
struct Migration {
    /// Version number written to `schema_migrations.version` once the
    /// migration has been applied; also used to detect already-applied entries.
    version: i64,
    /// SQL batch executed (via `execute_batch`) when the migration is applied.
    sql: &'static str,
}
13
/// Baseline schema. Creates the derived-state tables `tasks`,
/// `events_index`, `decisions`, `evidence` and `task_pack_cache`, two
/// lookup indexes (`idx_tasks_project`, `idx_events_task_time`), and the
/// `search_fts` FTS5 virtual table. Every statement uses `IF NOT EXISTS`,
/// so the batch is safe to run against a database that already has them.
const MIGRATION_001: &str = r#"
CREATE TABLE IF NOT EXISTS tasks (
  task_id        TEXT PRIMARY KEY,
  title          TEXT NOT NULL,
  status         TEXT NOT NULL,
  project_hash   TEXT NOT NULL,
  opened_at      TEXT NOT NULL,
  closed_at      TEXT,
  last_event_at  TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_tasks_project ON tasks(project_hash, last_event_at DESC);

CREATE TABLE IF NOT EXISTS events_index (
  event_id    TEXT PRIMARY KEY,
  task_id     TEXT NOT NULL,
  type        TEXT NOT NULL,
  timestamp   TEXT NOT NULL,
  confidence  REAL,
  status      TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_events_task_time ON events_index(task_id, timestamp DESC);

CREATE TABLE IF NOT EXISTS decisions (
  decision_id    TEXT PRIMARY KEY,
  task_id        TEXT NOT NULL,
  text           TEXT NOT NULL,
  status         TEXT NOT NULL,
  superseded_by  TEXT
);

CREATE TABLE IF NOT EXISTS evidence (
  evidence_id           TEXT PRIMARY KEY,
  task_id               TEXT NOT NULL,
  text                  TEXT NOT NULL,
  strength              TEXT NOT NULL,
  refers_to_decision_id TEXT
);

CREATE TABLE IF NOT EXISTS task_pack_cache (
  task_id             TEXT NOT NULL,
  mode                TEXT NOT NULL,
  text                TEXT NOT NULL,
  generated_at        TEXT NOT NULL,
  source_event_count  INTEGER NOT NULL,
  PRIMARY KEY (task_id, mode)
);

CREATE VIRTUAL TABLE IF NOT EXISTS search_fts USING fts5(
  task_id UNINDEXED,
  event_id UNINDEXED,
  text,
  type
);
"#;
68
/// Adds `index_state`, one row per project, which tracks how far we've
/// ingested the JSONL log so subsequent `ingest_new_events` calls can skip
/// everything up to the stored marker instead of re-ingesting the entire
/// file. `last_indexed_event_id` is the `event_id` of the most recent event
/// written to `events_index`.
const MIGRATION_002: &str = r#"
CREATE TABLE IF NOT EXISTS index_state (
  project_hash          TEXT PRIMARY KEY,
  last_indexed_event_id TEXT NOT NULL,
  updated_at            TEXT NOT NULL
);
"#;
80
/// v0.4.0 task-as-goal redesign: explicit goal/outcome (plus `outcome_tag`
/// and `external`) on tasks, and typed artifacts on events. All new columns
/// are NULLable so existing rows survive without backfill. Wipes the pack
/// cache so old packs (rendered without Goal/Outcome blocks) regenerate on
/// next view.
///
/// The `ALTER TABLE ... ADD COLUMN` statements are not re-runnable: running
/// this batch twice fails on the first already-existing column.
const MIGRATION_003: &str = r#"
ALTER TABLE tasks ADD COLUMN goal        TEXT;
ALTER TABLE tasks ADD COLUMN outcome     TEXT;
ALTER TABLE tasks ADD COLUMN outcome_tag TEXT;
ALTER TABLE tasks ADD COLUMN external    TEXT;
ALTER TABLE events_index ADD COLUMN artifacts TEXT;
DELETE FROM task_pack_cache;
"#;
93
/// v0.5.0 Phase B — artifacts auto-extract on ingest. The column was
/// added in v003 but stayed NULL for everyone; v004 just wipes the
/// pack cache so newly-extracted artifacts surface in the next pack
/// render. Existing events stay NULL until `reclassify` (Phase B+) or
/// `rebuild-state` is run.
///
/// No schema change: the only statement is the cache wipe.
const MIGRATION_004: &str = r#"
DELETE FROM task_pack_cache;
"#;
102
/// v0.12.0 dream Pass A — adds `dream_state`, a per-project watermark
/// (`last_dream_at`) of the last successful dream run. Sessions modified
/// after this are in scope for the next run.
const MIGRATION_005: &str = r#"
CREATE TABLE IF NOT EXISTS dream_state (
  project_hash    TEXT PRIMARY KEY,
  last_dream_at   TEXT NOT NULL,
  updated_at      TEXT NOT NULL
);
"#;
112
/// v0.12.0 subtask hierarchy — nullable `tasks.parent_id` carries the
/// parent task taken from the `open` event's `meta.parent_id`. Existing flat
/// tasks stay NULL. The `idx_tasks_parent` index supports `children_of`
/// lookups.
const MIGRATION_006: &str = r#"
ALTER TABLE tasks ADD COLUMN parent_id TEXT;
CREATE INDEX IF NOT EXISTS idx_tasks_parent ON tasks(parent_id);
"#;
120
/// v0.12.0 structured decision alternatives — nullable
/// `decisions.alternatives` carries the JSON array from a decision event's
/// `meta.alternatives` (objects like `{option, chosen, rationale}`).
/// Existing decisions stay NULL; the append-only log is untouched. Wipes the
/// pack cache so packs re-render with the alternatives block once events
/// carry it.
const MIGRATION_007: &str = r#"
ALTER TABLE decisions ADD COLUMN alternatives TEXT;
DELETE FROM task_pack_cache;
"#;
130
/// v0.15.0 semantic-memory substrate (Pillar A). `embeddings` stores one
/// vector per event as a little-endian f32 BLOB, tagged with the model id +
/// dim so we never compare across models and can re-embed on a model change.
/// `memory_tier` is denormalised onto `events_index` for cheap tier filtering
/// (episodic by default; semantic/procedural/preference added in Phase 3).
/// Purely additive — existing rows default to `episodic`, the append-only log
/// is untouched, and an absent embedder simply leaves `embeddings` empty.
///
/// Also creates `idx_emb_project_tier` on `embeddings(project_hash, tier)`.
const MIGRATION_008: &str = r#"
CREATE TABLE IF NOT EXISTS embeddings (
  event_id     TEXT PRIMARY KEY,
  task_id      TEXT NOT NULL,
  project_hash TEXT NOT NULL,
  tier         TEXT NOT NULL DEFAULT 'episodic',
  model        TEXT NOT NULL,
  dim          INTEGER NOT NULL,
  vec          BLOB NOT NULL,
  created_at   TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_emb_project_tier ON embeddings(project_hash, tier);
ALTER TABLE events_index ADD COLUMN memory_tier TEXT NOT NULL DEFAULT 'episodic';
"#;
152
/// All schema migrations in version order. Append new entries here; never
/// edit a published migration's `sql` — write a new one instead.
///
/// `apply_migrations` walks this slice front to back, so the position of an
/// entry (not just its `version`) determines when it runs.
const MIGRATIONS: &[Migration] = &[
    Migration {
        version: 1,
        sql: MIGRATION_001,
    },
    Migration {
        version: 2,
        sql: MIGRATION_002,
    },
    Migration {
        version: 3,
        sql: MIGRATION_003,
    },
    Migration {
        version: 4,
        sql: MIGRATION_004,
    },
    Migration {
        version: 5,
        sql: MIGRATION_005,
    },
    Migration {
        version: 6,
        sql: MIGRATION_006,
    },
    Migration {
        version: 7,
        sql: MIGRATION_007,
    },
    Migration {
        version: 8,
        sql: MIGRATION_008,
    },
];
189
190fn apply_migrations(conn: &Connection) -> anyhow::Result<()> {
191    conn.execute_batch(
192        "CREATE TABLE IF NOT EXISTS schema_migrations (
193            version    INTEGER PRIMARY KEY,
194            applied_at TEXT NOT NULL
195        )",
196    )
197    .context("create schema_migrations table")?;
198
199    let applied: HashSet<i64> = {
200        let mut stmt = conn
201            .prepare("SELECT version FROM schema_migrations")
202            .context("select applied versions")?;
203        let rows = stmt
204            .query_map([], |r| r.get::<_, i64>(0))
205            .context("iterate schema_migrations")?;
206        rows.collect::<rusqlite::Result<HashSet<_>>>()
207            .context("collect applied versions")?
208    };
209
210    for migration in MIGRATIONS {
211        if applied.contains(&migration.version) {
212            continue;
213        }
214        conn.execute_batch(migration.sql)
215            .with_context(|| format!("apply schema migration v{:03}", migration.version))?;
216        conn.execute(
217            "INSERT INTO schema_migrations(version, applied_at) VALUES (?1, ?2)",
218            rusqlite::params![
219                migration.version,
220                chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
221            ],
222        )
223        .with_context(|| {
224            format!(
225                "record schema migration v{:03} as applied",
226                migration.version
227            )
228        })?;
229    }
230    Ok(())
231}
232
233use crate::event::{Event, EventType};
234
235pub fn upsert_task_from_event(
236    conn: &Connection,
237    event: &Event,
238    project_hash: &str,
239) -> anyhow::Result<()> {
240    match event.event_type {
241        EventType::Open => {
242            let title = event
243                .meta
244                .get("title")
245                .and_then(|v| v.as_str())
246                .unwrap_or(&event.text)
247                .to_string();
248            let parent_id = event
249                .meta
250                .get("parent_id")
251                .and_then(|v| v.as_str())
252                .map(str::to_string);
253            // ON CONFLICT intentionally does not overwrite parent_id — parent
254            // is set once at creation; re-parenting is a separate future path.
255            conn.execute(
256                "INSERT INTO tasks(task_id, title, status, project_hash, opened_at, last_event_at, parent_id)
257                 VALUES (?1, ?2, 'open', ?3, ?4, ?4, ?5)
258                 ON CONFLICT(task_id) DO UPDATE SET last_event_at = ?4",
259                rusqlite::params![event.task_id, title, project_hash, event.timestamp, parent_id],
260            )?;
261        }
262        EventType::Close => {
263            conn.execute(
264                "UPDATE tasks SET status='closed', closed_at=?2, last_event_at=?2 WHERE task_id=?1",
265                rusqlite::params![event.task_id, event.timestamp],
266            )?;
267        }
268        EventType::Reopen => {
269            conn.execute(
270                "UPDATE tasks SET status='open', closed_at=NULL, last_event_at=?2 WHERE task_id=?1",
271                rusqlite::params![event.task_id, event.timestamp],
272            )?;
273        }
274        _ => {
275            conn.execute(
276                "UPDATE tasks SET last_event_at=?2 WHERE task_id=?1",
277                rusqlite::params![event.task_id, event.timestamp],
278            )?;
279        }
280    }
281    Ok(())
282}
283
284use std::io::BufRead;
285
286pub fn list_all_projects(state_dir: impl AsRef<Path>) -> anyhow::Result<Vec<String>> {
287    let dir = state_dir.as_ref();
288    if !dir.exists() {
289        return Ok(vec![]);
290    }
291    let mut out = Vec::new();
292    for entry in std::fs::read_dir(dir)? {
293        let entry = entry?;
294        let path = entry.path();
295        if path.extension().and_then(|e| e.to_str()) == Some("sqlite") {
296            if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) {
297                out.push(stem.to_string());
298            }
299        }
300    }
301    Ok(out)
302}
303
304pub fn rebuild_state(
305    conn: &Connection,
306    jsonl_path: impl AsRef<Path>,
307    project_hash: &str,
308) -> anyhow::Result<usize> {
309    let f = std::fs::File::open(&jsonl_path)
310        .with_context(|| format!("open {:?}", jsonl_path.as_ref()))?;
311    let reader = std::io::BufReader::new(f);
312
313    let tx = conn.unchecked_transaction()?;
314    let mut count = 0;
315    let mut last_event_id: Option<String> = None;
316    for (i, line) in reader.lines().enumerate() {
317        let line = line.with_context(|| format!("read line {i}"))?;
318        if line.trim().is_empty() {
319            continue;
320        }
321        // Malformed JSONL lines are skipped with a warning so that one bad
322        // event cannot abort an otherwise-recoverable rebuild. SQL errors
323        // still propagate — those indicate schema/integrity problems.
324        let event: Event = match serde_json::from_str(&line) {
325            Ok(e) => e,
326            Err(err) => {
327                tracing::warn!(
328                    line_number = i + 1,
329                    error = %err,
330                    "skipping malformed JSONL line in rebuild_state"
331                );
332                continue;
333            }
334        };
335        upsert_task_from_event(&tx, &event, project_hash)?;
336        index_event(&tx, &event)?;
337        last_event_id = Some(event.event_id.clone());
338        count += 1;
339    }
340    if let Some(eid) = last_event_id.as_deref() {
341        record_last_indexed(&tx, project_hash, eid)?;
342    }
343    tx.commit()?;
344    Ok(count)
345}
346
347/// Returns whether a task with this id has been recorded in the derived
348/// state. Cheap O(1) lookup against the `tasks` primary key. Callers
349/// should run [`ingest_new_events`] first if they want to see the latest
350/// JSONL state.
351pub fn task_exists(conn: &Connection, task_id: &str) -> anyhow::Result<bool> {
352    let count: i64 = conn.query_row(
353        "SELECT COUNT(*) FROM tasks WHERE task_id = ?1",
354        rusqlite::params![task_id],
355        |r| r.get(0),
356    )?;
357    Ok(count > 0)
358}
359
360/// Status string for an existing task (e.g. "open", "closed"). Returns
361/// `None` when the task is unknown — caller decides whether that's a
362/// hard error or a route-to-pending case.
363pub fn task_status(conn: &Connection, task_id: &str) -> anyhow::Result<Option<String>> {
364    let mut stmt = conn.prepare("SELECT status FROM tasks WHERE task_id = ?1")?;
365    let mut rows = stmt.query(rusqlite::params![task_id])?;
366    Ok(rows.next()?.map(|r| r.get::<_, String>(0)).transpose()?)
367}
368
369/// Set or replace `tasks.goal` for an existing task. Caller is
370/// expected to have validated the task exists (via `task_exists`); we
371/// don't error on no-op rows so the upsert pattern is uniform.
372pub fn set_task_goal(conn: &Connection, task_id: &str, goal: &str) -> anyhow::Result<()> {
373    conn.execute(
374        "UPDATE tasks SET goal = ?1 WHERE task_id = ?2",
375        rusqlite::params![goal, task_id],
376    )
377    .with_context(|| format!("set goal for {task_id}"))?;
378    // Pack cache is now stale for this task — drop the entry so the
379    // next render picks up the new goal.
380    conn.execute(
381        "DELETE FROM task_pack_cache WHERE task_id = ?1",
382        rusqlite::params![task_id],
383    )?;
384    Ok(())
385}
386
387/// Set or replace the closure metadata. Pass `None` for `outcome_tag`
388/// to leave it unset; pass `Some("done"|"abandoned"|"superseded")`
389/// for a structured tag. Free-text `outcome` is the primary field.
390pub fn set_task_outcome(
391    conn: &Connection,
392    task_id: &str,
393    outcome: &str,
394    outcome_tag: Option<&str>,
395) -> anyhow::Result<()> {
396    conn.execute(
397        "UPDATE tasks SET outcome = ?1, outcome_tag = ?2 WHERE task_id = ?3",
398        rusqlite::params![outcome, outcome_tag, task_id],
399    )
400    .with_context(|| format!("set outcome for {task_id}"))?;
401    conn.execute(
402        "DELETE FROM task_pack_cache WHERE task_id = ?1",
403        rusqlite::params![task_id],
404    )?;
405    Ok(())
406}
407
408/// Append an external reference to `tasks.external`. The column is
409/// stored as a comma-separated list — small, append-mostly, no
410/// uniqueness constraint. Acceptable shapes (loose, not enforced):
411/// `beads:claude-memory-rsw`, `github:#42`, `jira:PROJ-1234`.
412pub fn add_task_external(conn: &Connection, task_id: &str, reference: &str) -> anyhow::Result<()> {
413    let current: Option<String> = conn
414        .query_row(
415            "SELECT external FROM tasks WHERE task_id = ?1",
416            rusqlite::params![task_id],
417            |r| r.get::<_, Option<String>>(0),
418        )
419        .with_context(|| format!("read external for {task_id}"))?;
420    let next = match current {
421        Some(s) if !s.is_empty() => format!("{s},{reference}"),
422        _ => reference.to_string(),
423    };
424    conn.execute(
425        "UPDATE tasks SET external = ?1 WHERE task_id = ?2",
426        rusqlite::params![next, task_id],
427    )?;
428    conn.execute(
429        "DELETE FROM task_pack_cache WHERE task_id = ?1",
430        rusqlite::params![task_id],
431    )?;
432    Ok(())
433}
434
/// Read-only metadata bundle used by pack rendering (and TUI list
/// teasers in v0.4.0+). Returns `None` for unknown tasks.
///
/// Each field mirrors the nullable `tasks` column of the same name.
#[derive(Debug, Clone, Default)]
pub struct TaskMetadata {
    /// `tasks.goal`; `None` while no goal has been set.
    pub goal: Option<String>,
    /// `tasks.outcome`, the free-text closure note.
    pub outcome: Option<String>,
    /// `tasks.outcome_tag`, the optional structured closure tag.
    pub outcome_tag: Option<String>,
    /// `tasks.external`, a comma-separated list of external references.
    pub external: Option<String>,
}
444
445pub fn task_metadata(conn: &Connection, task_id: &str) -> anyhow::Result<Option<TaskMetadata>> {
446    let mut stmt =
447        conn.prepare("SELECT goal, outcome, outcome_tag, external FROM tasks WHERE task_id = ?1")?;
448    let mut rows = stmt.query(rusqlite::params![task_id])?;
449    Ok(match rows.next()? {
450        Some(r) => Some(TaskMetadata {
451            goal: r.get::<_, Option<String>>(0)?,
452            outcome: r.get::<_, Option<String>>(1)?,
453            outcome_tag: r.get::<_, Option<String>>(2)?,
454            external: r.get::<_, Option<String>>(3)?,
455        }),
456        None => None,
457    })
458}
459
/// One row of the stale-task report: an open task whose last event
/// crossed the inactivity threshold.
#[derive(Debug, Clone)]
pub struct StaleTask {
    /// Primary key of the task in `tasks`.
    pub task_id: String,
    /// Task title as stored in `tasks.title`.
    pub title: String,
    /// Raw `tasks.last_event_at` string, exactly as stored.
    pub last_event_at: String,
    /// Whole days between `last_event_at` and the time the report was built;
    /// 0 when `last_event_at` cannot be parsed as RFC 3339.
    pub days_idle: i64,
}
469
470/// Find open tasks with no event in the last `days` days. Sorted by
471/// idle time descending so the user sees the most ancient first.
472pub fn stale_tasks(conn: &Connection, days: i64) -> anyhow::Result<Vec<StaleTask>> {
473    let cutoff = chrono::Utc::now() - chrono::Duration::days(days);
474    let cutoff_str = cutoff.to_rfc3339();
475    let mut stmt = conn.prepare(
476        "SELECT task_id, title, last_event_at FROM tasks
477         WHERE status = 'open' AND last_event_at < ?1
478         ORDER BY last_event_at ASC",
479    )?;
480    let rows = stmt.query_map(rusqlite::params![cutoff_str], |r| {
481        Ok((
482            r.get::<_, String>(0)?,
483            r.get::<_, String>(1)?,
484            r.get::<_, String>(2)?,
485        ))
486    })?;
487    let now = chrono::Utc::now();
488    let mut out = Vec::new();
489    for row in rows {
490        let (task_id, title, last_at) = row?;
491        let dt = chrono::DateTime::parse_from_rfc3339(&last_at)
492            .map(|d| d.with_timezone(&chrono::Utc))
493            .unwrap_or(now);
494        let days_idle = (now - dt).num_days();
495        out.push(StaleTask {
496            task_id,
497            title,
498            last_event_at: last_at,
499            days_idle,
500        });
501    }
502    Ok(out)
503}
504
/// Score-weighted relationship between a fresh prompt's artifacts and
/// every prior task's artifacts. Higher score = stronger continuation
/// signal. Threshold tuning is the caller's job; v0.6.0 auto-link
/// keeps anything with score > 0.0.
#[derive(Debug, Clone)]
pub struct RelatedTask {
    /// Id of the prior task that shares at least one artifact.
    pub task_id: String,
    /// Current `tasks.status` of that task (e.g. "open", "closed").
    pub status: String,
    /// Sum of the weights of every artifact that matched this task.
    pub score: f64,
}
515
516/// Find tasks whose events overlap the given artifacts on any
517/// dimension we have a signal for. Weights:
518///   shared linked_issue → +1.0   (strongest, ticket id is unique)
519///   shared commit_hash  → +0.8   (commits are nearly unique)
520///   shared file path    → +0.3   (files churn across tasks)
521///
522/// The scan reads `events_index.artifacts` (JSON) directly with LIKE
523/// substring matches — JSON1 would be cleaner but keeps the codepath
524/// dependency-free. Returns top hits sorted by score desc; ties keep
525/// the most-recent task first.
526pub fn find_related_tasks(
527    conn: &Connection,
528    arts: &crate::artifacts::Artifacts,
529) -> anyhow::Result<Vec<RelatedTask>> {
530    use std::collections::HashMap;
531    if arts.is_empty() {
532        return Ok(Vec::new());
533    }
534    let mut scores: HashMap<String, f64> = HashMap::new();
535    let mut last_seen: HashMap<String, String> = HashMap::new();
536
537    let needles: Vec<(String, f64)> = arts
538        .linked_issues
539        .iter()
540        .map(|s| (s.clone(), 1.0))
541        .chain(arts.commit_hashes.iter().map(|s| (s.clone(), 0.8)))
542        .chain(arts.files.iter().map(|s| (s.clone(), 0.3)))
543        .collect();
544
545    for (needle, weight) in needles {
546        let pattern = format!("%\"{}\"%", needle.replace('%', "\\%"));
547        let mut stmt = conn.prepare(
548            "SELECT DISTINCT task_id, MAX(timestamp) as ts FROM events_index
549             WHERE artifacts LIKE ?1
550             GROUP BY task_id
551             ORDER BY ts DESC",
552        )?;
553        let rows = stmt.query_map(rusqlite::params![pattern], |r| {
554            Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
555        })?;
556        for row in rows {
557            let (id, ts) = row?;
558            *scores.entry(id.clone()).or_insert(0.0) += weight;
559            last_seen.insert(id, ts);
560        }
561    }
562
563    let mut out: Vec<RelatedTask> = Vec::with_capacity(scores.len());
564    for (id, score) in scores {
565        let status: Option<String> = conn
566            .query_row(
567                "SELECT status FROM tasks WHERE task_id = ?1",
568                rusqlite::params![&id],
569                |r| r.get(0),
570            )
571            .ok();
572        if let Some(status) = status {
573            out.push(RelatedTask {
574                task_id: id,
575                status,
576                score,
577            });
578        }
579    }
580    out.sort_by(|a, b| {
581        b.score
582            .partial_cmp(&a.score)
583            .unwrap_or(std::cmp::Ordering::Equal)
584            .then_with(|| {
585                let ts_a = last_seen.get(&a.task_id).cloned().unwrap_or_default();
586                let ts_b = last_seen.get(&b.task_id).cloned().unwrap_or_default();
587                ts_b.cmp(&ts_a)
588            })
589    });
590    Ok(out)
591}
592
593/// Find tasks (open or closed) whose events reference any of the given
594/// issue identifiers (FIN-868, JIRA-123, INC-7…). Looks at the
595/// per-event `artifacts.linked_issues` column populated on ingest.
596/// Returns `(task_id, status)` deduplicated, most-recent first. Used
597/// by the v0.5.0 Phase C auto-link flow to recognise that a fresh
598/// prompt is a continuation of a prior task.
599pub fn find_tasks_by_linked_issues(
600    conn: &Connection,
601    issues: &[String],
602) -> anyhow::Result<Vec<(String, String)>> {
603    if issues.is_empty() {
604        return Ok(Vec::new());
605    }
606    // Stage A: collect candidate task_ids whose events_index.artifacts
607    // contains any of the requested issue strings. JSON1 is overkill
608    // here — a substring LIKE on the raw JSON is correct given the
609    // ticket id format ("FIN-868") never appears outside its own
610    // linked_issues array.
611    let mut candidate_ids: Vec<String> = Vec::new();
612    for issue in issues {
613        let pattern = format!("%\"{}\"%", issue.replace('%', "\\%"));
614        let mut stmt = conn.prepare(
615            "SELECT DISTINCT task_id FROM events_index
616             WHERE artifacts LIKE ?1
617             ORDER BY timestamp DESC",
618        )?;
619        let rows = stmt.query_map(rusqlite::params![pattern], |r| r.get::<_, String>(0))?;
620        for r in rows {
621            let id = r?;
622            if !candidate_ids.contains(&id) {
623                candidate_ids.push(id);
624            }
625        }
626    }
627    // Stage B: hydrate status for each candidate.
628    let mut out = Vec::with_capacity(candidate_ids.len());
629    for id in candidate_ids {
630        let status: Option<String> = conn
631            .query_row(
632                "SELECT status FROM tasks WHERE task_id = ?1",
633                rusqlite::params![&id],
634                |r| r.get(0),
635            )
636            .ok();
637        if let Some(s) = status {
638            out.push((id, s));
639        }
640    }
641    Ok(out)
642}
643
644/// Re-run artifact extraction over every event of a task and write the
645/// result back to `events_index.artifacts`. Used to backfill events
646/// that were ingested before Phase B landed. Returns the number of
647/// events touched. Wipes the pack cache for the task so the next
648/// render reflects the freshly extracted artifacts.
649pub fn reclassify_task_artifacts(conn: &Connection, task_id: &str) -> anyhow::Result<usize> {
650    let mut stmt = conn.prepare(
651        "SELECT ei.event_id, COALESCE(sf.text, '') FROM events_index ei
652         LEFT JOIN search_fts sf ON sf.event_id = ei.event_id
653         WHERE ei.task_id = ?1",
654    )?;
655    let rows: Vec<(String, String)> = stmt
656        .query_map(rusqlite::params![task_id], |r| {
657            Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
658        })?
659        .collect::<Result<_, _>>()?;
660    let count = rows.len();
661    for (event_id, text) in rows {
662        let arts = crate::artifacts::extract(&text);
663        let json = if arts.is_empty() {
664            None
665        } else {
666            Some(serde_json::to_string(&arts)?)
667        };
668        conn.execute(
669            "UPDATE events_index SET artifacts = ?1 WHERE event_id = ?2",
670            rusqlite::params![json, event_id],
671        )?;
672    }
673    invalidate_pack_cascade(conn, task_id)?;
674    Ok(count)
675}
676
677/// Aggregate artifacts (commit hashes, PR URLs, ticket IDs, files,
678/// branches) across every event of a task, deduplicated. Reads the
679/// per-event JSON payload that `ingest_new_events` populated. Skips
680/// events whose `artifacts` column is NULL or unparseable rather than
681/// failing the pack render.
682pub fn task_artifacts(
683    conn: &Connection,
684    task_id: &str,
685) -> anyhow::Result<crate::artifacts::Artifacts> {
686    let mut stmt = conn.prepare(
687        "SELECT artifacts FROM events_index
688         WHERE task_id = ?1 AND artifacts IS NOT NULL
689         ORDER BY timestamp ASC",
690    )?;
691    let rows = stmt.query_map(rusqlite::params![task_id], |r| r.get::<_, String>(0))?;
692    let mut acc = crate::artifacts::Artifacts::default();
693    for row in rows {
694        let json = row?;
695        if let Ok(parsed) = serde_json::from_str::<crate::artifacts::Artifacts>(&json) {
696            acc.merge(parsed);
697        }
698    }
699    Ok(acc)
700}
701
702/// Look up the most recent `event_id` we've ingested for this project.
703/// Returns `None` when the project has never been indexed (first call,
704/// or migration v002 just landed on an existing 0.1.x DB).
705fn last_indexed_event_id(conn: &Connection, project_hash: &str) -> anyhow::Result<Option<String>> {
706    let mut stmt =
707        conn.prepare("SELECT last_indexed_event_id FROM index_state WHERE project_hash = ?1")?;
708    let mut rows = stmt.query(rusqlite::params![project_hash])?;
709    if let Some(row) = rows.next()? {
710        Ok(Some(row.get::<_, String>(0)?))
711    } else {
712        Ok(None)
713    }
714}
715
716fn record_last_indexed(
717    conn: &Connection,
718    project_hash: &str,
719    event_id: &str,
720) -> anyhow::Result<()> {
721    conn.execute(
722        "INSERT INTO index_state(project_hash, last_indexed_event_id, updated_at)
723         VALUES (?1, ?2, ?3)
724         ON CONFLICT(project_hash) DO UPDATE SET
725             last_indexed_event_id = excluded.last_indexed_event_id,
726             updated_at = excluded.updated_at",
727        rusqlite::params![
728            project_hash,
729            event_id,
730            chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
731        ],
732    )?;
733    Ok(())
734}
735
736/// Read only the tail of the JSONL log since the last call. The cheap path
737/// for hot loops (every MCP tool invocation): scan to the marker, ingest
738/// the rest, update the marker.
739///
740/// Falls back to a full [`rebuild_state`] in two cases:
741/// - No marker yet for this project (first call after migration v002 or
742///   on a brand-new install).
743/// - The stored marker is not present in the JSONL (corrupted / truncated
744///   file). A `tracing::warn!` is emitted so the operator notices.
745pub fn ingest_new_events(
746    conn: &Connection,
747    jsonl_path: impl AsRef<Path>,
748    project_hash: &str,
749) -> anyhow::Result<usize> {
750    let marker = match last_indexed_event_id(conn, project_hash)? {
751        Some(id) => id,
752        None => return rebuild_state(conn, jsonl_path, project_hash),
753    };
754
755    let f = std::fs::File::open(&jsonl_path)
756        .with_context(|| format!("open {:?}", jsonl_path.as_ref()))?;
757    let reader = std::io::BufReader::new(f);
758
759    // First pass: confirm the marker still exists in the file. If it does
760    // not, the JSONL has been rewritten under us — we can't trust the
761    // marker, so we fall back to a full rebuild.
762    let tx = conn.unchecked_transaction()?;
763    let mut found_marker = false;
764    let mut count = 0;
765    let mut last_event_id: Option<String> = None;
766    for (i, line) in reader.lines().enumerate() {
767        let line = line.with_context(|| format!("read line {i}"))?;
768        if line.trim().is_empty() {
769            continue;
770        }
771        let event: Event = match serde_json::from_str(&line) {
772            Ok(e) => e,
773            Err(err) => {
774                tracing::warn!(
775                    line_number = i + 1,
776                    error = %err,
777                    "skipping malformed JSONL line in ingest_new_events"
778                );
779                continue;
780            }
781        };
782        if !found_marker {
783            if event.event_id == marker {
784                found_marker = true;
785            }
786            continue;
787        }
788        upsert_task_from_event(&tx, &event, project_hash)?;
789        index_event(&tx, &event)?;
790        last_event_id = Some(event.event_id.clone());
791        count += 1;
792    }
793
794    if !found_marker {
795        // Discard the (empty) tx and rebuild from scratch.
796        drop(tx);
797        tracing::warn!(
798            project_hash = project_hash,
799            marker = marker.as_str(),
800            "last_indexed_event_id not found in JSONL — falling back to full rebuild"
801        );
802        return rebuild_state(conn, jsonl_path, project_hash);
803    }
804
805    if let Some(eid) = last_event_id.as_deref() {
806        record_last_indexed(&tx, project_hash, eid)?;
807    }
808    tx.commit()?;
809    Ok(count)
810}
811
812pub fn index_event(conn: &Connection, event: &Event) -> anyhow::Result<()> {
813    let type_str = serde_json::to_value(event.event_type)?
814        .as_str()
815        .unwrap()
816        .to_string();
817    let status_str = serde_json::to_value(event.status)?
818        .as_str()
819        .unwrap()
820        .to_string();
821    // v0.5.0 Phase B: scrape artifacts (commit hashes, PR URLs, ticket
822    // IDs, file paths, branch names) out of the event text. Storing
823    // per-event so reclassify can recompute without touching foreign
824    // events; pack aggregates and dedupes across events at render time.
825    let artifacts = crate::artifacts::extract(&event.text);
826    let artifacts_json = if artifacts.is_empty() {
827        None
828    } else {
829        Some(serde_json::to_string(&artifacts)?)
830    };
831    conn.execute(
832        "INSERT OR REPLACE INTO events_index(event_id, task_id, type, timestamp, confidence, status, artifacts)
833         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
834        rusqlite::params![
835            event.event_id, event.task_id, type_str,
836            event.timestamp, event.confidence, status_str, artifacts_json
837        ],
838    )?;
839    // search_fts has no PK; clear then insert to keep idempotent across rebuild_state replays.
840    conn.execute(
841        "DELETE FROM search_fts WHERE event_id=?1",
842        rusqlite::params![event.event_id],
843    )?;
844    conn.execute(
845        "INSERT INTO search_fts(task_id, event_id, text, type) VALUES (?1, ?2, ?3, ?4)",
846        rusqlite::params![event.task_id, event.event_id, event.text, type_str],
847    )?;
848
849    if event.event_type == EventType::Decision {
850        // v0.12.0: project structured alternatives (meta.alternatives) into
851        // a dedicated column so pack can render "considered A/B/C, chose X".
852        // Stored as the verbatim JSON of the meta value; NULL when absent.
853        let alternatives_json = match event.meta.get("alternatives") {
854            Some(v) if !v.is_null() => Some(serde_json::to_string(v)?),
855            _ => None,
856        };
857        conn.execute(
858            "INSERT OR REPLACE INTO decisions(decision_id, task_id, text, status, alternatives)
859             VALUES (?1, ?2, ?3, 'active', ?4)",
860            rusqlite::params![event.event_id, event.task_id, event.text, alternatives_json],
861        )?;
862    }
863
864    if event.event_type == EventType::Supersede {
865        if let Some(target) = &event.supersedes {
866            conn.execute(
867                "UPDATE decisions SET status='superseded', superseded_by=?1 WHERE decision_id=?2",
868                rusqlite::params![event.event_id, target],
869            )?;
870        }
871    }
872
873    if event.event_type == EventType::Evidence {
874        let strength_str = event
875            .evidence_strength
876            .map(|s| {
877                serde_json::to_value(s)
878                    .unwrap()
879                    .as_str()
880                    .unwrap()
881                    .to_string()
882            })
883            .unwrap_or_else(|| "medium".into());
884        conn.execute(
885            "INSERT OR REPLACE INTO evidence(evidence_id, task_id, text, strength)
886             VALUES (?1, ?2, ?3, ?4)",
887            rusqlite::params![event.event_id, event.task_id, event.text, strength_str],
888        )?;
889    }
890
891    // Invalidate any cached pack for this task — and its parent, whose
892    // Subtasks roll-up depends on this child.
893    invalidate_pack_cascade(conn, &event.task_id)?;
894
895    Ok(())
896}
897
898pub fn open(path: impl AsRef<Path>) -> anyhow::Result<Connection> {
899    if let Some(parent) = path.as_ref().parent() {
900        std::fs::create_dir_all(parent).with_context(|| format!("create dir {parent:?}"))?;
901    }
902    let conn =
903        Connection::open(&path).with_context(|| format!("open SQLite at {:?}", path.as_ref()))?;
904    conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;")?;
905    apply_migrations(&conn).context("apply schema migrations")?;
906    Ok(conn)
907}
908
/// One row of the task list rendered by the TUI: enough to render the
/// list view without round-tripping for each task. `event_count` joins
/// `events_index` so we don't need a second query per row.
///
/// Derives `PartialEq`/`Eq` so whole rows can be compared directly (e.g. in
/// tests or when diffing two listings).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TaskRow {
    /// Primary key of the task in `tasks`.
    pub task_id: String,
    /// Task title as stored in `tasks.title`.
    pub title: String,
    /// Task status as stored in `tasks.status` (listing queries sort `'open'` first).
    pub status: String,
    /// Value of `tasks.last_event_at`, kept as the stored text.
    pub last_event_at: String,
    /// Number of `events_index` rows for this task (0 when there are none).
    pub event_count: usize,
}
920
921/// All tasks for a project, ordered with open ones first (by recency)
922/// then closed ones. The TUI list view binds directly to this — there
923/// is no other consumer, so the shape is tuned for that callsite.
924pub fn list_tasks_by_project(
925    conn: &Connection,
926    project_hash: &str,
927) -> anyhow::Result<Vec<TaskRow>> {
928    let mut stmt = conn.prepare(
929        "SELECT t.task_id, t.title, t.status, t.last_event_at,
930                COALESCE(c.cnt, 0) AS event_count
931         FROM tasks t
932         LEFT JOIN (
933             SELECT task_id, COUNT(*) AS cnt FROM events_index GROUP BY task_id
934         ) c ON c.task_id = t.task_id
935         WHERE t.project_hash = ?1
936         ORDER BY (t.status = 'open') DESC, t.last_event_at DESC",
937    )?;
938    let rows = stmt
939        .query_map(rusqlite::params![project_hash], |r| {
940            Ok(TaskRow {
941                task_id: r.get::<_, String>(0)?,
942                title: r.get::<_, String>(1)?,
943                status: r.get::<_, String>(2)?,
944                last_event_at: r.get::<_, String>(3)?,
945                event_count: r.get::<_, i64>(4)? as usize,
946            })
947        })?
948        .collect::<Result<Vec<_>, _>>()?;
949    Ok(rows)
950}
951
952/// Top-level tasks for a project (those with no parent), ordered like
953/// `list_tasks_by_project` — open first, then by recency. The roots of
954/// the `list --tree` view.
955pub fn top_level_tasks(conn: &Connection, project_hash: &str) -> anyhow::Result<Vec<TaskRow>> {
956    let mut stmt = conn.prepare(
957        "SELECT t.task_id, t.title, t.status, t.last_event_at,
958                COALESCE(c.cnt, 0) AS event_count
959         FROM tasks t
960         LEFT JOIN (
961             SELECT task_id, COUNT(*) AS cnt FROM events_index GROUP BY task_id
962         ) c ON c.task_id = t.task_id
963         WHERE t.project_hash = ?1 AND t.parent_id IS NULL
964         ORDER BY (t.status = 'open') DESC, t.last_event_at DESC",
965    )?;
966    let rows = stmt
967        .query_map(rusqlite::params![project_hash], |r| {
968            Ok(TaskRow {
969                task_id: r.get::<_, String>(0)?,
970                title: r.get::<_, String>(1)?,
971                status: r.get::<_, String>(2)?,
972                last_event_at: r.get::<_, String>(3)?,
973                event_count: r.get::<_, i64>(4)? as usize,
974            })
975        })?
976        .collect::<Result<Vec<_>, _>>()?;
977    Ok(rows)
978}
979
980/// Direct children of a task (one level), newest activity first.
981pub fn children_of(conn: &Connection, task_id: &str) -> anyhow::Result<Vec<TaskRow>> {
982    let mut stmt = conn.prepare(
983        "SELECT t.task_id, t.title, t.status, t.last_event_at,
984                COALESCE(c.cnt, 0) AS event_count
985         FROM tasks t
986         LEFT JOIN (
987             SELECT task_id, COUNT(*) AS cnt FROM events_index GROUP BY task_id
988         ) c ON c.task_id = t.task_id
989         WHERE t.parent_id = ?1
990         ORDER BY (t.status = 'open') DESC, t.last_event_at DESC",
991    )?;
992    let rows = stmt
993        .query_map(rusqlite::params![task_id], |r| {
994            Ok(TaskRow {
995                task_id: r.get::<_, String>(0)?,
996                title: r.get::<_, String>(1)?,
997                status: r.get::<_, String>(2)?,
998                last_event_at: r.get::<_, String>(3)?,
999                event_count: r.get::<_, i64>(4)? as usize,
1000            })
1001        })?
1002        .collect::<Result<Vec<_>, _>>()?;
1003    Ok(rows)
1004}
1005
1006/// The stored parent of a task, if any.
1007pub fn parent_of(conn: &Connection, task_id: &str) -> anyhow::Result<Option<String>> {
1008    let mut stmt = conn.prepare("SELECT parent_id FROM tasks WHERE task_id = ?1")?;
1009    let mut rows = stmt.query(rusqlite::params![task_id])?;
1010    Ok(match rows.next()? {
1011        Some(r) => r.get::<_, Option<String>>(0)?,
1012        None => None,
1013    })
1014}
1015
1016/// True if setting `new_parent` as the parent of `task_id` would create a
1017/// cycle (i.e. `new_parent` is `task_id` itself or a descendant of it).
1018/// Walks ancestors of `new_parent`; a depth cap guards against pre-existing
1019/// corrupt cycles.
1020pub fn would_create_cycle(
1021    conn: &Connection,
1022    task_id: &str,
1023    new_parent: &str,
1024) -> anyhow::Result<bool> {
1025    if task_id == new_parent {
1026        return Ok(true);
1027    }
1028    let mut cursor = Some(new_parent.to_string());
1029    for _ in 0..64 {
1030        let Some(cur) = cursor else {
1031            return Ok(false);
1032        };
1033        if cur == task_id {
1034            return Ok(true);
1035        }
1036        cursor = parent_of(conn, &cur)?;
1037    }
1038    // Depth cap exceeded — treat as a cycle to be safe.
1039    Ok(true)
1040}
1041
1042/// Number of direct children of `task_id` whose status is still open.
1043pub fn count_open_children(conn: &Connection, task_id: &str) -> anyhow::Result<usize> {
1044    let n: i64 = conn.query_row(
1045        "SELECT COUNT(*) FROM tasks WHERE parent_id = ?1 AND status = 'open'",
1046        rusqlite::params![task_id],
1047        |r| r.get(0),
1048    )?;
1049    Ok(n as usize)
1050}
1051
1052/// Clear the pack cache for a task and its parent (roll-up depends on both).
1053pub fn invalidate_pack_cascade(conn: &Connection, task_id: &str) -> anyhow::Result<()> {
1054    conn.execute(
1055        "DELETE FROM task_pack_cache WHERE task_id = ?1",
1056        rusqlite::params![task_id],
1057    )?;
1058    if let Some(parent) = parent_of(conn, task_id)? {
1059        conn.execute(
1060            "DELETE FROM task_pack_cache WHERE task_id = ?1",
1061            rusqlite::params![parent],
1062        )?;
1063    }
1064    Ok(())
1065}
1066
1067// ---------------------------------------------------------------------------
1068// Semantic-memory substrate (Pillar A / schema v008).
1069// ---------------------------------------------------------------------------
1070
/// One event awaiting an embedding: its id, task, and the text to embed.
///
/// Derives `Debug`/`Clone`/`PartialEq`/`Eq` so batches can be logged,
/// duplicated, and compared.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PendingEmbed {
    /// Id of the event that still needs a vector.
    pub event_id: String,
    /// Task the event belongs to.
    pub task_id: String,
    /// Text that will be handed to the embedder.
    pub text: String,
}
1077
1078/// Events that have no up-to-date embedding for `model` — either never embedded
1079/// or embedded by a different model. Pulls the text straight from `search_fts`.
1080/// `limit` bounds the batch; pass a large value to drain.
1081pub fn events_needing_embedding(
1082    conn: &Connection,
1083    model: &str,
1084    limit: usize,
1085) -> anyhow::Result<Vec<PendingEmbed>> {
1086    let mut stmt = conn.prepare(
1087        "SELECT f.event_id, f.task_id, f.text
1088           FROM search_fts f
1089           LEFT JOIN embeddings e ON e.event_id = f.event_id AND e.model = ?1
1090          WHERE e.event_id IS NULL
1091          LIMIT ?2",
1092    )?;
1093    let rows = stmt.query_map(rusqlite::params![model, limit as i64], |r| {
1094        Ok(PendingEmbed {
1095            event_id: r.get(0)?,
1096            task_id: r.get(1)?,
1097            text: r.get(2)?,
1098        })
1099    })?;
1100    let mut out = Vec::new();
1101    for r in rows {
1102        out.push(r?);
1103    }
1104    Ok(out)
1105}
1106
1107/// Upsert one vector. Keyed on `event_id`, so re-embedding (e.g. after a model
1108/// change) replaces the prior row idempotently across `rebuild_state` replays.
1109#[allow(clippy::too_many_arguments)]
1110pub fn upsert_embedding(
1111    conn: &Connection,
1112    event_id: &str,
1113    task_id: &str,
1114    project_hash: &str,
1115    tier: &str,
1116    model: &str,
1117    dim: usize,
1118    vec: &[f32],
1119    created_at: &str,
1120) -> anyhow::Result<()> {
1121    conn.execute(
1122        "INSERT OR REPLACE INTO embeddings(event_id, task_id, project_hash, tier, model, dim, vec, created_at)
1123         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
1124        rusqlite::params![
1125            event_id,
1126            task_id,
1127            project_hash,
1128            tier,
1129            model,
1130            dim as i64,
1131            crate::embed::to_blob(vec),
1132            created_at
1133        ],
1134    )?;
1135    Ok(())
1136}
1137
1138/// High-signal events (decisions, constraints, rejections) for consolidation —
1139/// `(event_id, text)`, newest first, capped at `limit`.
1140pub fn high_signal_events(
1141    conn: &Connection,
1142    limit: usize,
1143) -> anyhow::Result<Vec<(String, String)>> {
1144    let mut stmt = conn.prepare(
1145        "SELECT f.event_id, f.text
1146           FROM search_fts f
1147           JOIN events_index ei ON ei.event_id = f.event_id
1148          WHERE f.type IN ('decision', 'constraint', 'rejection')
1149          ORDER BY ei.timestamp DESC
1150          LIMIT ?1",
1151    )?;
1152    let rows = stmt.query_map(rusqlite::params![limit as i64], |r| {
1153        Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
1154    })?;
1155    let mut out = Vec::new();
1156    for r in rows {
1157        out.push(r?);
1158    }
1159    Ok(out)
1160}
1161
1162/// First task whose title exactly matches `title`, if any — used to find the
1163/// reusable per-project consolidation task.
1164pub fn find_task_by_title(conn: &Connection, title: &str) -> anyhow::Result<Option<String>> {
1165    let mut stmt = conn.prepare("SELECT task_id FROM tasks WHERE title = ?1 LIMIT 1")?;
1166    let mut rows = stmt.query(rusqlite::params![title])?;
1167    match rows.next()? {
1168        Some(row) => Ok(Some(row.get(0)?)),
1169        None => Ok(None),
1170    }
1171}
1172
1173/// Texts of all events under a task (for de-duplicating consolidated facts).
1174pub fn task_event_texts(conn: &Connection, task_id: &str) -> anyhow::Result<Vec<String>> {
1175    let mut stmt = conn.prepare("SELECT text FROM search_fts WHERE task_id = ?1")?;
1176    let rows = stmt.query_map(rusqlite::params![task_id], |r| r.get::<_, String>(0))?;
1177    let mut out = Vec::new();
1178    for r in rows {
1179        out.push(r?);
1180    }
1181    Ok(out)
1182}
1183
1184/// Number of stored embeddings for a project (test/stats helper).
1185pub fn count_embeddings(conn: &Connection, project_hash: &str) -> anyhow::Result<usize> {
1186    let n: i64 = conn.query_row(
1187        "SELECT COUNT(*) FROM embeddings WHERE project_hash = ?1",
1188        rusqlite::params![project_hash],
1189        |r| r.get(0),
1190    )?;
1191    Ok(n as usize)
1192}
1193
1194/// Embed up to `limit` events that still need a vector for the embedder's model,
1195/// and store them. Returns how many were embedded this call. Shared by
1196/// embed-on-ingest (small batch after `ingest_new_events`) and
1197/// `embed --backfill` (looped until it returns 0). Every pending text gets a
1198/// vector — including short boilerplate — so nothing is re-scanned next pass;
1199/// retrieval-side filtering ([`crate::embed::is_embeddable`]) decides what's
1200/// worth surfacing.
1201pub fn embed_pending(
1202    conn: &Connection,
1203    project_hash: &str,
1204    embedder: &dyn crate::embed::Embedder,
1205    created_at: &str,
1206    limit: usize,
1207) -> anyhow::Result<usize> {
1208    let pending = events_needing_embedding(conn, embedder.model_id(), limit)?;
1209    if pending.is_empty() {
1210        return Ok(0);
1211    }
1212    let texts: Vec<&str> = pending.iter().map(|p| p.text.as_str()).collect();
1213    let vecs = embedder.embed(&texts)?;
1214    let mut done = 0usize;
1215    for (p, v) in pending.iter().zip(vecs.iter()) {
1216        upsert_embedding(
1217            conn,
1218            &p.event_id,
1219            &p.task_id,
1220            project_hash,
1221            "episodic",
1222            embedder.model_id(),
1223            embedder.dim(),
1224            v,
1225            created_at,
1226        )?;
1227        done += 1;
1228    }
1229    Ok(done)
1230}
1231
/// A retrieval hit: the event, its task, and the relevance score.
///
/// Derives `Debug`/`Clone`/`PartialEq` so hits can be logged, copied, and
/// compared (`Eq` is not derivable because `score` is an `f32`).
#[derive(Debug, Clone, PartialEq)]
pub struct ScoredHit {
    /// Id of the matched event.
    pub event_id: String,
    /// Task the matched event belongs to.
    pub task_id: String,
    /// Title of that task; empty when no `tasks` row was found.
    pub task_title: String,
    /// Event type as stored in `search_fts.type`.
    pub event_type: String,
    /// Tier recorded on the embedding row.
    pub tier: String,
    /// Event text as stored in `search_fts`.
    pub text: String,
    /// Cosine similarity between the query vector and the stored vector.
    pub score: f32,
}
1242
1243/// Semantic search over a project's embeddings. Scores every stored vector for
1244/// `model` against `query_vec` by cosine, returns the top `k` by score. The
1245/// caller embeds the query with the same embedder so the model ids match.
1246/// Pure vector ranking for now; recency / tier / contradiction weighting layer
1247/// on top in later phases.
1248pub fn semantic_search(
1249    conn: &Connection,
1250    project_hash: &str,
1251    query_vec: &[f32],
1252    model: &str,
1253    k: usize,
1254) -> anyhow::Result<Vec<ScoredHit>> {
1255    let mut stmt = conn.prepare(
1256        "SELECT e.event_id, e.task_id, e.tier, e.vec, f.text, f.type,
1257                COALESCE(t.title, '')
1258           FROM embeddings e
1259           JOIN search_fts f ON f.event_id = e.event_id
1260           LEFT JOIN tasks t ON t.task_id = e.task_id
1261          WHERE e.project_hash = ?1 AND e.model = ?2",
1262    )?;
1263    let rows = stmt.query_map(rusqlite::params![project_hash, model], |r| {
1264        let blob: Vec<u8> = r.get(3)?;
1265        Ok((
1266            r.get::<_, String>(0)?, // event_id
1267            r.get::<_, String>(1)?, // task_id
1268            r.get::<_, String>(2)?, // tier
1269            blob,
1270            r.get::<_, String>(4)?, // text
1271            r.get::<_, String>(5)?, // type
1272            r.get::<_, String>(6)?, // title
1273        ))
1274    })?;
1275
1276    let mut hits: Vec<ScoredHit> = Vec::new();
1277    for row in rows {
1278        let (event_id, task_id, tier, blob, text, event_type, task_title) = row?;
1279        let score = crate::embed::cosine(query_vec, &crate::embed::from_blob(&blob));
1280        hits.push(ScoredHit {
1281            event_id,
1282            task_id,
1283            task_title,
1284            event_type,
1285            tier,
1286            text,
1287            score,
1288        });
1289    }
1290    hits.sort_by(|a, b| {
1291        b.score
1292            .partial_cmp(&a.score)
1293            .unwrap_or(std::cmp::Ordering::Equal)
1294    });
1295    hits.truncate(k);
1296    Ok(hits)
1297}
1298
1299#[cfg(test)]
1300mod tests {
1301    use super::*;
1302    use crate::embed::Embedder;
1303    use tempfile::TempDir;
1304
1305    #[test]
1306    fn task_exists_returns_true_for_known_id_false_otherwise() {
1307        let d = TempDir::new().unwrap();
1308        let conn = open(d.path().join("s.sqlite")).unwrap();
1309
1310        assert!(!task_exists(&conn, "tj-nope").unwrap());
1311
1312        let e = make_open_event("tj-yes", "Hello");
1313        upsert_task_from_event(&conn, &e, "feedfacefeedface").unwrap();
1314        index_event(&conn, &e).unwrap();
1315
1316        assert!(task_exists(&conn, "tj-yes").unwrap());
1317        assert!(!task_exists(&conn, "tj-nope").unwrap());
1318    }
1319
1320    #[test]
1321    fn fresh_db_runs_all_migrations() {
1322        let d = TempDir::new().unwrap();
1323        let p = d.path().join("state.sqlite");
1324        let conn = open(&p).unwrap();
1325
1326        let applied: Vec<i64> = conn
1327            .prepare("SELECT version FROM schema_migrations ORDER BY version")
1328            .unwrap()
1329            .query_map([], |r| r.get::<_, i64>(0))
1330            .unwrap()
1331            .collect::<Result<_, _>>()
1332            .unwrap();
1333        assert_eq!(
1334            applied,
1335            (1..=MIGRATIONS.len() as i64).collect::<Vec<_>>(),
1336            "every declared migration must be recorded"
1337        );
1338    }
1339
1340    #[test]
1341    fn apply_migrations_is_idempotent_across_reopens() {
1342        let d = TempDir::new().unwrap();
1343        let p = d.path().join("state.sqlite");
1344        let _ = open(&p).unwrap();
1345        let _ = open(&p).unwrap();
1346
1347        let count: i64 = open(&p)
1348            .unwrap()
1349            .query_row("SELECT COUNT(*) FROM schema_migrations", [], |r| r.get(0))
1350            .unwrap();
1351        assert_eq!(
1352            count,
1353            MIGRATIONS.len() as i64,
1354            "schema_migrations must contain exactly one row per declared migration after repeated opens"
1355        );
1356    }
1357
1358    fn make_text_event(text: &str) -> crate::event::Event {
1359        crate::event::Event::new(
1360            "tj-x",
1361            crate::event::EventType::Finding,
1362            crate::event::Author::User,
1363            crate::event::Source::Cli,
1364            text.into(),
1365        )
1366    }
1367
1368    #[test]
1369    fn embed_pending_embeds_all_then_is_idempotent() {
1370        let d = TempDir::new().unwrap();
1371        let conn = open(d.path().join("s.sqlite")).unwrap();
1372        let ph = "feedfacefeedface";
1373
1374        for text in [
1375            "implement payment refund deduplication",
1376            "add validation for negative order amounts",
1377        ] {
1378            index_event(&conn, &make_text_event(text)).unwrap();
1379        }
1380
1381        let emb = crate::embed::HashEmbedder::new(64);
1382        let at = "2026-06-12T00:00:00Z";
1383
1384        let n = embed_pending(&conn, ph, &emb, at, 100).unwrap();
1385        assert_eq!(n, 2, "both events embedded on first pass");
1386        assert_eq!(count_embeddings(&conn, ph).unwrap(), 2);
1387
1388        // Idempotent: nothing left for this model on a second pass.
1389        assert_eq!(embed_pending(&conn, ph, &emb, at, 100).unwrap(), 0);
1390
1391        // Model-scoped: a different model id sees them as un-embedded
1392        // (so a model change triggers a re-embed).
1393        assert_eq!(
1394            events_needing_embedding(&conn, "other-model", 100)
1395                .unwrap()
1396                .len(),
1397            2
1398        );
1399    }
1400
1401    #[test]
1402    fn semantic_search_ranks_relevant_event_first() {
1403        let d = TempDir::new().unwrap();
1404        let conn = open(d.path().join("s.sqlite")).unwrap();
1405        let ph = "feedfacefeedface";
1406
1407        for text in [
1408            "fix duplicate payment refund write on partial refund",
1409            "update the frontend button hover color",
1410            "add a database index for faster user lookup",
1411        ] {
1412            index_event(&conn, &make_text_event(text)).unwrap();
1413        }
1414        let emb = crate::embed::HashEmbedder::new(256);
1415        embed_pending(&conn, ph, &emb, "t", 100).unwrap();
1416
1417        let q = emb.embed_one("payment refund duplicated").unwrap();
1418        let hits = semantic_search(&conn, ph, &q, emb.model_id(), 3).unwrap();
1419
1420        assert_eq!(hits.len(), 3);
1421        assert!(
1422            hits[0].text.contains("refund"),
1423            "the refund event must rank first, got: {}",
1424            hits[0].text
1425        );
1426        assert!(
1427            hits[0].score >= hits[1].score,
1428            "hits must be sorted by score desc"
1429        );
1430    }
1431
1432    #[test]
1433    fn open_creates_all_tables() {
1434        let d = TempDir::new().unwrap();
1435        let p = d.path().join("state.sqlite");
1436        let conn = open(&p).unwrap();
1437
1438        let names: Vec<String> = conn
1439            .prepare("SELECT name FROM sqlite_master WHERE type='table' OR type='virtual table' ORDER BY name")
1440            .unwrap()
1441            .query_map([], |r| r.get::<_, String>(0))
1442            .unwrap()
1443            .collect::<Result<_, _>>()
1444            .unwrap();
1445
1446        for required in [
1447            "decisions",
1448            "events_index",
1449            "evidence",
1450            "task_pack_cache",
1451            "tasks",
1452            "search_fts",
1453        ] {
1454            assert!(
1455                names.iter().any(|n| n == required),
1456                "missing table {required}, have {names:?}"
1457            );
1458        }
1459    }
1460
1461    #[test]
1462    fn open_is_idempotent() {
1463        let d = TempDir::new().unwrap();
1464        let p = d.path().join("state.sqlite");
1465        let _ = open(&p).unwrap();
1466        let _ = open(&p).unwrap();
1467    }
1468
1469    #[test]
1470    fn index_event_projects_evidence() {
1471        let d = TempDir::new().unwrap();
1472        let conn = open(d.path().join("s.sqlite")).unwrap();
1473        let mut open_e = crate::event::Event::new(
1474            "tj-e",
1475            crate::event::EventType::Open,
1476            crate::event::Author::User,
1477            crate::event::Source::Cli,
1478            "x".into(),
1479        );
1480        open_e.meta = serde_json::json!({"title": "T"});
1481        upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
1482        index_event(&conn, &open_e).unwrap();
1483
1484        let mut ev = crate::event::Event::new(
1485            "tj-e",
1486            crate::event::EventType::Evidence,
1487            crate::event::Author::Agent,
1488            crate::event::Source::Chat,
1489            "Hook startup measured at 12ms".into(),
1490        );
1491        ev.evidence_strength = Some(crate::event::EvidenceStrength::Strong);
1492        upsert_task_from_event(&conn, &ev, "feedface").unwrap();
1493        index_event(&conn, &ev).unwrap();
1494
1495        let (text, strength): (String, String) = conn
1496            .query_row(
1497                "SELECT text, strength FROM evidence WHERE task_id=?1",
1498                rusqlite::params!["tj-e"],
1499                |r| Ok((r.get(0)?, r.get(1)?)),
1500            )
1501            .unwrap();
1502        assert!(text.contains("12ms"));
1503        assert_eq!(strength, "strong");
1504    }
1505
1506    #[test]
1507    fn supersede_event_marks_decision_superseded() {
1508        let d = TempDir::new().unwrap();
1509        let conn = open(d.path().join("s.sqlite")).unwrap();
1510        let mut open_e = crate::event::Event::new(
1511            "tj-s",
1512            crate::event::EventType::Open,
1513            crate::event::Author::User,
1514            crate::event::Source::Cli,
1515            "x".into(),
1516        );
1517        open_e.meta = serde_json::json!({"title": "T"});
1518        upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
1519        index_event(&conn, &open_e).unwrap();
1520
1521        let dec = crate::event::Event::new(
1522            "tj-s",
1523            crate::event::EventType::Decision,
1524            crate::event::Author::Agent,
1525            crate::event::Source::Chat,
1526            "Use TS".into(),
1527        );
1528        upsert_task_from_event(&conn, &dec, "feedface").unwrap();
1529        index_event(&conn, &dec).unwrap();
1530
1531        let mut sup = crate::event::Event::new(
1532            "tj-s",
1533            crate::event::EventType::Supersede,
1534            crate::event::Author::Agent,
1535            crate::event::Source::Chat,
1536            "Replaced by Rust decision".into(),
1537        );
1538        sup.supersedes = Some(dec.event_id.clone());
1539        upsert_task_from_event(&conn, &sup, "feedface").unwrap();
1540        index_event(&conn, &sup).unwrap();
1541
1542        let (status, by): (String, Option<String>) = conn
1543            .query_row(
1544                "SELECT status, superseded_by FROM decisions WHERE decision_id=?1",
1545                rusqlite::params![dec.event_id],
1546                |r| Ok((r.get(0)?, r.get(1)?)),
1547            )
1548            .unwrap();
1549        assert_eq!(status, "superseded");
1550        assert_eq!(by.as_deref(), Some(sup.event_id.as_str()));
1551    }
1552
1553    #[test]
1554    fn index_event_projects_decision_to_decisions_table() {
1555        let d = TempDir::new().unwrap();
1556        let conn = open(d.path().join("s.sqlite")).unwrap();
1557
1558        let mut open_e = crate::event::Event::new(
1559            "tj-d",
1560            crate::event::EventType::Open,
1561            crate::event::Author::User,
1562            crate::event::Source::Cli,
1563            "x".into(),
1564        );
1565        open_e.meta = serde_json::json!({"title": "T"});
1566        upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
1567        index_event(&conn, &open_e).unwrap();
1568
1569        let dec = crate::event::Event::new(
1570            "tj-d",
1571            crate::event::EventType::Decision,
1572            crate::event::Author::Agent,
1573            crate::event::Source::Chat,
1574            "Adopt Rust".into(),
1575        );
1576        upsert_task_from_event(&conn, &dec, "feedface").unwrap();
1577        index_event(&conn, &dec).unwrap();
1578
1579        let (id, text, status): (String, String, String) = conn
1580            .query_row(
1581                "SELECT decision_id, text, status FROM decisions WHERE task_id=?1",
1582                rusqlite::params!["tj-d"],
1583                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1584            )
1585            .unwrap();
1586        assert_eq!(id, dec.event_id);
1587        assert_eq!(text, "Adopt Rust");
1588        assert_eq!(status, "active");
1589    }
1590
1591    #[test]
1592    fn index_event_projects_decision_alternatives_into_column() {
1593        let d = TempDir::new().unwrap();
1594        let conn = open(d.path().join("s.sqlite")).unwrap();
1595
1596        let mut dec = crate::event::Event::new(
1597            "tj-alt",
1598            crate::event::EventType::Decision,
1599            crate::event::Author::Agent,
1600            crate::event::Source::Chat,
1601            "Use SQLite".into(),
1602        );
1603        dec.meta = serde_json::json!({
1604            "alternatives": [
1605                {"option": "SQLite", "chosen": true, "rationale": "embedded, zero-ops"},
1606                {"option": "Postgres", "chosen": false, "rationale": "too heavy for local tool"}
1607            ]
1608        });
1609        upsert_task_from_event(&conn, &dec, "feedface").unwrap();
1610        index_event(&conn, &dec).unwrap();
1611
1612        let alts: Option<String> = conn
1613            .query_row(
1614                "SELECT alternatives FROM decisions WHERE decision_id=?1",
1615                rusqlite::params![dec.event_id],
1616                |r| r.get(0),
1617            )
1618            .unwrap();
1619        let alts = alts.expect("alternatives column should be populated");
1620        let parsed: serde_json::Value = serde_json::from_str(&alts).unwrap();
1621        assert_eq!(parsed.as_array().unwrap().len(), 2);
1622        assert_eq!(parsed[0]["option"], "SQLite");
1623        assert_eq!(parsed[0]["chosen"], true);
1624    }
1625
1626    #[test]
1627    fn index_event_decision_without_alternatives_leaves_column_null() {
1628        let d = TempDir::new().unwrap();
1629        let conn = open(d.path().join("s.sqlite")).unwrap();
1630
1631        let dec = crate::event::Event::new(
1632            "tj-noalt",
1633            crate::event::EventType::Decision,
1634            crate::event::Author::Agent,
1635            crate::event::Source::Chat,
1636            "Plain decision".into(),
1637        );
1638        upsert_task_from_event(&conn, &dec, "feedface").unwrap();
1639        index_event(&conn, &dec).unwrap();
1640
1641        let alts: Option<String> = conn
1642            .query_row(
1643                "SELECT alternatives FROM decisions WHERE decision_id=?1",
1644                rusqlite::params![dec.event_id],
1645                |r| r.get(0),
1646            )
1647            .unwrap();
1648        assert!(alts.is_none());
1649    }
1650
1651    #[test]
1652    fn index_event_is_idempotent_no_search_fts_duplicates() {
1653        let d = TempDir::new().unwrap();
1654        let conn = open(d.path().join("s.sqlite")).unwrap();
1655        let mut open_e = crate::event::Event::new(
1656            "tj-id",
1657            crate::event::EventType::Open,
1658            crate::event::Author::User,
1659            crate::event::Source::Cli,
1660            "x".into(),
1661        );
1662        open_e.meta = serde_json::json!({"title": "Idempotent"});
1663        upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
1664
1665        // Index three times — simulates rebuild_state replays.
1666        index_event(&conn, &open_e).unwrap();
1667        index_event(&conn, &open_e).unwrap();
1668        index_event(&conn, &open_e).unwrap();
1669
1670        let n: i64 = conn
1671            .query_row(
1672                "SELECT COUNT(*) FROM search_fts WHERE event_id=?1",
1673                rusqlite::params![open_e.event_id],
1674                |r| r.get(0),
1675            )
1676            .unwrap();
1677        assert_eq!(n, 1, "search_fts must hold exactly one row per event_id");
1678    }
1679
1680    #[test]
1681    fn list_all_projects_returns_hashes_from_state_dir() {
1682        use std::fs::File;
1683        let d = TempDir::new().unwrap();
1684        let state_dir = d.path().join("state");
1685        std::fs::create_dir_all(&state_dir).unwrap();
1686        File::create(state_dir.join("aaaa1111aaaa1111.sqlite")).unwrap();
1687        File::create(state_dir.join("bbbb2222bbbb2222.sqlite")).unwrap();
1688        File::create(state_dir.join("not-a-project.txt")).unwrap();
1689
1690        let mut hashes = list_all_projects(&state_dir).unwrap();
1691        hashes.sort();
1692        assert_eq!(hashes, vec!["aaaa1111aaaa1111", "bbbb2222bbbb2222"]);
1693    }
1694
1695    fn write_event_line(f: &mut std::fs::File, e: &crate::event::Event) {
1696        use std::io::Write;
1697        writeln!(f, "{}", serde_json::to_string(e).unwrap()).unwrap();
1698    }
1699
1700    fn make_open_event(task_id: &str, title: &str) -> crate::event::Event {
1701        let mut e = crate::event::Event::new(
1702            task_id,
1703            crate::event::EventType::Open,
1704            crate::event::Author::User,
1705            crate::event::Source::Cli,
1706            "x".into(),
1707        );
1708        e.meta = serde_json::json!({"title": title});
1709        e
1710    }
1711
1712    #[test]
1713    fn ingest_new_events_picks_up_only_new_lines() {
1714        let d = TempDir::new().unwrap();
1715        let jsonl = d.path().join("events.jsonl");
1716        let db = d.path().join("s.sqlite");
1717        let project = "deadbeefdeadbeef";
1718
1719        let e1 = make_open_event("tj-i1", "first");
1720        let e2 = make_open_event("tj-i2", "second");
1721        let e3 = make_open_event("tj-i3", "third");
1722
1723        let mut f = std::fs::File::create(&jsonl).unwrap();
1724        write_event_line(&mut f, &e1);
1725        write_event_line(&mut f, &e2);
1726        write_event_line(&mut f, &e3);
1727        drop(f);
1728
1729        // First pass — no marker yet, falls back to a full rebuild.
1730        let conn = open(&db).unwrap();
1731        let n_first = ingest_new_events(&conn, &jsonl, project).unwrap();
1732        assert_eq!(n_first, 3);
1733
1734        // Append two more events.
1735        let e4 = make_open_event("tj-i4", "fourth");
1736        let e5 = make_open_event("tj-i5", "fifth");
1737        let mut f = std::fs::OpenOptions::new()
1738            .append(true)
1739            .open(&jsonl)
1740            .unwrap();
1741        write_event_line(&mut f, &e4);
1742        write_event_line(&mut f, &e5);
1743        drop(f);
1744
1745        // Second pass — marker = e3, only e4 + e5 must be processed.
1746        let n_second = ingest_new_events(&conn, &jsonl, project).unwrap();
1747        assert_eq!(n_second, 2, "incremental ingest must read only the tail");
1748
1749        let total: i64 = conn
1750            .query_row("SELECT COUNT(*) FROM events_index", [], |r| r.get(0))
1751            .unwrap();
1752        assert_eq!(total, 5);
1753
1754        let marker: String = conn
1755            .query_row(
1756                "SELECT last_indexed_event_id FROM index_state WHERE project_hash=?1",
1757                rusqlite::params![project],
1758                |r| r.get(0),
1759            )
1760            .unwrap();
1761        assert_eq!(marker, e5.event_id);
1762    }
1763
1764    #[test]
1765    fn ingest_new_events_falls_back_to_full_rebuild_when_marker_vanishes() {
1766        let d = TempDir::new().unwrap();
1767        let jsonl = d.path().join("events.jsonl");
1768        let db = d.path().join("s.sqlite");
1769        let project = "feedfacefeedface";
1770
1771        let e1 = make_open_event("tj-r1", "first");
1772        let mut f = std::fs::File::create(&jsonl).unwrap();
1773        write_event_line(&mut f, &e1);
1774        drop(f);
1775
1776        let conn = open(&db).unwrap();
1777        ingest_new_events(&conn, &jsonl, project).unwrap();
1778
1779        // Replace the file entirely so the marker (e1.event_id) no longer
1780        // appears anywhere — simulates corruption / hand-edit.
1781        let e2 = make_open_event("tj-r2", "after-corruption");
1782        let e3 = make_open_event("tj-r3", "after-corruption-2");
1783        let mut f = std::fs::File::create(&jsonl).unwrap();
1784        write_event_line(&mut f, &e2);
1785        write_event_line(&mut f, &e3);
1786        drop(f);
1787
1788        let n = ingest_new_events(&conn, &jsonl, project).unwrap();
1789        assert_eq!(n, 2, "missing marker must trigger full rebuild");
1790    }
1791
1792    #[test]
1793    fn rebuild_state_and_ingest_new_events_produce_same_state() {
1794        let d = TempDir::new().unwrap();
1795        let jsonl_a = d.path().join("a.jsonl");
1796        let jsonl_b = d.path().join("b.jsonl");
1797        let db_a = d.path().join("a.sqlite");
1798        let db_b = d.path().join("b.sqlite");
1799
1800        let events: Vec<_> = (0..5)
1801            .map(|i| make_open_event(&format!("tj-eq{i}"), &format!("title {i}")))
1802            .collect();
1803        for path in [&jsonl_a, &jsonl_b] {
1804            let mut f = std::fs::File::create(path).unwrap();
1805            for e in &events {
1806                write_event_line(&mut f, e);
1807            }
1808        }
1809
1810        let conn_a = open(&db_a).unwrap();
1811        let n_a = rebuild_state(&conn_a, &jsonl_a, "abcd1234abcd1234").unwrap();
1812
1813        let conn_b = open(&db_b).unwrap();
1814        let n_b = ingest_new_events(&conn_b, &jsonl_b, "abcd1234abcd1234").unwrap();
1815
1816        assert_eq!(n_a, n_b);
1817        assert_eq!(n_a, 5);
1818
1819        for table in ["tasks", "events_index"] {
1820            let q = format!("SELECT COUNT(*) FROM {table}");
1821            let cnt_a: i64 = conn_a.query_row(&q, [], |r| r.get(0)).unwrap();
1822            let cnt_b: i64 = conn_b.query_row(&q, [], |r| r.get(0)).unwrap();
1823            assert_eq!(cnt_a, cnt_b, "row count mismatch in {table}");
1824        }
1825    }
1826
1827    #[test]
1828    fn rebuild_state_skips_malformed_jsonl_lines() {
1829        use std::io::Write;
1830        let d = TempDir::new().unwrap();
1831        let events_path = d.path().join("events.jsonl");
1832        let db_path = d.path().join("s.sqlite");
1833
1834        let mut f = std::fs::File::create(&events_path).unwrap();
1835
1836        let mut e1 = crate::event::Event::new(
1837            "tj-skip",
1838            crate::event::EventType::Open,
1839            crate::event::Author::User,
1840            crate::event::Source::Cli,
1841            "x".into(),
1842        );
1843        e1.meta = serde_json::json!({"title": "Skip test"});
1844        writeln!(f, "{}", serde_json::to_string(&e1).unwrap()).unwrap();
1845
1846        // Garbage that is not even JSON.
1847        writeln!(f, "this is not a json event line").unwrap();
1848
1849        // Valid JSON but not a valid Event (missing required fields).
1850        writeln!(f, "{{\"foo\": 1}}").unwrap();
1851
1852        let e3 = crate::event::Event::new(
1853            "tj-skip",
1854            crate::event::EventType::Decision,
1855            crate::event::Author::Agent,
1856            crate::event::Source::Chat,
1857            "Adopt Rust".into(),
1858        );
1859        writeln!(f, "{}", serde_json::to_string(&e3).unwrap()).unwrap();
1860        drop(f);
1861
1862        let conn = open(&db_path).unwrap();
1863        let n = rebuild_state(&conn, &events_path, "deadbeefdeadbeef")
1864            .expect("rebuild_state must succeed despite malformed lines");
1865        assert_eq!(
1866            n, 2,
1867            "expected 2 valid events indexed (2 malformed skipped)"
1868        );
1869
1870        let indexed: i64 = conn
1871            .query_row("SELECT COUNT(*) FROM events_index", [], |r| r.get(0))
1872            .unwrap();
1873        assert_eq!(indexed, 2);
1874    }
1875
1876    #[test]
1877    fn rebuild_state_reads_jsonl_and_populates_db() {
1878        use std::io::Write;
1879        let d = TempDir::new().unwrap();
1880        let events_path = d.path().join("events.jsonl");
1881        let db_path = d.path().join("s.sqlite");
1882
1883        let mut f = std::fs::File::create(&events_path).unwrap();
1884        let mut e1 = crate::event::Event::new(
1885            "tj-9",
1886            crate::event::EventType::Open,
1887            crate::event::Author::User,
1888            crate::event::Source::Cli,
1889            "x".into(),
1890        );
1891        e1.meta = serde_json::json!({"title": "Nine"});
1892        let e2 = crate::event::Event::new(
1893            "tj-9",
1894            crate::event::EventType::Decision,
1895            crate::event::Author::Agent,
1896            crate::event::Source::Chat,
1897            "Adopt Rust".into(),
1898        );
1899        writeln!(f, "{}", serde_json::to_string(&e1).unwrap()).unwrap();
1900        writeln!(f, "{}", serde_json::to_string(&e2).unwrap()).unwrap();
1901        drop(f);
1902
1903        let conn = open(&db_path).unwrap();
1904        let n = rebuild_state(&conn, &events_path, "deadbeefdeadbeef").unwrap();
1905        assert_eq!(n, 2);
1906
1907        let n: i64 = conn
1908            .query_row("SELECT COUNT(*) FROM tasks", [], |r| r.get(0))
1909            .unwrap();
1910        assert_eq!(n, 1);
1911        let n: i64 = conn
1912            .query_row("SELECT COUNT(*) FROM events_index", [], |r| r.get(0))
1913            .unwrap();
1914        assert_eq!(n, 2);
1915    }
1916
1917    #[test]
1918    fn index_event_writes_index_and_fts() {
1919        let d = TempDir::new().unwrap();
1920        let conn = open(d.path().join("s.sqlite")).unwrap();
1921        let mut open_e = crate::event::Event::new(
1922            "tj-1",
1923            crate::event::EventType::Open,
1924            crate::event::Author::User,
1925            crate::event::Source::Cli,
1926            "Title".into(),
1927        );
1928        open_e.meta = serde_json::json!({"title": "Title"});
1929        upsert_task_from_event(&conn, &open_e, "deadbeefdeadbeef").unwrap();
1930        index_event(&conn, &open_e).unwrap();
1931
1932        let mut decision = crate::event::Event::new(
1933            "tj-1",
1934            crate::event::EventType::Decision,
1935            crate::event::Author::Agent,
1936            crate::event::Source::Chat,
1937            "Adopt Rust".into(),
1938        );
1939        decision.confidence = Some(0.92);
1940        upsert_task_from_event(&conn, &decision, "deadbeefdeadbeef").unwrap();
1941        index_event(&conn, &decision).unwrap();
1942
1943        let count: i64 = conn
1944            .query_row(
1945                "SELECT COUNT(*) FROM events_index WHERE task_id=?1",
1946                rusqlite::params!["tj-1"],
1947                |r| r.get(0),
1948            )
1949            .unwrap();
1950        assert_eq!(count, 2);
1951
1952        let mut stmt = conn
1953            .prepare("SELECT event_id FROM search_fts WHERE search_fts MATCH ?1")
1954            .unwrap();
1955        let hits: Vec<String> = stmt
1956            .query_map(rusqlite::params!["Rust"], |r| {
1957                let s: String = r.get(0)?;
1958                Ok(s)
1959            })
1960            .unwrap()
1961            .collect::<Result<Vec<_>, _>>()
1962            .unwrap();
1963        assert_eq!(hits.len(), 1);
1964        assert_eq!(hits[0], decision.event_id);
1965    }
1966
1967    #[test]
1968    fn upsert_task_from_open_event_inserts_row() {
1969        let d = TempDir::new().unwrap();
1970        let conn = open(d.path().join("s.sqlite")).unwrap();
1971
1972        let mut e = crate::event::Event::new(
1973            "tj-7f3a",
1974            crate::event::EventType::Open,
1975            crate::event::Author::User,
1976            crate::event::Source::Cli,
1977            "Add OAuth".into(),
1978        );
1979        e.meta = serde_json::json!({ "title": "Add OAuth login" });
1980
1981        upsert_task_from_event(&conn, &e, "abcd1234abcd1234").unwrap();
1982
1983        let (id, title, status): (String, String, String) = conn
1984            .query_row(
1985                "SELECT task_id, title, status FROM tasks WHERE task_id = ?1",
1986                ["tj-7f3a"],
1987                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1988            )
1989            .unwrap();
1990
1991        assert_eq!(id, "tj-7f3a");
1992        assert_eq!(title, "Add OAuth login");
1993        assert_eq!(status, "open");
1994    }
1995
1996    #[test]
1997    fn migration_adds_parent_id_column_nullable() {
1998        let d = tempfile::TempDir::new().unwrap();
1999        let conn = open(d.path().join("s.sqlite")).unwrap();
2000
2001        // Seed a task via an open event (no parent).
2002        let e = make_open_event("tj-a", "Top");
2003        upsert_task_from_event(&conn, &e, "ph").unwrap();
2004
2005        let parent: Option<String> = conn
2006            .query_row(
2007                "SELECT parent_id FROM tasks WHERE task_id = ?1",
2008                rusqlite::params!["tj-a"],
2009                |r| r.get(0),
2010            )
2011            .unwrap();
2012        assert_eq!(parent, None);
2013    }
2014
2015    #[test]
2016    fn open_event_meta_parent_id_is_persisted() {
2017        let d = tempfile::TempDir::new().unwrap();
2018        let conn = open(d.path().join("s.sqlite")).unwrap();
2019
2020        // Parent first.
2021        upsert_task_from_event(&conn, &make_open_event("tj-parent", "Parent"), "ph").unwrap();
2022
2023        // Child carries meta.parent_id.
2024        let mut child = make_open_event("tj-child", "Child");
2025        child.meta = serde_json::json!({"title": "Child", "parent_id": "tj-parent"});
2026        upsert_task_from_event(&conn, &child, "ph").unwrap();
2027
2028        let parent: Option<String> = conn
2029            .query_row(
2030                "SELECT parent_id FROM tasks WHERE task_id = ?1",
2031                rusqlite::params!["tj-child"],
2032                |r| r.get(0),
2033            )
2034            .unwrap();
2035        assert_eq!(parent.as_deref(), Some("tj-parent"));
2036    }
2037
2038    #[test]
2039    fn children_of_and_parent_of_work() {
2040        let d = tempfile::TempDir::new().unwrap();
2041        let conn = open(d.path().join("s.sqlite")).unwrap();
2042        upsert_task_from_event(&conn, &make_open_event("p", "Parent"), "ph").unwrap();
2043
2044        let mut c1 = make_open_event("c1", "Child1");
2045        c1.meta = serde_json::json!({"title": "Child1", "parent_id": "p"});
2046        upsert_task_from_event(&conn, &c1, "ph").unwrap();
2047        let mut c2 = make_open_event("c2", "Child2");
2048        c2.meta = serde_json::json!({"title": "Child2", "parent_id": "p"});
2049        upsert_task_from_event(&conn, &c2, "ph").unwrap();
2050
2051        let kids = children_of(&conn, "p").unwrap();
2052        let ids: Vec<&str> = kids.iter().map(|t| t.task_id.as_str()).collect();
2053        assert!(ids.contains(&"c1") && ids.contains(&"c2"));
2054        assert_eq!(kids.len(), 2);
2055
2056        assert_eq!(parent_of(&conn, "c1").unwrap().as_deref(), Some("p"));
2057        assert_eq!(parent_of(&conn, "p").unwrap(), None);
2058    }
2059
2060    #[test]
2061    fn cycle_guard_rejects_self_and_ancestor() {
2062        let d = tempfile::TempDir::new().unwrap();
2063        let conn = open(d.path().join("s.sqlite")).unwrap();
2064        upsert_task_from_event(&conn, &make_open_event("a", "A"), "ph").unwrap();
2065        let mut b = make_open_event("b", "B");
2066        b.meta = serde_json::json!({"title": "B", "parent_id": "a"});
2067        upsert_task_from_event(&conn, &b, "ph").unwrap();
2068
2069        // a is b's ancestor → making a a child of b is a cycle.
2070        assert!(would_create_cycle(&conn, "a", "b").unwrap());
2071        // self-parent is a cycle.
2072        assert!(would_create_cycle(&conn, "a", "a").unwrap());
2073        // unrelated parent is fine.
2074        upsert_task_from_event(&conn, &make_open_event("x", "X"), "ph").unwrap();
2075        assert!(!would_create_cycle(&conn, "x", "a").unwrap());
2076    }
2077
2078    #[test]
2079    fn invalidate_cascade_clears_parent_pack() {
2080        let d = tempfile::TempDir::new().unwrap();
2081        let conn = open(d.path().join("s.sqlite")).unwrap();
2082        upsert_task_from_event(&conn, &make_open_event("p", "P"), "ph").unwrap();
2083        let mut c = make_open_event("c", "C");
2084        c.meta = serde_json::json!({"title": "C", "parent_id": "p"});
2085        upsert_task_from_event(&conn, &c, "ph").unwrap();
2086
2087        // Seed pack cache rows for both.
2088        for id in ["p", "c"] {
2089            conn.execute(
2090                "INSERT INTO task_pack_cache(task_id, mode, text, generated_at, source_event_count)
2091                 VALUES (?1, 'compact', 'x', '2026-01-01T00:00:00Z', 1)",
2092                rusqlite::params![id],
2093            )
2094            .unwrap();
2095        }
2096
2097        invalidate_pack_cascade(&conn, "c").unwrap();
2098
2099        let remaining: i64 = conn
2100            .query_row("SELECT COUNT(*) FROM task_pack_cache", [], |r| r.get(0))
2101            .unwrap();
2102        assert_eq!(remaining, 0, "both child and parent pack caches cleared");
2103    }
2104
2105    #[test]
2106    fn count_open_children_counts_only_open() {
2107        let d = tempfile::TempDir::new().unwrap();
2108        let conn = open(d.path().join("s.sqlite")).unwrap();
2109        upsert_task_from_event(&conn, &make_open_event("p", "P"), "ph").unwrap();
2110        let mut c1 = make_open_event("c1", "C1");
2111        c1.meta = serde_json::json!({"title": "C1", "parent_id": "p"});
2112        upsert_task_from_event(&conn, &c1, "ph").unwrap();
2113        // Close c1.
2114        let mut close = crate::event::Event::new(
2115            "c1",
2116            crate::event::EventType::Close,
2117            crate::event::Author::User,
2118            crate::event::Source::Cli,
2119            "done".into(),
2120        );
2121        close.timestamp = "2026-01-02T00:00:00Z".into();
2122        upsert_task_from_event(&conn, &close, "ph").unwrap();
2123        let mut c2 = make_open_event("c2", "C2");
2124        c2.meta = serde_json::json!({"title": "C2", "parent_id": "p"});
2125        upsert_task_from_event(&conn, &c2, "ph").unwrap();
2126
2127        assert_eq!(count_open_children(&conn, "p").unwrap(), 1); // only c2
2128    }
2129}