Skip to main content

tj_core/
db.rs

1use anyhow::Context;
2use rusqlite::Connection;
3use std::collections::HashSet;
4use std::path::Path;
5
/// One forward-only schema migration.
///
/// Migrations are applied in the order they are listed in `MIGRATIONS`
/// (ascending `version`); each applied version is recorded in
/// `schema_migrations` so re-running `open()` is idempotent.
struct Migration {
    /// Version number written to `schema_migrations.version` once applied.
    version: i64,
    /// SQL batch executed to apply this migration.
    sql: &'static str,
}
13
/// Initial schema: the `tasks`, `events_index`, `decisions`, `evidence` and
/// `task_pack_cache` tables, their indexes, and the `search_fts` FTS5
/// virtual table. Every statement uses `IF NOT EXISTS`.
const MIGRATION_001: &str = r#"
CREATE TABLE IF NOT EXISTS tasks (
  task_id        TEXT PRIMARY KEY,
  title          TEXT NOT NULL,
  status         TEXT NOT NULL,
  project_hash   TEXT NOT NULL,
  opened_at      TEXT NOT NULL,
  closed_at      TEXT,
  last_event_at  TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_tasks_project ON tasks(project_hash, last_event_at DESC);

CREATE TABLE IF NOT EXISTS events_index (
  event_id    TEXT PRIMARY KEY,
  task_id     TEXT NOT NULL,
  type        TEXT NOT NULL,
  timestamp   TEXT NOT NULL,
  confidence  REAL,
  status      TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_events_task_time ON events_index(task_id, timestamp DESC);

CREATE TABLE IF NOT EXISTS decisions (
  decision_id    TEXT PRIMARY KEY,
  task_id        TEXT NOT NULL,
  text           TEXT NOT NULL,
  status         TEXT NOT NULL,
  superseded_by  TEXT
);

CREATE TABLE IF NOT EXISTS evidence (
  evidence_id           TEXT PRIMARY KEY,
  task_id               TEXT NOT NULL,
  text                  TEXT NOT NULL,
  strength              TEXT NOT NULL,
  refers_to_decision_id TEXT
);

CREATE TABLE IF NOT EXISTS task_pack_cache (
  task_id             TEXT NOT NULL,
  mode                TEXT NOT NULL,
  text                TEXT NOT NULL,
  generated_at        TEXT NOT NULL,
  source_event_count  INTEGER NOT NULL,
  PRIMARY KEY (task_id, mode)
);

CREATE VIRTUAL TABLE IF NOT EXISTS search_fts USING fts5(
  task_id UNINDEXED,
  event_id UNINDEXED,
  text,
  type
);
"#;
68
/// Tracks how far we've ingested the JSONL log per project so subsequent
/// `ingest_new_events` calls can skip everything up to the marker instead
/// of re-applying the entire file. `last_indexed_event_id` is the
/// `event_id` of the most recent event written to `events_index`; there is
/// one row per `project_hash`.
const MIGRATION_002: &str = r#"
CREATE TABLE IF NOT EXISTS index_state (
  project_hash          TEXT PRIMARY KEY,
  last_indexed_event_id TEXT NOT NULL,
  updated_at            TEXT NOT NULL
);
"#;
80
/// v0.4.0 task-as-goal redesign: explicit goal/outcome on tasks +
/// typed artifacts on events. The new columns are NULLable so existing
/// rows survive without backfill. Wipes the pack cache so old packs
/// (rendered without Goal/Outcome blocks) regenerate on next view.
///
/// Unlike the `CREATE … IF NOT EXISTS` migrations, `ALTER TABLE … ADD
/// COLUMN` fails if the column already exists, so this batch relies on the
/// `schema_migrations` bookkeeping to run exactly once.
const MIGRATION_003: &str = r#"
ALTER TABLE tasks ADD COLUMN goal        TEXT;
ALTER TABLE tasks ADD COLUMN outcome     TEXT;
ALTER TABLE tasks ADD COLUMN outcome_tag TEXT;
ALTER TABLE tasks ADD COLUMN external    TEXT;
ALTER TABLE events_index ADD COLUMN artifacts TEXT;
DELETE FROM task_pack_cache;
"#;
93
/// v0.5.0 Phase B — artifacts auto-extract on ingest. The column was
/// added in v003 but stayed NULL for everyone; v004 just wipes the
/// pack cache so newly-extracted artifacts surface in the next pack
/// render. Existing events stay NULL until `reclassify` (Phase B+) or
/// `rebuild-state` is run.
const MIGRATION_004: &str = r#"
DELETE FROM task_pack_cache;
"#;
102
/// v0.12.0 dream Pass A — per-project watermark of the last successful
/// dream run (one row per `project_hash`). Sessions modified after
/// `last_dream_at` are in scope for the next run.
const MIGRATION_005: &str = r#"
CREATE TABLE IF NOT EXISTS dream_state (
  project_hash    TEXT PRIMARY KEY,
  last_dream_at   TEXT NOT NULL,
  updated_at      TEXT NOT NULL
);
"#;
112
/// v0.12.0 subtask hierarchy — nullable `parent_id` holds the parent task
/// taken from the `open` event's `meta.parent_id`. Existing flat tasks stay
/// NULL. The `idx_tasks_parent` index supports `children_of` lookups.
const MIGRATION_006: &str = r#"
ALTER TABLE tasks ADD COLUMN parent_id TEXT;
CREATE INDEX IF NOT EXISTS idx_tasks_parent ON tasks(parent_id);
"#;
120
/// v0.12.0 structured decision alternatives — nullable `alternatives`
/// carries the JSON array from a decision event's `meta.alternatives`
/// (objects like `{option, chosen, rationale}`). Existing decisions stay
/// NULL; the append-only log is untouched. Wipes the pack cache so packs
/// re-render with the alternatives block once events carry it.
const MIGRATION_007: &str = r#"
ALTER TABLE decisions ADD COLUMN alternatives TEXT;
DELETE FROM task_pack_cache;
"#;
130
/// v0.15.0 semantic-memory substrate (Pillar A).
///
/// `embeddings` stores one vector per event as a little-endian f32 BLOB,
/// tagged with the model id + dim so we never compare across models and can
/// re-embed on a model change. `memory_tier` is denormalised onto
/// `events_index` for cheap tier filtering (episodic by default;
/// semantic/procedural/preference added in Phase 3).
///
/// Purely additive — existing rows default to `episodic`, the append-only
/// log is untouched, and an absent embedder simply leaves `embeddings` empty.
const MIGRATION_008: &str = r#"
CREATE TABLE IF NOT EXISTS embeddings (
  event_id     TEXT PRIMARY KEY,
  task_id      TEXT NOT NULL,
  project_hash TEXT NOT NULL,
  tier         TEXT NOT NULL DEFAULT 'episodic',
  model        TEXT NOT NULL,
  dim          INTEGER NOT NULL,
  vec          BLOB NOT NULL,
  created_at   TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_emb_project_tier ON embeddings(project_hash, tier);
ALTER TABLE events_index ADD COLUMN memory_tier TEXT NOT NULL DEFAULT 'episodic';
"#;
152
/// All schema migrations in version order. Append new entries here; never
/// edit a published migration's `sql` — write a new one instead.
///
/// `apply_migrations` walks this slice front to back and skips any version
/// already recorded in `schema_migrations`, so the slice order is the
/// order in which a fresh database is built.
const MIGRATIONS: &[Migration] = &[
    Migration {
        version: 1,
        sql: MIGRATION_001,
    },
    Migration {
        version: 2,
        sql: MIGRATION_002,
    },
    Migration {
        version: 3,
        sql: MIGRATION_003,
    },
    Migration {
        version: 4,
        sql: MIGRATION_004,
    },
    Migration {
        version: 5,
        sql: MIGRATION_005,
    },
    Migration {
        version: 6,
        sql: MIGRATION_006,
    },
    Migration {
        version: 7,
        sql: MIGRATION_007,
    },
    Migration {
        version: 8,
        sql: MIGRATION_008,
    },
];
189
190fn apply_migrations(conn: &Connection) -> anyhow::Result<()> {
191    conn.execute_batch(
192        "CREATE TABLE IF NOT EXISTS schema_migrations (
193            version    INTEGER PRIMARY KEY,
194            applied_at TEXT NOT NULL
195        )",
196    )
197    .context("create schema_migrations table")?;
198
199    let applied: HashSet<i64> = {
200        let mut stmt = conn
201            .prepare("SELECT version FROM schema_migrations")
202            .context("select applied versions")?;
203        let rows = stmt
204            .query_map([], |r| r.get::<_, i64>(0))
205            .context("iterate schema_migrations")?;
206        rows.collect::<rusqlite::Result<HashSet<_>>>()
207            .context("collect applied versions")?
208    };
209
210    for migration in MIGRATIONS {
211        if applied.contains(&migration.version) {
212            continue;
213        }
214        conn.execute_batch(migration.sql)
215            .with_context(|| format!("apply schema migration v{:03}", migration.version))?;
216        conn.execute(
217            "INSERT INTO schema_migrations(version, applied_at) VALUES (?1, ?2)",
218            rusqlite::params![
219                migration.version,
220                chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
221            ],
222        )
223        .with_context(|| {
224            format!(
225                "record schema migration v{:03} as applied",
226                migration.version
227            )
228        })?;
229    }
230    Ok(())
231}
232
233use crate::event::{Event, EventType};
234
235pub fn upsert_task_from_event(
236    conn: &Connection,
237    event: &Event,
238    project_hash: &str,
239) -> anyhow::Result<()> {
240    match event.event_type {
241        EventType::Open => {
242            let title = event
243                .meta
244                .get("title")
245                .and_then(|v| v.as_str())
246                .unwrap_or(&event.text)
247                .to_string();
248            let parent_id = event
249                .meta
250                .get("parent_id")
251                .and_then(|v| v.as_str())
252                .map(str::to_string);
253            // ON CONFLICT intentionally does not overwrite parent_id — parent
254            // is set once at creation; re-parenting is a separate future path.
255            conn.execute(
256                "INSERT INTO tasks(task_id, title, status, project_hash, opened_at, last_event_at, parent_id)
257                 VALUES (?1, ?2, 'open', ?3, ?4, ?4, ?5)
258                 ON CONFLICT(task_id) DO UPDATE SET last_event_at = ?4",
259                rusqlite::params![event.task_id, title, project_hash, event.timestamp, parent_id],
260            )?;
261        }
262        EventType::Close => {
263            conn.execute(
264                "UPDATE tasks SET status='closed', closed_at=?2, last_event_at=?2 WHERE task_id=?1",
265                rusqlite::params![event.task_id, event.timestamp],
266            )?;
267            // Restore closure metadata from the event so the recorded outcome
268            // survives a full rebuild_state replay — the tasks row is rebuilt
269            // from events, and set_task_outcome's direct DB write would be lost.
270            if let Some(outcome) = event.meta.get("outcome").and_then(|v| v.as_str()) {
271                let tag = event.meta.get("outcome_tag").and_then(|v| v.as_str());
272                conn.execute(
273                    "UPDATE tasks SET outcome=?2, outcome_tag=?3 WHERE task_id=?1",
274                    rusqlite::params![event.task_id, outcome, tag],
275                )?;
276            }
277        }
278        EventType::Reopen => {
279            conn.execute(
280                "UPDATE tasks SET status='open', closed_at=NULL, last_event_at=?2 WHERE task_id=?1",
281                rusqlite::params![event.task_id, event.timestamp],
282            )?;
283        }
284        EventType::Rename => {
285            // The new human-readable title is the event text. Replaying the
286            // JSONL in order means the last Rename wins — exactly what we want.
287            let title = event
288                .meta
289                .get("title")
290                .and_then(|v| v.as_str())
291                .unwrap_or(&event.text);
292            conn.execute(
293                "UPDATE tasks SET title=?2, last_event_at=?3 WHERE task_id=?1",
294                rusqlite::params![event.task_id, title, event.timestamp],
295            )?;
296        }
297        _ => {
298            conn.execute(
299                "UPDATE tasks SET last_event_at=?2 WHERE task_id=?1",
300                rusqlite::params![event.task_id, event.timestamp],
301            )?;
302        }
303    }
304    Ok(())
305}
306
307use std::io::BufRead;
308
309pub fn list_all_projects(state_dir: impl AsRef<Path>) -> anyhow::Result<Vec<String>> {
310    let dir = state_dir.as_ref();
311    if !dir.exists() {
312        return Ok(vec![]);
313    }
314    let mut out = Vec::new();
315    for entry in std::fs::read_dir(dir)? {
316        let entry = entry?;
317        let path = entry.path();
318        if path.extension().and_then(|e| e.to_str()) == Some("sqlite") {
319            if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) {
320                out.push(stem.to_string());
321            }
322        }
323    }
324    Ok(out)
325}
326
327pub fn rebuild_state(
328    conn: &Connection,
329    jsonl_path: impl AsRef<Path>,
330    project_hash: &str,
331) -> anyhow::Result<usize> {
332    let f = std::fs::File::open(&jsonl_path)
333        .with_context(|| format!("open {:?}", jsonl_path.as_ref()))?;
334    let reader = std::io::BufReader::new(f);
335
336    let tx = conn.unchecked_transaction()?;
337    let mut count = 0;
338    let mut last_event_id: Option<String> = None;
339    for (i, line) in reader.lines().enumerate() {
340        let line = line.with_context(|| format!("read line {i}"))?;
341        if line.trim().is_empty() {
342            continue;
343        }
344        // Malformed JSONL lines are skipped with a warning so that one bad
345        // event cannot abort an otherwise-recoverable rebuild. SQL errors
346        // still propagate — those indicate schema/integrity problems.
347        let event: Event = match serde_json::from_str(&line) {
348            Ok(e) => e,
349            Err(err) => {
350                tracing::warn!(
351                    line_number = i + 1,
352                    error = %err,
353                    "skipping malformed JSONL line in rebuild_state"
354                );
355                continue;
356            }
357        };
358        upsert_task_from_event(&tx, &event, project_hash)?;
359        index_event(&tx, &event)?;
360        last_event_id = Some(event.event_id.clone());
361        count += 1;
362    }
363    if let Some(eid) = last_event_id.as_deref() {
364        record_last_indexed(&tx, project_hash, eid)?;
365    }
366    tx.commit()?;
367    Ok(count)
368}
369
370/// Returns whether a task with this id has been recorded in the derived
371/// state. Cheap O(1) lookup against the `tasks` primary key. Callers
372/// should run [`ingest_new_events`] first if they want to see the latest
373/// JSONL state.
374pub fn task_exists(conn: &Connection, task_id: &str) -> anyhow::Result<bool> {
375    let count: i64 = conn.query_row(
376        "SELECT COUNT(*) FROM tasks WHERE task_id = ?1",
377        rusqlite::params![task_id],
378        |r| r.get(0),
379    )?;
380    Ok(count > 0)
381}
382
383/// Status string for an existing task (e.g. "open", "closed"). Returns
384/// `None` when the task is unknown — caller decides whether that's a
385/// hard error or a route-to-pending case.
386pub fn task_status(conn: &Connection, task_id: &str) -> anyhow::Result<Option<String>> {
387    let mut stmt = conn.prepare("SELECT status FROM tasks WHERE task_id = ?1")?;
388    let mut rows = stmt.query(rusqlite::params![task_id])?;
389    Ok(rows.next()?.map(|r| r.get::<_, String>(0)).transpose()?)
390}
391
392/// Set or replace `tasks.goal` for an existing task. Caller is
393/// expected to have validated the task exists (via `task_exists`); we
394/// don't error on no-op rows so the upsert pattern is uniform.
395pub fn set_task_goal(conn: &Connection, task_id: &str, goal: &str) -> anyhow::Result<()> {
396    conn.execute(
397        "UPDATE tasks SET goal = ?1 WHERE task_id = ?2",
398        rusqlite::params![goal, task_id],
399    )
400    .with_context(|| format!("set goal for {task_id}"))?;
401    // Pack cache is now stale for this task — drop the entry so the
402    // next render picks up the new goal.
403    conn.execute(
404        "DELETE FROM task_pack_cache WHERE task_id = ?1",
405        rusqlite::params![task_id],
406    )?;
407    Ok(())
408}
409
410/// Set or replace the closure metadata. Pass `None` for `outcome_tag`
411/// to leave it unset; pass `Some("done"|"abandoned"|"superseded")`
412/// for a structured tag. Free-text `outcome` is the primary field.
413pub fn set_task_outcome(
414    conn: &Connection,
415    task_id: &str,
416    outcome: &str,
417    outcome_tag: Option<&str>,
418) -> anyhow::Result<()> {
419    conn.execute(
420        "UPDATE tasks SET outcome = ?1, outcome_tag = ?2 WHERE task_id = ?3",
421        rusqlite::params![outcome, outcome_tag, task_id],
422    )
423    .with_context(|| format!("set outcome for {task_id}"))?;
424    conn.execute(
425        "DELETE FROM task_pack_cache WHERE task_id = ?1",
426        rusqlite::params![task_id],
427    )?;
428    Ok(())
429}
430
431/// Append an external reference to `tasks.external`. The column is
432/// stored as a comma-separated list — small, append-mostly, no
433/// uniqueness constraint. Acceptable shapes (loose, not enforced):
434/// `beads:claude-memory-rsw`, `github:#42`, `jira:PROJ-1234`.
435pub fn add_task_external(conn: &Connection, task_id: &str, reference: &str) -> anyhow::Result<()> {
436    let current: Option<String> = conn
437        .query_row(
438            "SELECT external FROM tasks WHERE task_id = ?1",
439            rusqlite::params![task_id],
440            |r| r.get::<_, Option<String>>(0),
441        )
442        .with_context(|| format!("read external for {task_id}"))?;
443    let next = match current {
444        Some(s) if !s.is_empty() => format!("{s},{reference}"),
445        _ => reference.to_string(),
446    };
447    conn.execute(
448        "UPDATE tasks SET external = ?1 WHERE task_id = ?2",
449        rusqlite::params![next, task_id],
450    )?;
451    conn.execute(
452        "DELETE FROM task_pack_cache WHERE task_id = ?1",
453        rusqlite::params![task_id],
454    )?;
455    Ok(())
456}
457
/// Read-only metadata bundle used by pack rendering (and TUI list
/// teasers in v0.4.0+). Each field mirrors the nullable `tasks` column of
/// the same name; `None` means the column is NULL.
#[derive(Debug, Clone, Default)]
pub struct TaskMetadata {
    /// `tasks.goal`.
    pub goal: Option<String>,
    /// `tasks.outcome` — free-text closure note.
    pub outcome: Option<String>,
    /// `tasks.outcome_tag` — optional structured closure tag.
    pub outcome_tag: Option<String>,
    /// `tasks.external` — comma-separated external references.
    pub external: Option<String>,
}
467
468pub fn task_metadata(conn: &Connection, task_id: &str) -> anyhow::Result<Option<TaskMetadata>> {
469    let mut stmt =
470        conn.prepare("SELECT goal, outcome, outcome_tag, external FROM tasks WHERE task_id = ?1")?;
471    let mut rows = stmt.query(rusqlite::params![task_id])?;
472    Ok(match rows.next()? {
473        Some(r) => Some(TaskMetadata {
474            goal: r.get::<_, Option<String>>(0)?,
475            outcome: r.get::<_, Option<String>>(1)?,
476            outcome_tag: r.get::<_, Option<String>>(2)?,
477            external: r.get::<_, Option<String>>(3)?,
478        }),
479        None => None,
480    })
481}
482
/// One row of the stale-task report: an open task whose last event
/// crossed the inactivity threshold.
#[derive(Debug, Clone)]
pub struct StaleTask {
    /// `tasks.task_id`.
    pub task_id: String,
    /// `tasks.title`.
    pub title: String,
    /// `tasks.last_event_at`, exactly as stored.
    pub last_event_at: String,
    /// Whole days between `last_event_at` and the time the report was built.
    pub days_idle: i64,
}
492
493/// Find open tasks with no event in the last `days` days. Sorted by
494/// idle time descending so the user sees the most ancient first.
495pub fn stale_tasks(conn: &Connection, days: i64) -> anyhow::Result<Vec<StaleTask>> {
496    let cutoff = chrono::Utc::now() - chrono::Duration::days(days);
497    let cutoff_str = cutoff.to_rfc3339();
498    let mut stmt = conn.prepare(
499        "SELECT task_id, title, last_event_at FROM tasks
500         WHERE status = 'open' AND last_event_at < ?1
501         ORDER BY last_event_at ASC",
502    )?;
503    let rows = stmt.query_map(rusqlite::params![cutoff_str], |r| {
504        Ok((
505            r.get::<_, String>(0)?,
506            r.get::<_, String>(1)?,
507            r.get::<_, String>(2)?,
508        ))
509    })?;
510    let now = chrono::Utc::now();
511    let mut out = Vec::new();
512    for row in rows {
513        let (task_id, title, last_at) = row?;
514        let dt = chrono::DateTime::parse_from_rfc3339(&last_at)
515            .map(|d| d.with_timezone(&chrono::Utc))
516            .unwrap_or(now);
517        let days_idle = (now - dt).num_days();
518        out.push(StaleTask {
519            task_id,
520            title,
521            last_event_at: last_at,
522            days_idle,
523        });
524    }
525    Ok(out)
526}
527
/// Score-weighted relationship between a fresh prompt's artifacts and
/// every prior task's artifacts. Higher score = stronger continuation
/// signal. Threshold tuning is the caller's job; v0.6.0 auto-link
/// keeps anything with score > 0.0.
#[derive(Debug, Clone)]
pub struct RelatedTask {
    /// Id of the prior task that shares at least one artifact.
    pub task_id: String,
    /// That task's current `tasks.status`.
    pub status: String,
    /// Sum of the weights of every shared artifact.
    pub score: f64,
}
538
539/// Find tasks whose events overlap the given artifacts on any
540/// dimension we have a signal for. Weights:
541///   shared linked_issue → +1.0   (strongest, ticket id is unique)
542///   shared commit_hash  → +0.8   (commits are nearly unique)
543///   shared file path    → +0.3   (files churn across tasks)
544///
545/// The scan reads `events_index.artifacts` (JSON) directly with LIKE
546/// substring matches — JSON1 would be cleaner but keeps the codepath
547/// dependency-free. Returns top hits sorted by score desc; ties keep
548/// the most-recent task first.
549pub fn find_related_tasks(
550    conn: &Connection,
551    arts: &crate::artifacts::Artifacts,
552) -> anyhow::Result<Vec<RelatedTask>> {
553    use std::collections::HashMap;
554    if arts.is_empty() {
555        return Ok(Vec::new());
556    }
557    let mut scores: HashMap<String, f64> = HashMap::new();
558    let mut last_seen: HashMap<String, String> = HashMap::new();
559
560    let needles: Vec<(String, f64)> = arts
561        .linked_issues
562        .iter()
563        .map(|s| (s.clone(), 1.0))
564        .chain(arts.commit_hashes.iter().map(|s| (s.clone(), 0.8)))
565        .chain(arts.files.iter().map(|s| (s.clone(), 0.3)))
566        .collect();
567
568    for (needle, weight) in needles {
569        let pattern = format!("%\"{}\"%", needle.replace('%', "\\%"));
570        let mut stmt = conn.prepare(
571            "SELECT DISTINCT task_id, MAX(timestamp) as ts FROM events_index
572             WHERE artifacts LIKE ?1
573             GROUP BY task_id
574             ORDER BY ts DESC",
575        )?;
576        let rows = stmt.query_map(rusqlite::params![pattern], |r| {
577            Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
578        })?;
579        for row in rows {
580            let (id, ts) = row?;
581            *scores.entry(id.clone()).or_insert(0.0) += weight;
582            last_seen.insert(id, ts);
583        }
584    }
585
586    let mut out: Vec<RelatedTask> = Vec::with_capacity(scores.len());
587    for (id, score) in scores {
588        let status: Option<String> = conn
589            .query_row(
590                "SELECT status FROM tasks WHERE task_id = ?1",
591                rusqlite::params![&id],
592                |r| r.get(0),
593            )
594            .ok();
595        if let Some(status) = status {
596            out.push(RelatedTask {
597                task_id: id,
598                status,
599                score,
600            });
601        }
602    }
603    out.sort_by(|a, b| {
604        b.score
605            .partial_cmp(&a.score)
606            .unwrap_or(std::cmp::Ordering::Equal)
607            .then_with(|| {
608                let ts_a = last_seen.get(&a.task_id).cloned().unwrap_or_default();
609                let ts_b = last_seen.get(&b.task_id).cloned().unwrap_or_default();
610                ts_b.cmp(&ts_a)
611            })
612    });
613    Ok(out)
614}
615
616/// Find tasks (open or closed) whose events reference any of the given
617/// issue identifiers (FIN-868, JIRA-123, INC-7…). Looks at the
618/// per-event `artifacts.linked_issues` column populated on ingest.
619/// Returns `(task_id, status)` deduplicated, most-recent first. Used
620/// by the v0.5.0 Phase C auto-link flow to recognise that a fresh
621/// prompt is a continuation of a prior task.
622pub fn find_tasks_by_linked_issues(
623    conn: &Connection,
624    issues: &[String],
625) -> anyhow::Result<Vec<(String, String)>> {
626    if issues.is_empty() {
627        return Ok(Vec::new());
628    }
629    // Stage A: collect candidate task_ids whose events_index.artifacts
630    // contains any of the requested issue strings. JSON1 is overkill
631    // here — a substring LIKE on the raw JSON is correct given the
632    // ticket id format ("FIN-868") never appears outside its own
633    // linked_issues array.
634    let mut candidate_ids: Vec<String> = Vec::new();
635    for issue in issues {
636        let pattern = format!("%\"{}\"%", issue.replace('%', "\\%"));
637        let mut stmt = conn.prepare(
638            "SELECT DISTINCT task_id FROM events_index
639             WHERE artifacts LIKE ?1
640             ORDER BY timestamp DESC",
641        )?;
642        let rows = stmt.query_map(rusqlite::params![pattern], |r| r.get::<_, String>(0))?;
643        for r in rows {
644            let id = r?;
645            if !candidate_ids.contains(&id) {
646                candidate_ids.push(id);
647            }
648        }
649    }
650    // Stage B: hydrate status for each candidate.
651    let mut out = Vec::with_capacity(candidate_ids.len());
652    for id in candidate_ids {
653        let status: Option<String> = conn
654            .query_row(
655                "SELECT status FROM tasks WHERE task_id = ?1",
656                rusqlite::params![&id],
657                |r| r.get(0),
658            )
659            .ok();
660        if let Some(s) = status {
661            out.push((id, s));
662        }
663    }
664    Ok(out)
665}
666
667/// Re-run artifact extraction over every event of a task and write the
668/// result back to `events_index.artifacts`. Used to backfill events
669/// that were ingested before Phase B landed. Returns the number of
670/// events touched. Wipes the pack cache for the task so the next
671/// render reflects the freshly extracted artifacts.
672pub fn reclassify_task_artifacts(conn: &Connection, task_id: &str) -> anyhow::Result<usize> {
673    let mut stmt = conn.prepare(
674        "SELECT ei.event_id, COALESCE(sf.text, '') FROM events_index ei
675         LEFT JOIN search_fts sf ON sf.event_id = ei.event_id
676         WHERE ei.task_id = ?1",
677    )?;
678    let rows: Vec<(String, String)> = stmt
679        .query_map(rusqlite::params![task_id], |r| {
680            Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
681        })?
682        .collect::<Result<_, _>>()?;
683    let count = rows.len();
684    for (event_id, text) in rows {
685        let arts = crate::artifacts::extract(&text);
686        let json = if arts.is_empty() {
687            None
688        } else {
689            Some(serde_json::to_string(&arts)?)
690        };
691        conn.execute(
692            "UPDATE events_index SET artifacts = ?1 WHERE event_id = ?2",
693            rusqlite::params![json, event_id],
694        )?;
695    }
696    invalidate_pack_cascade(conn, task_id)?;
697    Ok(count)
698}
699
700/// Aggregate artifacts (commit hashes, PR URLs, ticket IDs, files,
701/// branches) across every event of a task, deduplicated. Reads the
702/// per-event JSON payload that `ingest_new_events` populated. Skips
703/// events whose `artifacts` column is NULL or unparseable rather than
704/// failing the pack render.
705pub fn task_artifacts(
706    conn: &Connection,
707    task_id: &str,
708) -> anyhow::Result<crate::artifacts::Artifacts> {
709    let mut stmt = conn.prepare(
710        "SELECT artifacts FROM events_index
711         WHERE task_id = ?1 AND artifacts IS NOT NULL
712         ORDER BY timestamp ASC",
713    )?;
714    let rows = stmt.query_map(rusqlite::params![task_id], |r| r.get::<_, String>(0))?;
715    let mut acc = crate::artifacts::Artifacts::default();
716    for row in rows {
717        let json = row?;
718        if let Ok(parsed) = serde_json::from_str::<crate::artifacts::Artifacts>(&json) {
719            acc.merge(parsed);
720        }
721    }
722    Ok(acc)
723}
724
725/// Look up the most recent `event_id` we've ingested for this project.
726/// Returns `None` when the project has never been indexed (first call,
727/// or migration v002 just landed on an existing 0.1.x DB).
728fn last_indexed_event_id(conn: &Connection, project_hash: &str) -> anyhow::Result<Option<String>> {
729    let mut stmt =
730        conn.prepare("SELECT last_indexed_event_id FROM index_state WHERE project_hash = ?1")?;
731    let mut rows = stmt.query(rusqlite::params![project_hash])?;
732    if let Some(row) = rows.next()? {
733        Ok(Some(row.get::<_, String>(0)?))
734    } else {
735        Ok(None)
736    }
737}
738
739fn record_last_indexed(
740    conn: &Connection,
741    project_hash: &str,
742    event_id: &str,
743) -> anyhow::Result<()> {
744    conn.execute(
745        "INSERT INTO index_state(project_hash, last_indexed_event_id, updated_at)
746         VALUES (?1, ?2, ?3)
747         ON CONFLICT(project_hash) DO UPDATE SET
748             last_indexed_event_id = excluded.last_indexed_event_id,
749             updated_at = excluded.updated_at",
750        rusqlite::params![
751            project_hash,
752            event_id,
753            chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
754        ],
755    )?;
756    Ok(())
757}
758
759/// Read only the tail of the JSONL log since the last call. The cheap path
760/// for hot loops (every MCP tool invocation): scan to the marker, ingest
761/// the rest, update the marker.
762///
763/// Falls back to a full [`rebuild_state`] in two cases:
764/// - No marker yet for this project (first call after migration v002 or
765///   on a brand-new install).
766/// - The stored marker is not present in the JSONL (corrupted / truncated
767///   file). A `tracing::warn!` is emitted so the operator notices.
768pub fn ingest_new_events(
769    conn: &Connection,
770    jsonl_path: impl AsRef<Path>,
771    project_hash: &str,
772) -> anyhow::Result<usize> {
773    let marker = match last_indexed_event_id(conn, project_hash)? {
774        Some(id) => id,
775        None => return rebuild_state(conn, jsonl_path, project_hash),
776    };
777
778    let f = std::fs::File::open(&jsonl_path)
779        .with_context(|| format!("open {:?}", jsonl_path.as_ref()))?;
780    let reader = std::io::BufReader::new(f);
781
782    // First pass: confirm the marker still exists in the file. If it does
783    // not, the JSONL has been rewritten under us — we can't trust the
784    // marker, so we fall back to a full rebuild.
785    let tx = conn.unchecked_transaction()?;
786    let mut found_marker = false;
787    let mut count = 0;
788    let mut last_event_id: Option<String> = None;
789    for (i, line) in reader.lines().enumerate() {
790        let line = line.with_context(|| format!("read line {i}"))?;
791        if line.trim().is_empty() {
792            continue;
793        }
794        let event: Event = match serde_json::from_str(&line) {
795            Ok(e) => e,
796            Err(err) => {
797                tracing::warn!(
798                    line_number = i + 1,
799                    error = %err,
800                    "skipping malformed JSONL line in ingest_new_events"
801                );
802                continue;
803            }
804        };
805        if !found_marker {
806            if event.event_id == marker {
807                found_marker = true;
808            }
809            continue;
810        }
811        upsert_task_from_event(&tx, &event, project_hash)?;
812        index_event(&tx, &event)?;
813        last_event_id = Some(event.event_id.clone());
814        count += 1;
815    }
816
817    if !found_marker {
818        // Discard the (empty) tx and rebuild from scratch.
819        drop(tx);
820        tracing::warn!(
821            project_hash = project_hash,
822            marker = marker.as_str(),
823            "last_indexed_event_id not found in JSONL — falling back to full rebuild"
824        );
825        return rebuild_state(conn, jsonl_path, project_hash);
826    }
827
828    if let Some(eid) = last_event_id.as_deref() {
829        record_last_indexed(&tx, project_hash, eid)?;
830    }
831    tx.commit()?;
832    Ok(count)
833}
834
835pub fn index_event(conn: &Connection, event: &Event) -> anyhow::Result<()> {
836    let type_str = serde_json::to_value(event.event_type)?
837        .as_str()
838        .unwrap()
839        .to_string();
840    let status_str = serde_json::to_value(event.status)?
841        .as_str()
842        .unwrap()
843        .to_string();
844    // v0.5.0 Phase B: scrape artifacts (commit hashes, PR URLs, ticket
845    // IDs, file paths, branch names) out of the event text. Storing
846    // per-event so reclassify can recompute without touching foreign
847    // events; pack aggregates and dedupes across events at render time.
848    let artifacts = crate::artifacts::extract(&event.text);
849    let artifacts_json = if artifacts.is_empty() {
850        None
851    } else {
852        Some(serde_json::to_string(&artifacts)?)
853    };
854    conn.execute(
855        "INSERT OR REPLACE INTO events_index(event_id, task_id, type, timestamp, confidence, status, artifacts)
856         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
857        rusqlite::params![
858            event.event_id, event.task_id, type_str,
859            event.timestamp, event.confidence, status_str, artifacts_json
860        ],
861    )?;
862    // search_fts has no PK; clear then insert to keep idempotent across rebuild_state replays.
863    conn.execute(
864        "DELETE FROM search_fts WHERE event_id=?1",
865        rusqlite::params![event.event_id],
866    )?;
867    conn.execute(
868        "INSERT INTO search_fts(task_id, event_id, text, type) VALUES (?1, ?2, ?3, ?4)",
869        rusqlite::params![event.task_id, event.event_id, event.text, type_str],
870    )?;
871
872    if event.event_type == EventType::Decision {
873        // v0.12.0: project structured alternatives (meta.alternatives) into
874        // a dedicated column so pack can render "considered A/B/C, chose X".
875        // Stored as the verbatim JSON of the meta value; NULL when absent.
876        let alternatives_json = match event.meta.get("alternatives") {
877            Some(v) if !v.is_null() => Some(serde_json::to_string(v)?),
878            _ => None,
879        };
880        conn.execute(
881            "INSERT OR REPLACE INTO decisions(decision_id, task_id, text, status, alternatives)
882             VALUES (?1, ?2, ?3, 'active', ?4)",
883            rusqlite::params![event.event_id, event.task_id, event.text, alternatives_json],
884        )?;
885    }
886
887    if event.event_type == EventType::Supersede {
888        if let Some(target) = &event.supersedes {
889            conn.execute(
890                "UPDATE decisions SET status='superseded', superseded_by=?1 WHERE decision_id=?2",
891                rusqlite::params![event.event_id, target],
892            )?;
893        }
894    }
895
896    if event.event_type == EventType::Evidence {
897        let strength_str = event
898            .evidence_strength
899            .map(|s| {
900                serde_json::to_value(s)
901                    .unwrap()
902                    .as_str()
903                    .unwrap()
904                    .to_string()
905            })
906            .unwrap_or_else(|| "medium".into());
907        conn.execute(
908            "INSERT OR REPLACE INTO evidence(evidence_id, task_id, text, strength)
909             VALUES (?1, ?2, ?3, ?4)",
910            rusqlite::params![event.event_id, event.task_id, event.text, strength_str],
911        )?;
912    }
913
914    // Invalidate any cached pack for this task — and its parent, whose
915    // Subtasks roll-up depends on this child.
916    invalidate_pack_cascade(conn, &event.task_id)?;
917
918    Ok(())
919}
920
921pub fn open(path: impl AsRef<Path>) -> anyhow::Result<Connection> {
922    if let Some(parent) = path.as_ref().parent() {
923        std::fs::create_dir_all(parent).with_context(|| format!("create dir {parent:?}"))?;
924    }
925    let conn =
926        Connection::open(&path).with_context(|| format!("open SQLite at {:?}", path.as_ref()))?;
927    conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;")?;
928    apply_migrations(&conn).context("apply schema migrations")?;
929    Ok(conn)
930}
931
/// One row of the task list rendered by the TUI: enough to render the
/// list view without round-tripping for each task. `event_count` joins
/// `events_index` so we don't need a second query per row.
#[derive(Debug, Clone)]
pub struct TaskRow {
    /// Identifier of the task (`tasks.task_id`).
    pub task_id: String,
    /// Current title stored for the task.
    pub title: String,
    /// Status text as stored in `tasks.status`; the list queries sort rows
    /// whose status is `'open'` ahead of all others.
    pub status: String,
    /// Timestamp text of the task's latest event (`tasks.last_event_at`).
    pub last_event_at: String,
    /// Number of `events_index` rows for this task; 0 when there are none.
    pub event_count: usize,
}
943
944/// All tasks for a project, ordered with open ones first (by recency)
945/// then closed ones. The TUI list view binds directly to this — there
946/// is no other consumer, so the shape is tuned for that callsite.
947pub fn list_tasks_by_project(
948    conn: &Connection,
949    project_hash: &str,
950) -> anyhow::Result<Vec<TaskRow>> {
951    let mut stmt = conn.prepare(
952        "SELECT t.task_id, t.title, t.status, t.last_event_at,
953                COALESCE(c.cnt, 0) AS event_count
954         FROM tasks t
955         LEFT JOIN (
956             SELECT task_id, COUNT(*) AS cnt FROM events_index GROUP BY task_id
957         ) c ON c.task_id = t.task_id
958         WHERE t.project_hash = ?1
959         ORDER BY (t.status = 'open') DESC, t.last_event_at DESC",
960    )?;
961    let rows = stmt
962        .query_map(rusqlite::params![project_hash], |r| {
963            Ok(TaskRow {
964                task_id: r.get::<_, String>(0)?,
965                title: r.get::<_, String>(1)?,
966                status: r.get::<_, String>(2)?,
967                last_event_at: r.get::<_, String>(3)?,
968                event_count: r.get::<_, i64>(4)? as usize,
969            })
970        })?
971        .collect::<Result<Vec<_>, _>>()?;
972    Ok(rows)
973}
974
975/// Top-level tasks for a project (those with no parent), ordered like
976/// `list_tasks_by_project` — open first, then by recency. The roots of
977/// the `list --tree` view.
978pub fn top_level_tasks(conn: &Connection, project_hash: &str) -> anyhow::Result<Vec<TaskRow>> {
979    let mut stmt = conn.prepare(
980        "SELECT t.task_id, t.title, t.status, t.last_event_at,
981                COALESCE(c.cnt, 0) AS event_count
982         FROM tasks t
983         LEFT JOIN (
984             SELECT task_id, COUNT(*) AS cnt FROM events_index GROUP BY task_id
985         ) c ON c.task_id = t.task_id
986         WHERE t.project_hash = ?1 AND t.parent_id IS NULL
987         ORDER BY (t.status = 'open') DESC, t.last_event_at DESC",
988    )?;
989    let rows = stmt
990        .query_map(rusqlite::params![project_hash], |r| {
991            Ok(TaskRow {
992                task_id: r.get::<_, String>(0)?,
993                title: r.get::<_, String>(1)?,
994                status: r.get::<_, String>(2)?,
995                last_event_at: r.get::<_, String>(3)?,
996                event_count: r.get::<_, i64>(4)? as usize,
997            })
998        })?
999        .collect::<Result<Vec<_>, _>>()?;
1000    Ok(rows)
1001}
1002
1003/// Direct children of a task (one level), newest activity first.
1004pub fn children_of(conn: &Connection, task_id: &str) -> anyhow::Result<Vec<TaskRow>> {
1005    let mut stmt = conn.prepare(
1006        "SELECT t.task_id, t.title, t.status, t.last_event_at,
1007                COALESCE(c.cnt, 0) AS event_count
1008         FROM tasks t
1009         LEFT JOIN (
1010             SELECT task_id, COUNT(*) AS cnt FROM events_index GROUP BY task_id
1011         ) c ON c.task_id = t.task_id
1012         WHERE t.parent_id = ?1
1013         ORDER BY (t.status = 'open') DESC, t.last_event_at DESC",
1014    )?;
1015    let rows = stmt
1016        .query_map(rusqlite::params![task_id], |r| {
1017            Ok(TaskRow {
1018                task_id: r.get::<_, String>(0)?,
1019                title: r.get::<_, String>(1)?,
1020                status: r.get::<_, String>(2)?,
1021                last_event_at: r.get::<_, String>(3)?,
1022                event_count: r.get::<_, i64>(4)? as usize,
1023            })
1024        })?
1025        .collect::<Result<Vec<_>, _>>()?;
1026    Ok(rows)
1027}
1028
1029/// The stored parent of a task, if any.
1030pub fn parent_of(conn: &Connection, task_id: &str) -> anyhow::Result<Option<String>> {
1031    let mut stmt = conn.prepare("SELECT parent_id FROM tasks WHERE task_id = ?1")?;
1032    let mut rows = stmt.query(rusqlite::params![task_id])?;
1033    Ok(match rows.next()? {
1034        Some(r) => r.get::<_, Option<String>>(0)?,
1035        None => None,
1036    })
1037}
1038
1039/// True if setting `new_parent` as the parent of `task_id` would create a
1040/// cycle (i.e. `new_parent` is `task_id` itself or a descendant of it).
1041/// Walks ancestors of `new_parent`; a depth cap guards against pre-existing
1042/// corrupt cycles.
1043pub fn would_create_cycle(
1044    conn: &Connection,
1045    task_id: &str,
1046    new_parent: &str,
1047) -> anyhow::Result<bool> {
1048    if task_id == new_parent {
1049        return Ok(true);
1050    }
1051    let mut cursor = Some(new_parent.to_string());
1052    for _ in 0..64 {
1053        let Some(cur) = cursor else {
1054            return Ok(false);
1055        };
1056        if cur == task_id {
1057            return Ok(true);
1058        }
1059        cursor = parent_of(conn, &cur)?;
1060    }
1061    // Depth cap exceeded — treat as a cycle to be safe.
1062    Ok(true)
1063}
1064
1065/// Number of direct children of `task_id` whose status is still open.
1066pub fn count_open_children(conn: &Connection, task_id: &str) -> anyhow::Result<usize> {
1067    let n: i64 = conn.query_row(
1068        "SELECT COUNT(*) FROM tasks WHERE parent_id = ?1 AND status = 'open'",
1069        rusqlite::params![task_id],
1070        |r| r.get(0),
1071    )?;
1072    Ok(n as usize)
1073}
1074
1075/// Clear the pack cache for a task and its parent (roll-up depends on both).
1076pub fn invalidate_pack_cascade(conn: &Connection, task_id: &str) -> anyhow::Result<()> {
1077    conn.execute(
1078        "DELETE FROM task_pack_cache WHERE task_id = ?1",
1079        rusqlite::params![task_id],
1080    )?;
1081    if let Some(parent) = parent_of(conn, task_id)? {
1082        conn.execute(
1083            "DELETE FROM task_pack_cache WHERE task_id = ?1",
1084            rusqlite::params![parent],
1085        )?;
1086    }
1087    Ok(())
1088}
1089
1090// ---------------------------------------------------------------------------
1091// Semantic-memory substrate (Pillar A / schema v008).
1092// ---------------------------------------------------------------------------
1093
/// One event awaiting an embedding: its id, task, and the text to embed.
///
/// Derives `Debug` and `Clone` like `TaskRow`, so pending batches can be
/// logged and copied.
#[derive(Debug, Clone)]
pub struct PendingEmbed {
    /// Id of the event that needs a vector.
    pub event_id: String,
    /// Task the event belongs to.
    pub task_id: String,
    /// Text to hand to the embedder.
    pub text: String,
}
1100
1101/// Events that have no up-to-date embedding for `model` — either never embedded
1102/// or embedded by a different model. Pulls the text straight from `search_fts`.
1103/// `limit` bounds the batch; pass a large value to drain.
1104pub fn events_needing_embedding(
1105    conn: &Connection,
1106    model: &str,
1107    limit: usize,
1108) -> anyhow::Result<Vec<PendingEmbed>> {
1109    let mut stmt = conn.prepare(
1110        "SELECT f.event_id, f.task_id, f.text
1111           FROM search_fts f
1112           LEFT JOIN embeddings e ON e.event_id = f.event_id AND e.model = ?1
1113          WHERE e.event_id IS NULL
1114          LIMIT ?2",
1115    )?;
1116    let rows = stmt.query_map(rusqlite::params![model, limit as i64], |r| {
1117        Ok(PendingEmbed {
1118            event_id: r.get(0)?,
1119            task_id: r.get(1)?,
1120            text: r.get(2)?,
1121        })
1122    })?;
1123    let mut out = Vec::new();
1124    for r in rows {
1125        out.push(r?);
1126    }
1127    Ok(out)
1128}
1129
1130/// Upsert one vector. Keyed on `event_id`, so re-embedding (e.g. after a model
1131/// change) replaces the prior row idempotently across `rebuild_state` replays.
1132#[allow(clippy::too_many_arguments)]
1133pub fn upsert_embedding(
1134    conn: &Connection,
1135    event_id: &str,
1136    task_id: &str,
1137    project_hash: &str,
1138    tier: &str,
1139    model: &str,
1140    dim: usize,
1141    vec: &[f32],
1142    created_at: &str,
1143) -> anyhow::Result<()> {
1144    conn.execute(
1145        "INSERT OR REPLACE INTO embeddings(event_id, task_id, project_hash, tier, model, dim, vec, created_at)
1146         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
1147        rusqlite::params![
1148            event_id,
1149            task_id,
1150            project_hash,
1151            tier,
1152            model,
1153            dim as i64,
1154            crate::embed::to_blob(vec),
1155            created_at
1156        ],
1157    )?;
1158    Ok(())
1159}
1160
1161/// High-signal events (decisions, constraints, rejections) for consolidation —
1162/// `(event_id, text)`, newest first, capped at `limit`.
1163pub fn high_signal_events(
1164    conn: &Connection,
1165    limit: usize,
1166) -> anyhow::Result<Vec<(String, String)>> {
1167    let mut stmt = conn.prepare(
1168        "SELECT f.event_id, f.text
1169           FROM search_fts f
1170           JOIN events_index ei ON ei.event_id = f.event_id
1171          WHERE f.type IN ('decision', 'constraint', 'rejection')
1172          ORDER BY ei.timestamp DESC
1173          LIMIT ?1",
1174    )?;
1175    let rows = stmt.query_map(rusqlite::params![limit as i64], |r| {
1176        Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
1177    })?;
1178    let mut out = Vec::new();
1179    for r in rows {
1180        out.push(r?);
1181    }
1182    Ok(out)
1183}
1184
1185/// First task whose title exactly matches `title`, if any — used to find the
1186/// reusable per-project consolidation task.
1187pub fn find_task_by_title(conn: &Connection, title: &str) -> anyhow::Result<Option<String>> {
1188    let mut stmt = conn.prepare("SELECT task_id FROM tasks WHERE title = ?1 LIMIT 1")?;
1189    let mut rows = stmt.query(rusqlite::params![title])?;
1190    match rows.next()? {
1191        Some(row) => Ok(Some(row.get(0)?)),
1192        None => Ok(None),
1193    }
1194}
1195
1196/// Texts of all events under a task (for de-duplicating consolidated facts).
1197pub fn task_event_texts(conn: &Connection, task_id: &str) -> anyhow::Result<Vec<String>> {
1198    let mut stmt = conn.prepare("SELECT text FROM search_fts WHERE task_id = ?1")?;
1199    let rows = stmt.query_map(rusqlite::params![task_id], |r| r.get::<_, String>(0))?;
1200    let mut out = Vec::new();
1201    for r in rows {
1202        out.push(r?);
1203    }
1204    Ok(out)
1205}
1206
1207/// Number of stored embeddings for a project (test/stats helper).
1208pub fn count_embeddings(conn: &Connection, project_hash: &str) -> anyhow::Result<usize> {
1209    let n: i64 = conn.query_row(
1210        "SELECT COUNT(*) FROM embeddings WHERE project_hash = ?1",
1211        rusqlite::params![project_hash],
1212        |r| r.get(0),
1213    )?;
1214    Ok(n as usize)
1215}
1216
1217/// Embed up to `limit` events that still need a vector for the embedder's model,
1218/// and store them. Returns how many were embedded this call. Shared by
1219/// embed-on-ingest (small batch after `ingest_new_events`) and
1220/// `embed --backfill` (looped until it returns 0). Every pending text gets a
1221/// vector — including short boilerplate — so nothing is re-scanned next pass;
1222/// retrieval-side filtering ([`crate::embed::is_embeddable`]) decides what's
1223/// worth surfacing.
1224pub fn embed_pending(
1225    conn: &Connection,
1226    project_hash: &str,
1227    embedder: &dyn crate::embed::Embedder,
1228    created_at: &str,
1229    limit: usize,
1230) -> anyhow::Result<usize> {
1231    let pending = events_needing_embedding(conn, embedder.model_id(), limit)?;
1232    if pending.is_empty() {
1233        return Ok(0);
1234    }
1235    let texts: Vec<&str> = pending.iter().map(|p| p.text.as_str()).collect();
1236    let vecs = embedder.embed(&texts)?;
1237    let mut done = 0usize;
1238    for (p, v) in pending.iter().zip(vecs.iter()) {
1239        upsert_embedding(
1240            conn,
1241            &p.event_id,
1242            &p.task_id,
1243            project_hash,
1244            "episodic",
1245            embedder.model_id(),
1246            embedder.dim(),
1247            v,
1248            created_at,
1249        )?;
1250        done += 1;
1251    }
1252    Ok(done)
1253}
1254
/// A retrieval hit: the event, its task, and the relevance score.
///
/// Derives `Debug` and `Clone` like `TaskRow`, so hits can be logged and
/// copied by callers.
#[derive(Debug, Clone)]
pub struct ScoredHit {
    /// Id of the matching event.
    pub event_id: String,
    /// Task the event belongs to.
    pub task_id: String,
    /// Title of that task; empty when no `tasks` row exists for it.
    pub task_title: String,
    /// Event type text as stored in `search_fts.type`.
    pub event_type: String,
    /// Tier recorded on the embedding row.
    pub tier: String,
    /// Event text as stored in `search_fts`.
    pub text: String,
    /// Cosine score of the stored vector against the query vector.
    pub score: f32,
}
1265
1266/// Semantic search over a project's embeddings. Scores every stored vector for
1267/// `model` against `query_vec` by cosine, returns the top `k` by score. The
1268/// caller embeds the query with the same embedder so the model ids match.
1269/// Pure vector ranking for now; recency / tier / contradiction weighting layer
1270/// on top in later phases.
1271pub fn semantic_search(
1272    conn: &Connection,
1273    project_hash: &str,
1274    query_vec: &[f32],
1275    model: &str,
1276    k: usize,
1277) -> anyhow::Result<Vec<ScoredHit>> {
1278    let mut stmt = conn.prepare(
1279        "SELECT e.event_id, e.task_id, e.tier, e.vec, f.text, f.type,
1280                COALESCE(t.title, '')
1281           FROM embeddings e
1282           JOIN search_fts f ON f.event_id = e.event_id
1283           LEFT JOIN tasks t ON t.task_id = e.task_id
1284          WHERE e.project_hash = ?1 AND e.model = ?2",
1285    )?;
1286    let rows = stmt.query_map(rusqlite::params![project_hash, model], |r| {
1287        let blob: Vec<u8> = r.get(3)?;
1288        Ok((
1289            r.get::<_, String>(0)?, // event_id
1290            r.get::<_, String>(1)?, // task_id
1291            r.get::<_, String>(2)?, // tier
1292            blob,
1293            r.get::<_, String>(4)?, // text
1294            r.get::<_, String>(5)?, // type
1295            r.get::<_, String>(6)?, // title
1296        ))
1297    })?;
1298
1299    let mut hits: Vec<ScoredHit> = Vec::new();
1300    for row in rows {
1301        let (event_id, task_id, tier, blob, text, event_type, task_title) = row?;
1302        let score = crate::embed::cosine(query_vec, &crate::embed::from_blob(&blob));
1303        hits.push(ScoredHit {
1304            event_id,
1305            task_id,
1306            task_title,
1307            event_type,
1308            tier,
1309            text,
1310            score,
1311        });
1312    }
1313    hits.sort_by(|a, b| {
1314        b.score
1315            .partial_cmp(&a.score)
1316            .unwrap_or(std::cmp::Ordering::Equal)
1317    });
1318    hits.truncate(k);
1319    Ok(hits)
1320}
1321
1322#[cfg(test)]
1323mod tests {
1324    use super::*;
1325    use crate::embed::Embedder;
1326    use tempfile::TempDir;
1327
1328    #[test]
1329    fn task_exists_returns_true_for_known_id_false_otherwise() {
1330        let d = TempDir::new().unwrap();
1331        let conn = open(d.path().join("s.sqlite")).unwrap();
1332
1333        assert!(!task_exists(&conn, "tj-nope").unwrap());
1334
1335        let e = make_open_event("tj-yes", "Hello");
1336        upsert_task_from_event(&conn, &e, "feedfacefeedface").unwrap();
1337        index_event(&conn, &e).unwrap();
1338
1339        assert!(task_exists(&conn, "tj-yes").unwrap());
1340        assert!(!task_exists(&conn, "tj-nope").unwrap());
1341    }
1342
1343    #[test]
1344    fn rename_event_updates_task_title() {
1345        let d = TempDir::new().unwrap();
1346        let conn = open(d.path().join("s.sqlite")).unwrap();
1347        let ph = "feedfacefeedface";
1348
1349        let open_ev = make_open_event("tj-rn", "#: 5");
1350        upsert_task_from_event(&conn, &open_ev, ph).unwrap();
1351        let title: String = conn
1352            .query_row("SELECT title FROM tasks WHERE task_id='tj-rn'", [], |r| {
1353                r.get(0)
1354            })
1355            .unwrap();
1356        assert_eq!(title, "#: 5");
1357
1358        let mut rename = crate::event::Event::new(
1359            "tj-rn",
1360            crate::event::EventType::Rename,
1361            crate::event::Author::Agent,
1362            crate::event::Source::Cli,
1363            "Support BID 29683996 — voucher refund 50% vs promised 100%".into(),
1364        );
1365        rename.timestamp = "2099-01-01T00:00:00.000Z".into();
1366        upsert_task_from_event(&conn, &rename, ph).unwrap();
1367
1368        let title: String = conn
1369            .query_row("SELECT title FROM tasks WHERE task_id='tj-rn'", [], |r| {
1370                r.get(0)
1371            })
1372            .unwrap();
1373        assert_eq!(
1374            title,
1375            "Support BID 29683996 — voucher refund 50% vs promised 100%"
1376        );
1377    }
1378
1379    #[test]
1380    fn close_event_restores_outcome_from_meta() {
1381        let d = TempDir::new().unwrap();
1382        let conn = open(d.path().join("s.sqlite")).unwrap();
1383        let ph = "feedfacefeedface";
1384
1385        let open_ev = make_open_event("tj-cl", "T");
1386        upsert_task_from_event(&conn, &open_ev, ph).unwrap();
1387
1388        let mut close = crate::event::Event::new(
1389            "tj-cl",
1390            crate::event::EventType::Close,
1391            crate::event::Author::Agent,
1392            crate::event::Source::Cli,
1393            "done".into(),
1394        );
1395        close.meta = serde_json::json!({"outcome": "Shipped the fix.", "outcome_tag": "done"});
1396        upsert_task_from_event(&conn, &close, ph).unwrap();
1397
1398        let (status, outcome, tag): (String, Option<String>, Option<String>) = conn
1399            .query_row(
1400                "SELECT status, outcome, outcome_tag FROM tasks WHERE task_id='tj-cl'",
1401                [],
1402                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1403            )
1404            .unwrap();
1405        assert_eq!(status, "closed");
1406        assert_eq!(outcome.as_deref(), Some("Shipped the fix."));
1407        assert_eq!(tag.as_deref(), Some("done"));
1408    }
1409
1410    #[test]
1411    fn fresh_db_runs_all_migrations() {
1412        let d = TempDir::new().unwrap();
1413        let p = d.path().join("state.sqlite");
1414        let conn = open(&p).unwrap();
1415
1416        let applied: Vec<i64> = conn
1417            .prepare("SELECT version FROM schema_migrations ORDER BY version")
1418            .unwrap()
1419            .query_map([], |r| r.get::<_, i64>(0))
1420            .unwrap()
1421            .collect::<Result<_, _>>()
1422            .unwrap();
1423        assert_eq!(
1424            applied,
1425            (1..=MIGRATIONS.len() as i64).collect::<Vec<_>>(),
1426            "every declared migration must be recorded"
1427        );
1428    }
1429
1430    #[test]
1431    fn apply_migrations_is_idempotent_across_reopens() {
1432        let d = TempDir::new().unwrap();
1433        let p = d.path().join("state.sqlite");
1434        let _ = open(&p).unwrap();
1435        let _ = open(&p).unwrap();
1436
1437        let count: i64 = open(&p)
1438            .unwrap()
1439            .query_row("SELECT COUNT(*) FROM schema_migrations", [], |r| r.get(0))
1440            .unwrap();
1441        assert_eq!(
1442            count,
1443            MIGRATIONS.len() as i64,
1444            "schema_migrations must contain exactly one row per declared migration after repeated opens"
1445        );
1446    }
1447
1448    fn make_text_event(text: &str) -> crate::event::Event {
1449        crate::event::Event::new(
1450            "tj-x",
1451            crate::event::EventType::Finding,
1452            crate::event::Author::User,
1453            crate::event::Source::Cli,
1454            text.into(),
1455        )
1456    }
1457
1458    #[test]
1459    fn embed_pending_embeds_all_then_is_idempotent() {
1460        let d = TempDir::new().unwrap();
1461        let conn = open(d.path().join("s.sqlite")).unwrap();
1462        let ph = "feedfacefeedface";
1463
1464        for text in [
1465            "implement payment refund deduplication",
1466            "add validation for negative order amounts",
1467        ] {
1468            index_event(&conn, &make_text_event(text)).unwrap();
1469        }
1470
1471        let emb = crate::embed::HashEmbedder::new(64);
1472        let at = "2026-06-12T00:00:00Z";
1473
1474        let n = embed_pending(&conn, ph, &emb, at, 100).unwrap();
1475        assert_eq!(n, 2, "both events embedded on first pass");
1476        assert_eq!(count_embeddings(&conn, ph).unwrap(), 2);
1477
1478        // Idempotent: nothing left for this model on a second pass.
1479        assert_eq!(embed_pending(&conn, ph, &emb, at, 100).unwrap(), 0);
1480
1481        // Model-scoped: a different model id sees them as un-embedded
1482        // (so a model change triggers a re-embed).
1483        assert_eq!(
1484            events_needing_embedding(&conn, "other-model", 100)
1485                .unwrap()
1486                .len(),
1487            2
1488        );
1489    }
1490
1491    #[test]
1492    fn semantic_search_ranks_relevant_event_first() {
1493        let d = TempDir::new().unwrap();
1494        let conn = open(d.path().join("s.sqlite")).unwrap();
1495        let ph = "feedfacefeedface";
1496
1497        for text in [
1498            "fix duplicate payment refund write on partial refund",
1499            "update the frontend button hover color",
1500            "add a database index for faster user lookup",
1501        ] {
1502            index_event(&conn, &make_text_event(text)).unwrap();
1503        }
1504        let emb = crate::embed::HashEmbedder::new(256);
1505        embed_pending(&conn, ph, &emb, "t", 100).unwrap();
1506
1507        let q = emb.embed_one("payment refund duplicated").unwrap();
1508        let hits = semantic_search(&conn, ph, &q, emb.model_id(), 3).unwrap();
1509
1510        assert_eq!(hits.len(), 3);
1511        assert!(
1512            hits[0].text.contains("refund"),
1513            "the refund event must rank first, got: {}",
1514            hits[0].text
1515        );
1516        assert!(
1517            hits[0].score >= hits[1].score,
1518            "hits must be sorted by score desc"
1519        );
1520    }
1521
1522    #[test]
1523    fn open_creates_all_tables() {
1524        let d = TempDir::new().unwrap();
1525        let p = d.path().join("state.sqlite");
1526        let conn = open(&p).unwrap();
1527
1528        let names: Vec<String> = conn
1529            .prepare("SELECT name FROM sqlite_master WHERE type='table' OR type='virtual table' ORDER BY name")
1530            .unwrap()
1531            .query_map([], |r| r.get::<_, String>(0))
1532            .unwrap()
1533            .collect::<Result<_, _>>()
1534            .unwrap();
1535
1536        for required in [
1537            "decisions",
1538            "events_index",
1539            "evidence",
1540            "task_pack_cache",
1541            "tasks",
1542            "search_fts",
1543        ] {
1544            assert!(
1545                names.iter().any(|n| n == required),
1546                "missing table {required}, have {names:?}"
1547            );
1548        }
1549    }
1550
1551    #[test]
1552    fn open_is_idempotent() {
1553        let d = TempDir::new().unwrap();
1554        let p = d.path().join("state.sqlite");
1555        let _ = open(&p).unwrap();
1556        let _ = open(&p).unwrap();
1557    }
1558
1559    #[test]
1560    fn index_event_projects_evidence() {
1561        let d = TempDir::new().unwrap();
1562        let conn = open(d.path().join("s.sqlite")).unwrap();
1563        let mut open_e = crate::event::Event::new(
1564            "tj-e",
1565            crate::event::EventType::Open,
1566            crate::event::Author::User,
1567            crate::event::Source::Cli,
1568            "x".into(),
1569        );
1570        open_e.meta = serde_json::json!({"title": "T"});
1571        upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
1572        index_event(&conn, &open_e).unwrap();
1573
1574        let mut ev = crate::event::Event::new(
1575            "tj-e",
1576            crate::event::EventType::Evidence,
1577            crate::event::Author::Agent,
1578            crate::event::Source::Chat,
1579            "Hook startup measured at 12ms".into(),
1580        );
1581        ev.evidence_strength = Some(crate::event::EvidenceStrength::Strong);
1582        upsert_task_from_event(&conn, &ev, "feedface").unwrap();
1583        index_event(&conn, &ev).unwrap();
1584
1585        let (text, strength): (String, String) = conn
1586            .query_row(
1587                "SELECT text, strength FROM evidence WHERE task_id=?1",
1588                rusqlite::params!["tj-e"],
1589                |r| Ok((r.get(0)?, r.get(1)?)),
1590            )
1591            .unwrap();
1592        assert!(text.contains("12ms"));
1593        assert_eq!(strength, "strong");
1594    }
1595
1596    #[test]
1597    fn supersede_event_marks_decision_superseded() {
1598        let d = TempDir::new().unwrap();
1599        let conn = open(d.path().join("s.sqlite")).unwrap();
1600        let mut open_e = crate::event::Event::new(
1601            "tj-s",
1602            crate::event::EventType::Open,
1603            crate::event::Author::User,
1604            crate::event::Source::Cli,
1605            "x".into(),
1606        );
1607        open_e.meta = serde_json::json!({"title": "T"});
1608        upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
1609        index_event(&conn, &open_e).unwrap();
1610
1611        let dec = crate::event::Event::new(
1612            "tj-s",
1613            crate::event::EventType::Decision,
1614            crate::event::Author::Agent,
1615            crate::event::Source::Chat,
1616            "Use TS".into(),
1617        );
1618        upsert_task_from_event(&conn, &dec, "feedface").unwrap();
1619        index_event(&conn, &dec).unwrap();
1620
1621        let mut sup = crate::event::Event::new(
1622            "tj-s",
1623            crate::event::EventType::Supersede,
1624            crate::event::Author::Agent,
1625            crate::event::Source::Chat,
1626            "Replaced by Rust decision".into(),
1627        );
1628        sup.supersedes = Some(dec.event_id.clone());
1629        upsert_task_from_event(&conn, &sup, "feedface").unwrap();
1630        index_event(&conn, &sup).unwrap();
1631
1632        let (status, by): (String, Option<String>) = conn
1633            .query_row(
1634                "SELECT status, superseded_by FROM decisions WHERE decision_id=?1",
1635                rusqlite::params![dec.event_id],
1636                |r| Ok((r.get(0)?, r.get(1)?)),
1637            )
1638            .unwrap();
1639        assert_eq!(status, "superseded");
1640        assert_eq!(by.as_deref(), Some(sup.event_id.as_str()));
1641    }
1642
1643    #[test]
1644    fn index_event_projects_decision_to_decisions_table() {
1645        let d = TempDir::new().unwrap();
1646        let conn = open(d.path().join("s.sqlite")).unwrap();
1647
1648        let mut open_e = crate::event::Event::new(
1649            "tj-d",
1650            crate::event::EventType::Open,
1651            crate::event::Author::User,
1652            crate::event::Source::Cli,
1653            "x".into(),
1654        );
1655        open_e.meta = serde_json::json!({"title": "T"});
1656        upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
1657        index_event(&conn, &open_e).unwrap();
1658
1659        let dec = crate::event::Event::new(
1660            "tj-d",
1661            crate::event::EventType::Decision,
1662            crate::event::Author::Agent,
1663            crate::event::Source::Chat,
1664            "Adopt Rust".into(),
1665        );
1666        upsert_task_from_event(&conn, &dec, "feedface").unwrap();
1667        index_event(&conn, &dec).unwrap();
1668
1669        let (id, text, status): (String, String, String) = conn
1670            .query_row(
1671                "SELECT decision_id, text, status FROM decisions WHERE task_id=?1",
1672                rusqlite::params!["tj-d"],
1673                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1674            )
1675            .unwrap();
1676        assert_eq!(id, dec.event_id);
1677        assert_eq!(text, "Adopt Rust");
1678        assert_eq!(status, "active");
1679    }
1680
1681    #[test]
1682    fn index_event_projects_decision_alternatives_into_column() {
1683        let d = TempDir::new().unwrap();
1684        let conn = open(d.path().join("s.sqlite")).unwrap();
1685
1686        let mut dec = crate::event::Event::new(
1687            "tj-alt",
1688            crate::event::EventType::Decision,
1689            crate::event::Author::Agent,
1690            crate::event::Source::Chat,
1691            "Use SQLite".into(),
1692        );
1693        dec.meta = serde_json::json!({
1694            "alternatives": [
1695                {"option": "SQLite", "chosen": true, "rationale": "embedded, zero-ops"},
1696                {"option": "Postgres", "chosen": false, "rationale": "too heavy for local tool"}
1697            ]
1698        });
1699        upsert_task_from_event(&conn, &dec, "feedface").unwrap();
1700        index_event(&conn, &dec).unwrap();
1701
1702        let alts: Option<String> = conn
1703            .query_row(
1704                "SELECT alternatives FROM decisions WHERE decision_id=?1",
1705                rusqlite::params![dec.event_id],
1706                |r| r.get(0),
1707            )
1708            .unwrap();
1709        let alts = alts.expect("alternatives column should be populated");
1710        let parsed: serde_json::Value = serde_json::from_str(&alts).unwrap();
1711        assert_eq!(parsed.as_array().unwrap().len(), 2);
1712        assert_eq!(parsed[0]["option"], "SQLite");
1713        assert_eq!(parsed[0]["chosen"], true);
1714    }
1715
1716    #[test]
1717    fn index_event_decision_without_alternatives_leaves_column_null() {
1718        let d = TempDir::new().unwrap();
1719        let conn = open(d.path().join("s.sqlite")).unwrap();
1720
1721        let dec = crate::event::Event::new(
1722            "tj-noalt",
1723            crate::event::EventType::Decision,
1724            crate::event::Author::Agent,
1725            crate::event::Source::Chat,
1726            "Plain decision".into(),
1727        );
1728        upsert_task_from_event(&conn, &dec, "feedface").unwrap();
1729        index_event(&conn, &dec).unwrap();
1730
1731        let alts: Option<String> = conn
1732            .query_row(
1733                "SELECT alternatives FROM decisions WHERE decision_id=?1",
1734                rusqlite::params![dec.event_id],
1735                |r| r.get(0),
1736            )
1737            .unwrap();
1738        assert!(alts.is_none());
1739    }
1740
1741    #[test]
1742    fn index_event_is_idempotent_no_search_fts_duplicates() {
1743        let d = TempDir::new().unwrap();
1744        let conn = open(d.path().join("s.sqlite")).unwrap();
1745        let mut open_e = crate::event::Event::new(
1746            "tj-id",
1747            crate::event::EventType::Open,
1748            crate::event::Author::User,
1749            crate::event::Source::Cli,
1750            "x".into(),
1751        );
1752        open_e.meta = serde_json::json!({"title": "Idempotent"});
1753        upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
1754
1755        // Index three times — simulates rebuild_state replays.
1756        index_event(&conn, &open_e).unwrap();
1757        index_event(&conn, &open_e).unwrap();
1758        index_event(&conn, &open_e).unwrap();
1759
1760        let n: i64 = conn
1761            .query_row(
1762                "SELECT COUNT(*) FROM search_fts WHERE event_id=?1",
1763                rusqlite::params![open_e.event_id],
1764                |r| r.get(0),
1765            )
1766            .unwrap();
1767        assert_eq!(n, 1, "search_fts must hold exactly one row per event_id");
1768    }
1769
1770    #[test]
1771    fn list_all_projects_returns_hashes_from_state_dir() {
1772        use std::fs::File;
1773        let d = TempDir::new().unwrap();
1774        let state_dir = d.path().join("state");
1775        std::fs::create_dir_all(&state_dir).unwrap();
1776        File::create(state_dir.join("aaaa1111aaaa1111.sqlite")).unwrap();
1777        File::create(state_dir.join("bbbb2222bbbb2222.sqlite")).unwrap();
1778        File::create(state_dir.join("not-a-project.txt")).unwrap();
1779
1780        let mut hashes = list_all_projects(&state_dir).unwrap();
1781        hashes.sort();
1782        assert_eq!(hashes, vec!["aaaa1111aaaa1111", "bbbb2222bbbb2222"]);
1783    }
1784
1785    fn write_event_line(f: &mut std::fs::File, e: &crate::event::Event) {
1786        use std::io::Write;
1787        writeln!(f, "{}", serde_json::to_string(e).unwrap()).unwrap();
1788    }
1789
1790    fn make_open_event(task_id: &str, title: &str) -> crate::event::Event {
1791        let mut e = crate::event::Event::new(
1792            task_id,
1793            crate::event::EventType::Open,
1794            crate::event::Author::User,
1795            crate::event::Source::Cli,
1796            "x".into(),
1797        );
1798        e.meta = serde_json::json!({"title": title});
1799        e
1800    }
1801
1802    #[test]
1803    fn ingest_new_events_picks_up_only_new_lines() {
1804        let d = TempDir::new().unwrap();
1805        let jsonl = d.path().join("events.jsonl");
1806        let db = d.path().join("s.sqlite");
1807        let project = "deadbeefdeadbeef";
1808
1809        let e1 = make_open_event("tj-i1", "first");
1810        let e2 = make_open_event("tj-i2", "second");
1811        let e3 = make_open_event("tj-i3", "third");
1812
1813        let mut f = std::fs::File::create(&jsonl).unwrap();
1814        write_event_line(&mut f, &e1);
1815        write_event_line(&mut f, &e2);
1816        write_event_line(&mut f, &e3);
1817        drop(f);
1818
1819        // First pass — no marker yet, falls back to a full rebuild.
1820        let conn = open(&db).unwrap();
1821        let n_first = ingest_new_events(&conn, &jsonl, project).unwrap();
1822        assert_eq!(n_first, 3);
1823
1824        // Append two more events.
1825        let e4 = make_open_event("tj-i4", "fourth");
1826        let e5 = make_open_event("tj-i5", "fifth");
1827        let mut f = std::fs::OpenOptions::new()
1828            .append(true)
1829            .open(&jsonl)
1830            .unwrap();
1831        write_event_line(&mut f, &e4);
1832        write_event_line(&mut f, &e5);
1833        drop(f);
1834
1835        // Second pass — marker = e3, only e4 + e5 must be processed.
1836        let n_second = ingest_new_events(&conn, &jsonl, project).unwrap();
1837        assert_eq!(n_second, 2, "incremental ingest must read only the tail");
1838
1839        let total: i64 = conn
1840            .query_row("SELECT COUNT(*) FROM events_index", [], |r| r.get(0))
1841            .unwrap();
1842        assert_eq!(total, 5);
1843
1844        let marker: String = conn
1845            .query_row(
1846                "SELECT last_indexed_event_id FROM index_state WHERE project_hash=?1",
1847                rusqlite::params![project],
1848                |r| r.get(0),
1849            )
1850            .unwrap();
1851        assert_eq!(marker, e5.event_id);
1852    }
1853
1854    #[test]
1855    fn ingest_new_events_falls_back_to_full_rebuild_when_marker_vanishes() {
1856        let d = TempDir::new().unwrap();
1857        let jsonl = d.path().join("events.jsonl");
1858        let db = d.path().join("s.sqlite");
1859        let project = "feedfacefeedface";
1860
1861        let e1 = make_open_event("tj-r1", "first");
1862        let mut f = std::fs::File::create(&jsonl).unwrap();
1863        write_event_line(&mut f, &e1);
1864        drop(f);
1865
1866        let conn = open(&db).unwrap();
1867        ingest_new_events(&conn, &jsonl, project).unwrap();
1868
1869        // Replace the file entirely so the marker (e1.event_id) no longer
1870        // appears anywhere — simulates corruption / hand-edit.
1871        let e2 = make_open_event("tj-r2", "after-corruption");
1872        let e3 = make_open_event("tj-r3", "after-corruption-2");
1873        let mut f = std::fs::File::create(&jsonl).unwrap();
1874        write_event_line(&mut f, &e2);
1875        write_event_line(&mut f, &e3);
1876        drop(f);
1877
1878        let n = ingest_new_events(&conn, &jsonl, project).unwrap();
1879        assert_eq!(n, 2, "missing marker must trigger full rebuild");
1880    }
1881
1882    #[test]
1883    fn rebuild_state_and_ingest_new_events_produce_same_state() {
1884        let d = TempDir::new().unwrap();
1885        let jsonl_a = d.path().join("a.jsonl");
1886        let jsonl_b = d.path().join("b.jsonl");
1887        let db_a = d.path().join("a.sqlite");
1888        let db_b = d.path().join("b.sqlite");
1889
1890        let events: Vec<_> = (0..5)
1891            .map(|i| make_open_event(&format!("tj-eq{i}"), &format!("title {i}")))
1892            .collect();
1893        for path in [&jsonl_a, &jsonl_b] {
1894            let mut f = std::fs::File::create(path).unwrap();
1895            for e in &events {
1896                write_event_line(&mut f, e);
1897            }
1898        }
1899
1900        let conn_a = open(&db_a).unwrap();
1901        let n_a = rebuild_state(&conn_a, &jsonl_a, "abcd1234abcd1234").unwrap();
1902
1903        let conn_b = open(&db_b).unwrap();
1904        let n_b = ingest_new_events(&conn_b, &jsonl_b, "abcd1234abcd1234").unwrap();
1905
1906        assert_eq!(n_a, n_b);
1907        assert_eq!(n_a, 5);
1908
1909        for table in ["tasks", "events_index"] {
1910            let q = format!("SELECT COUNT(*) FROM {table}");
1911            let cnt_a: i64 = conn_a.query_row(&q, [], |r| r.get(0)).unwrap();
1912            let cnt_b: i64 = conn_b.query_row(&q, [], |r| r.get(0)).unwrap();
1913            assert_eq!(cnt_a, cnt_b, "row count mismatch in {table}");
1914        }
1915    }
1916
1917    #[test]
1918    fn rebuild_state_skips_malformed_jsonl_lines() {
1919        use std::io::Write;
1920        let d = TempDir::new().unwrap();
1921        let events_path = d.path().join("events.jsonl");
1922        let db_path = d.path().join("s.sqlite");
1923
1924        let mut f = std::fs::File::create(&events_path).unwrap();
1925
1926        let mut e1 = crate::event::Event::new(
1927            "tj-skip",
1928            crate::event::EventType::Open,
1929            crate::event::Author::User,
1930            crate::event::Source::Cli,
1931            "x".into(),
1932        );
1933        e1.meta = serde_json::json!({"title": "Skip test"});
1934        writeln!(f, "{}", serde_json::to_string(&e1).unwrap()).unwrap();
1935
1936        // Garbage that is not even JSON.
1937        writeln!(f, "this is not a json event line").unwrap();
1938
1939        // Valid JSON but not a valid Event (missing required fields).
1940        writeln!(f, "{{\"foo\": 1}}").unwrap();
1941
1942        let e3 = crate::event::Event::new(
1943            "tj-skip",
1944            crate::event::EventType::Decision,
1945            crate::event::Author::Agent,
1946            crate::event::Source::Chat,
1947            "Adopt Rust".into(),
1948        );
1949        writeln!(f, "{}", serde_json::to_string(&e3).unwrap()).unwrap();
1950        drop(f);
1951
1952        let conn = open(&db_path).unwrap();
1953        let n = rebuild_state(&conn, &events_path, "deadbeefdeadbeef")
1954            .expect("rebuild_state must succeed despite malformed lines");
1955        assert_eq!(
1956            n, 2,
1957            "expected 2 valid events indexed (2 malformed skipped)"
1958        );
1959
1960        let indexed: i64 = conn
1961            .query_row("SELECT COUNT(*) FROM events_index", [], |r| r.get(0))
1962            .unwrap();
1963        assert_eq!(indexed, 2);
1964    }
1965
1966    #[test]
1967    fn rebuild_state_reads_jsonl_and_populates_db() {
1968        use std::io::Write;
1969        let d = TempDir::new().unwrap();
1970        let events_path = d.path().join("events.jsonl");
1971        let db_path = d.path().join("s.sqlite");
1972
1973        let mut f = std::fs::File::create(&events_path).unwrap();
1974        let mut e1 = crate::event::Event::new(
1975            "tj-9",
1976            crate::event::EventType::Open,
1977            crate::event::Author::User,
1978            crate::event::Source::Cli,
1979            "x".into(),
1980        );
1981        e1.meta = serde_json::json!({"title": "Nine"});
1982        let e2 = crate::event::Event::new(
1983            "tj-9",
1984            crate::event::EventType::Decision,
1985            crate::event::Author::Agent,
1986            crate::event::Source::Chat,
1987            "Adopt Rust".into(),
1988        );
1989        writeln!(f, "{}", serde_json::to_string(&e1).unwrap()).unwrap();
1990        writeln!(f, "{}", serde_json::to_string(&e2).unwrap()).unwrap();
1991        drop(f);
1992
1993        let conn = open(&db_path).unwrap();
1994        let n = rebuild_state(&conn, &events_path, "deadbeefdeadbeef").unwrap();
1995        assert_eq!(n, 2);
1996
1997        let n: i64 = conn
1998            .query_row("SELECT COUNT(*) FROM tasks", [], |r| r.get(0))
1999            .unwrap();
2000        assert_eq!(n, 1);
2001        let n: i64 = conn
2002            .query_row("SELECT COUNT(*) FROM events_index", [], |r| r.get(0))
2003            .unwrap();
2004        assert_eq!(n, 2);
2005    }
2006
2007    #[test]
2008    fn index_event_writes_index_and_fts() {
2009        let d = TempDir::new().unwrap();
2010        let conn = open(d.path().join("s.sqlite")).unwrap();
2011        let mut open_e = crate::event::Event::new(
2012            "tj-1",
2013            crate::event::EventType::Open,
2014            crate::event::Author::User,
2015            crate::event::Source::Cli,
2016            "Title".into(),
2017        );
2018        open_e.meta = serde_json::json!({"title": "Title"});
2019        upsert_task_from_event(&conn, &open_e, "deadbeefdeadbeef").unwrap();
2020        index_event(&conn, &open_e).unwrap();
2021
2022        let mut decision = crate::event::Event::new(
2023            "tj-1",
2024            crate::event::EventType::Decision,
2025            crate::event::Author::Agent,
2026            crate::event::Source::Chat,
2027            "Adopt Rust".into(),
2028        );
2029        decision.confidence = Some(0.92);
2030        upsert_task_from_event(&conn, &decision, "deadbeefdeadbeef").unwrap();
2031        index_event(&conn, &decision).unwrap();
2032
2033        let count: i64 = conn
2034            .query_row(
2035                "SELECT COUNT(*) FROM events_index WHERE task_id=?1",
2036                rusqlite::params!["tj-1"],
2037                |r| r.get(0),
2038            )
2039            .unwrap();
2040        assert_eq!(count, 2);
2041
2042        let mut stmt = conn
2043            .prepare("SELECT event_id FROM search_fts WHERE search_fts MATCH ?1")
2044            .unwrap();
2045        let hits: Vec<String> = stmt
2046            .query_map(rusqlite::params!["Rust"], |r| {
2047                let s: String = r.get(0)?;
2048                Ok(s)
2049            })
2050            .unwrap()
2051            .collect::<Result<Vec<_>, _>>()
2052            .unwrap();
2053        assert_eq!(hits.len(), 1);
2054        assert_eq!(hits[0], decision.event_id);
2055    }
2056
2057    #[test]
2058    fn upsert_task_from_open_event_inserts_row() {
2059        let d = TempDir::new().unwrap();
2060        let conn = open(d.path().join("s.sqlite")).unwrap();
2061
2062        let mut e = crate::event::Event::new(
2063            "tj-7f3a",
2064            crate::event::EventType::Open,
2065            crate::event::Author::User,
2066            crate::event::Source::Cli,
2067            "Add OAuth".into(),
2068        );
2069        e.meta = serde_json::json!({ "title": "Add OAuth login" });
2070
2071        upsert_task_from_event(&conn, &e, "abcd1234abcd1234").unwrap();
2072
2073        let (id, title, status): (String, String, String) = conn
2074            .query_row(
2075                "SELECT task_id, title, status FROM tasks WHERE task_id = ?1",
2076                ["tj-7f3a"],
2077                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
2078            )
2079            .unwrap();
2080
2081        assert_eq!(id, "tj-7f3a");
2082        assert_eq!(title, "Add OAuth login");
2083        assert_eq!(status, "open");
2084    }
2085
2086    #[test]
2087    fn migration_adds_parent_id_column_nullable() {
2088        let d = tempfile::TempDir::new().unwrap();
2089        let conn = open(d.path().join("s.sqlite")).unwrap();
2090
2091        // Seed a task via an open event (no parent).
2092        let e = make_open_event("tj-a", "Top");
2093        upsert_task_from_event(&conn, &e, "ph").unwrap();
2094
2095        let parent: Option<String> = conn
2096            .query_row(
2097                "SELECT parent_id FROM tasks WHERE task_id = ?1",
2098                rusqlite::params!["tj-a"],
2099                |r| r.get(0),
2100            )
2101            .unwrap();
2102        assert_eq!(parent, None);
2103    }
2104
2105    #[test]
2106    fn open_event_meta_parent_id_is_persisted() {
2107        let d = tempfile::TempDir::new().unwrap();
2108        let conn = open(d.path().join("s.sqlite")).unwrap();
2109
2110        // Parent first.
2111        upsert_task_from_event(&conn, &make_open_event("tj-parent", "Parent"), "ph").unwrap();
2112
2113        // Child carries meta.parent_id.
2114        let mut child = make_open_event("tj-child", "Child");
2115        child.meta = serde_json::json!({"title": "Child", "parent_id": "tj-parent"});
2116        upsert_task_from_event(&conn, &child, "ph").unwrap();
2117
2118        let parent: Option<String> = conn
2119            .query_row(
2120                "SELECT parent_id FROM tasks WHERE task_id = ?1",
2121                rusqlite::params!["tj-child"],
2122                |r| r.get(0),
2123            )
2124            .unwrap();
2125        assert_eq!(parent.as_deref(), Some("tj-parent"));
2126    }
2127
2128    #[test]
2129    fn children_of_and_parent_of_work() {
2130        let d = tempfile::TempDir::new().unwrap();
2131        let conn = open(d.path().join("s.sqlite")).unwrap();
2132        upsert_task_from_event(&conn, &make_open_event("p", "Parent"), "ph").unwrap();
2133
2134        let mut c1 = make_open_event("c1", "Child1");
2135        c1.meta = serde_json::json!({"title": "Child1", "parent_id": "p"});
2136        upsert_task_from_event(&conn, &c1, "ph").unwrap();
2137        let mut c2 = make_open_event("c2", "Child2");
2138        c2.meta = serde_json::json!({"title": "Child2", "parent_id": "p"});
2139        upsert_task_from_event(&conn, &c2, "ph").unwrap();
2140
2141        let kids = children_of(&conn, "p").unwrap();
2142        let ids: Vec<&str> = kids.iter().map(|t| t.task_id.as_str()).collect();
2143        assert!(ids.contains(&"c1") && ids.contains(&"c2"));
2144        assert_eq!(kids.len(), 2);
2145
2146        assert_eq!(parent_of(&conn, "c1").unwrap().as_deref(), Some("p"));
2147        assert_eq!(parent_of(&conn, "p").unwrap(), None);
2148    }
2149
2150    #[test]
2151    fn cycle_guard_rejects_self_and_ancestor() {
2152        let d = tempfile::TempDir::new().unwrap();
2153        let conn = open(d.path().join("s.sqlite")).unwrap();
2154        upsert_task_from_event(&conn, &make_open_event("a", "A"), "ph").unwrap();
2155        let mut b = make_open_event("b", "B");
2156        b.meta = serde_json::json!({"title": "B", "parent_id": "a"});
2157        upsert_task_from_event(&conn, &b, "ph").unwrap();
2158
2159        // a is b's ancestor → making a a child of b is a cycle.
2160        assert!(would_create_cycle(&conn, "a", "b").unwrap());
2161        // self-parent is a cycle.
2162        assert!(would_create_cycle(&conn, "a", "a").unwrap());
2163        // unrelated parent is fine.
2164        upsert_task_from_event(&conn, &make_open_event("x", "X"), "ph").unwrap();
2165        assert!(!would_create_cycle(&conn, "x", "a").unwrap());
2166    }
2167
2168    #[test]
2169    fn invalidate_cascade_clears_parent_pack() {
2170        let d = tempfile::TempDir::new().unwrap();
2171        let conn = open(d.path().join("s.sqlite")).unwrap();
2172        upsert_task_from_event(&conn, &make_open_event("p", "P"), "ph").unwrap();
2173        let mut c = make_open_event("c", "C");
2174        c.meta = serde_json::json!({"title": "C", "parent_id": "p"});
2175        upsert_task_from_event(&conn, &c, "ph").unwrap();
2176
2177        // Seed pack cache rows for both.
2178        for id in ["p", "c"] {
2179            conn.execute(
2180                "INSERT INTO task_pack_cache(task_id, mode, text, generated_at, source_event_count)
2181                 VALUES (?1, 'compact', 'x', '2026-01-01T00:00:00Z', 1)",
2182                rusqlite::params![id],
2183            )
2184            .unwrap();
2185        }
2186
2187        invalidate_pack_cascade(&conn, "c").unwrap();
2188
2189        let remaining: i64 = conn
2190            .query_row("SELECT COUNT(*) FROM task_pack_cache", [], |r| r.get(0))
2191            .unwrap();
2192        assert_eq!(remaining, 0, "both child and parent pack caches cleared");
2193    }
2194
2195    #[test]
2196    fn count_open_children_counts_only_open() {
2197        let d = tempfile::TempDir::new().unwrap();
2198        let conn = open(d.path().join("s.sqlite")).unwrap();
2199        upsert_task_from_event(&conn, &make_open_event("p", "P"), "ph").unwrap();
2200        let mut c1 = make_open_event("c1", "C1");
2201        c1.meta = serde_json::json!({"title": "C1", "parent_id": "p"});
2202        upsert_task_from_event(&conn, &c1, "ph").unwrap();
2203        // Close c1.
2204        let mut close = crate::event::Event::new(
2205            "c1",
2206            crate::event::EventType::Close,
2207            crate::event::Author::User,
2208            crate::event::Source::Cli,
2209            "done".into(),
2210        );
2211        close.timestamp = "2026-01-02T00:00:00Z".into();
2212        upsert_task_from_event(&conn, &close, "ph").unwrap();
2213        let mut c2 = make_open_event("c2", "C2");
2214        c2.meta = serde_json::json!({"title": "C2", "parent_id": "p"});
2215        upsert_task_from_event(&conn, &c2, "ph").unwrap();
2216
2217        assert_eq!(count_open_children(&conn, "p").unwrap(), 1); // only c2
2218    }
2219}