Skip to main content

tj_core/
db.rs

1use anyhow::Context;
2use rusqlite::Connection;
3use std::collections::HashSet;
4use std::path::Path;
5
/// One forward-only schema migration. Migrations are applied in `version`
/// order; each is recorded in `schema_migrations` so re-running `open()`
/// is idempotent.
struct Migration {
    /// Identifier written to `schema_migrations.version` once this
    /// migration has been applied; used to skip it on later runs.
    version: i64,
    /// SQL batch executed when the migration is applied.
    sql: &'static str,
}
13
/// v001 — initial schema: the `tasks`, `events_index`, `decisions`,
/// `evidence` and `task_pack_cache` tables, the two lookup indexes
/// (`idx_tasks_project`, `idx_events_task_time`) and the `search_fts`
/// FTS5 virtual table. Every statement uses `IF NOT EXISTS`.
const MIGRATION_001: &str = r#"
CREATE TABLE IF NOT EXISTS tasks (
  task_id        TEXT PRIMARY KEY,
  title          TEXT NOT NULL,
  status         TEXT NOT NULL,
  project_hash   TEXT NOT NULL,
  opened_at      TEXT NOT NULL,
  closed_at      TEXT,
  last_event_at  TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_tasks_project ON tasks(project_hash, last_event_at DESC);

CREATE TABLE IF NOT EXISTS events_index (
  event_id    TEXT PRIMARY KEY,
  task_id     TEXT NOT NULL,
  type        TEXT NOT NULL,
  timestamp   TEXT NOT NULL,
  confidence  REAL,
  status      TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_events_task_time ON events_index(task_id, timestamp DESC);

CREATE TABLE IF NOT EXISTS decisions (
  decision_id    TEXT PRIMARY KEY,
  task_id        TEXT NOT NULL,
  text           TEXT NOT NULL,
  status         TEXT NOT NULL,
  superseded_by  TEXT
);

CREATE TABLE IF NOT EXISTS evidence (
  evidence_id           TEXT PRIMARY KEY,
  task_id               TEXT NOT NULL,
  text                  TEXT NOT NULL,
  strength              TEXT NOT NULL,
  refers_to_decision_id TEXT
);

CREATE TABLE IF NOT EXISTS task_pack_cache (
  task_id             TEXT NOT NULL,
  mode                TEXT NOT NULL,
  text                TEXT NOT NULL,
  generated_at        TEXT NOT NULL,
  source_event_count  INTEGER NOT NULL,
  PRIMARY KEY (task_id, mode)
);

CREATE VIRTUAL TABLE IF NOT EXISTS search_fts USING fts5(
  task_id UNINDEXED,
  event_id UNINDEXED,
  text,
  type
);
"#;
68
/// v002 — adds `index_state`, one row per project. It tracks how far we've
/// ingested the JSONL log so subsequent `ingest_new_events` calls can
/// process only the tail rather than re-applying the entire file.
/// `last_indexed_event_id` is the `event_id` of the most recent event
/// written to `events_index`.
const MIGRATION_002: &str = r#"
CREATE TABLE IF NOT EXISTS index_state (
  project_hash          TEXT PRIMARY KEY,
  last_indexed_event_id TEXT NOT NULL,
  updated_at            TEXT NOT NULL
);
"#;
80
/// v003 — v0.4.0 task-as-goal redesign: explicit goal/outcome (plus
/// `outcome_tag` and `external`) on tasks and typed artifacts on events.
/// All new columns are NULLable so existing rows survive without
/// backfill. Also wipes the pack cache so old packs (rendered without
/// Goal/Outcome blocks) regenerate on next view.
///
/// Note: `ALTER TABLE ... ADD COLUMN` has no `IF NOT EXISTS` form here, so
/// this batch is not safe to run twice against the same database.
const MIGRATION_003: &str = r#"
ALTER TABLE tasks ADD COLUMN goal        TEXT;
ALTER TABLE tasks ADD COLUMN outcome     TEXT;
ALTER TABLE tasks ADD COLUMN outcome_tag TEXT;
ALTER TABLE tasks ADD COLUMN external    TEXT;
ALTER TABLE events_index ADD COLUMN artifacts TEXT;
DELETE FROM task_pack_cache;
"#;
93
/// v004 — v0.5.0 Phase B, artifacts auto-extract on ingest. The
/// `artifacts` column was added in v003 but stayed NULL for everyone;
/// v004 makes no schema change and just wipes the pack cache so
/// newly-extracted artifacts surface in the next pack render. Existing
/// events stay NULL until `reclassify` (Phase B+) or `rebuild-state` is
/// run.
const MIGRATION_004: &str = r#"
DELETE FROM task_pack_cache;
"#;
102
/// All schema migrations in version order. Append new entries here; never
/// edit a published migration's `sql` — write a new one instead.
///
/// `apply_migrations` walks this slice front to back, so the position of an
/// entry (not just its `version` number) determines when it runs.
const MIGRATIONS: &[Migration] = &[
    Migration {
        version: 1,
        sql: MIGRATION_001,
    },
    Migration {
        version: 2,
        sql: MIGRATION_002,
    },
    Migration {
        version: 3,
        sql: MIGRATION_003,
    },
    Migration {
        version: 4,
        sql: MIGRATION_004,
    },
];
123
124fn apply_migrations(conn: &Connection) -> anyhow::Result<()> {
125    conn.execute_batch(
126        "CREATE TABLE IF NOT EXISTS schema_migrations (
127            version    INTEGER PRIMARY KEY,
128            applied_at TEXT NOT NULL
129        )",
130    )
131    .context("create schema_migrations table")?;
132
133    let applied: HashSet<i64> = {
134        let mut stmt = conn
135            .prepare("SELECT version FROM schema_migrations")
136            .context("select applied versions")?;
137        let rows = stmt
138            .query_map([], |r| r.get::<_, i64>(0))
139            .context("iterate schema_migrations")?;
140        rows.collect::<rusqlite::Result<HashSet<_>>>()
141            .context("collect applied versions")?
142    };
143
144    for migration in MIGRATIONS {
145        if applied.contains(&migration.version) {
146            continue;
147        }
148        conn.execute_batch(migration.sql)
149            .with_context(|| format!("apply schema migration v{:03}", migration.version))?;
150        conn.execute(
151            "INSERT INTO schema_migrations(version, applied_at) VALUES (?1, ?2)",
152            rusqlite::params![
153                migration.version,
154                chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
155            ],
156        )
157        .with_context(|| {
158            format!(
159                "record schema migration v{:03} as applied",
160                migration.version
161            )
162        })?;
163    }
164    Ok(())
165}
166
167use crate::event::{Event, EventType};
168
169pub fn upsert_task_from_event(
170    conn: &Connection,
171    event: &Event,
172    project_hash: &str,
173) -> anyhow::Result<()> {
174    match event.event_type {
175        EventType::Open => {
176            let title = event
177                .meta
178                .get("title")
179                .and_then(|v| v.as_str())
180                .unwrap_or(&event.text)
181                .to_string();
182            conn.execute(
183                "INSERT INTO tasks(task_id, title, status, project_hash, opened_at, last_event_at)
184                 VALUES (?1, ?2, 'open', ?3, ?4, ?4)
185                 ON CONFLICT(task_id) DO UPDATE SET last_event_at = ?4",
186                rusqlite::params![event.task_id, title, project_hash, event.timestamp],
187            )?;
188        }
189        EventType::Close => {
190            conn.execute(
191                "UPDATE tasks SET status='closed', closed_at=?2, last_event_at=?2 WHERE task_id=?1",
192                rusqlite::params![event.task_id, event.timestamp],
193            )?;
194        }
195        EventType::Reopen => {
196            conn.execute(
197                "UPDATE tasks SET status='open', closed_at=NULL, last_event_at=?2 WHERE task_id=?1",
198                rusqlite::params![event.task_id, event.timestamp],
199            )?;
200        }
201        _ => {
202            conn.execute(
203                "UPDATE tasks SET last_event_at=?2 WHERE task_id=?1",
204                rusqlite::params![event.task_id, event.timestamp],
205            )?;
206        }
207    }
208    Ok(())
209}
210
211use std::io::BufRead;
212
213pub fn list_all_projects(state_dir: impl AsRef<Path>) -> anyhow::Result<Vec<String>> {
214    let dir = state_dir.as_ref();
215    if !dir.exists() {
216        return Ok(vec![]);
217    }
218    let mut out = Vec::new();
219    for entry in std::fs::read_dir(dir)? {
220        let entry = entry?;
221        let path = entry.path();
222        if path.extension().and_then(|e| e.to_str()) == Some("sqlite") {
223            if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) {
224                out.push(stem.to_string());
225            }
226        }
227    }
228    Ok(out)
229}
230
231pub fn rebuild_state(
232    conn: &Connection,
233    jsonl_path: impl AsRef<Path>,
234    project_hash: &str,
235) -> anyhow::Result<usize> {
236    let f = std::fs::File::open(&jsonl_path)
237        .with_context(|| format!("open {:?}", jsonl_path.as_ref()))?;
238    let reader = std::io::BufReader::new(f);
239
240    let tx = conn.unchecked_transaction()?;
241    let mut count = 0;
242    let mut last_event_id: Option<String> = None;
243    for (i, line) in reader.lines().enumerate() {
244        let line = line.with_context(|| format!("read line {i}"))?;
245        if line.trim().is_empty() {
246            continue;
247        }
248        // Malformed JSONL lines are skipped with a warning so that one bad
249        // event cannot abort an otherwise-recoverable rebuild. SQL errors
250        // still propagate — those indicate schema/integrity problems.
251        let event: Event = match serde_json::from_str(&line) {
252            Ok(e) => e,
253            Err(err) => {
254                tracing::warn!(
255                    line_number = i + 1,
256                    error = %err,
257                    "skipping malformed JSONL line in rebuild_state"
258                );
259                continue;
260            }
261        };
262        upsert_task_from_event(&tx, &event, project_hash)?;
263        index_event(&tx, &event)?;
264        last_event_id = Some(event.event_id.clone());
265        count += 1;
266    }
267    if let Some(eid) = last_event_id.as_deref() {
268        record_last_indexed(&tx, project_hash, eid)?;
269    }
270    tx.commit()?;
271    Ok(count)
272}
273
274/// Returns whether a task with this id has been recorded in the derived
275/// state. Cheap O(1) lookup against the `tasks` primary key. Callers
276/// should run [`ingest_new_events`] first if they want to see the latest
277/// JSONL state.
278pub fn task_exists(conn: &Connection, task_id: &str) -> anyhow::Result<bool> {
279    let count: i64 = conn.query_row(
280        "SELECT COUNT(*) FROM tasks WHERE task_id = ?1",
281        rusqlite::params![task_id],
282        |r| r.get(0),
283    )?;
284    Ok(count > 0)
285}
286
287/// Status string for an existing task (e.g. "open", "closed"). Returns
288/// `None` when the task is unknown — caller decides whether that's a
289/// hard error or a route-to-pending case.
290pub fn task_status(conn: &Connection, task_id: &str) -> anyhow::Result<Option<String>> {
291    let mut stmt = conn.prepare("SELECT status FROM tasks WHERE task_id = ?1")?;
292    let mut rows = stmt.query(rusqlite::params![task_id])?;
293    Ok(rows.next()?.map(|r| r.get::<_, String>(0)).transpose()?)
294}
295
296/// Set or replace `tasks.goal` for an existing task. Caller is
297/// expected to have validated the task exists (via `task_exists`); we
298/// don't error on no-op rows so the upsert pattern is uniform.
299pub fn set_task_goal(conn: &Connection, task_id: &str, goal: &str) -> anyhow::Result<()> {
300    conn.execute(
301        "UPDATE tasks SET goal = ?1 WHERE task_id = ?2",
302        rusqlite::params![goal, task_id],
303    )
304    .with_context(|| format!("set goal for {task_id}"))?;
305    // Pack cache is now stale for this task — drop the entry so the
306    // next render picks up the new goal.
307    conn.execute(
308        "DELETE FROM task_pack_cache WHERE task_id = ?1",
309        rusqlite::params![task_id],
310    )?;
311    Ok(())
312}
313
314/// Set or replace the closure metadata. Pass `None` for `outcome_tag`
315/// to leave it unset; pass `Some("done"|"abandoned"|"superseded")`
316/// for a structured tag. Free-text `outcome` is the primary field.
317pub fn set_task_outcome(
318    conn: &Connection,
319    task_id: &str,
320    outcome: &str,
321    outcome_tag: Option<&str>,
322) -> anyhow::Result<()> {
323    conn.execute(
324        "UPDATE tasks SET outcome = ?1, outcome_tag = ?2 WHERE task_id = ?3",
325        rusqlite::params![outcome, outcome_tag, task_id],
326    )
327    .with_context(|| format!("set outcome for {task_id}"))?;
328    conn.execute(
329        "DELETE FROM task_pack_cache WHERE task_id = ?1",
330        rusqlite::params![task_id],
331    )?;
332    Ok(())
333}
334
335/// Append an external reference to `tasks.external`. The column is
336/// stored as a comma-separated list — small, append-mostly, no
337/// uniqueness constraint. Acceptable shapes (loose, not enforced):
338/// `beads:claude-memory-rsw`, `github:#42`, `jira:PROJ-1234`.
339pub fn add_task_external(conn: &Connection, task_id: &str, reference: &str) -> anyhow::Result<()> {
340    let current: Option<String> = conn
341        .query_row(
342            "SELECT external FROM tasks WHERE task_id = ?1",
343            rusqlite::params![task_id],
344            |r| r.get::<_, Option<String>>(0),
345        )
346        .with_context(|| format!("read external for {task_id}"))?;
347    let next = match current {
348        Some(s) if !s.is_empty() => format!("{s},{reference}"),
349        _ => reference.to_string(),
350    };
351    conn.execute(
352        "UPDATE tasks SET external = ?1 WHERE task_id = ?2",
353        rusqlite::params![next, task_id],
354    )?;
355    conn.execute(
356        "DELETE FROM task_pack_cache WHERE task_id = ?1",
357        rusqlite::params![task_id],
358    )?;
359    Ok(())
360}
361
/// Read-only metadata bundle used by pack rendering (and TUI list
/// teasers in v0.4.0+). Produced by `task_metadata`, which returns `None`
/// for unknown tasks. Each field mirrors the `tasks` column of the same
/// name and is `None` when that column is NULL.
#[derive(Debug, Clone, Default)]
pub struct TaskMetadata {
    /// `tasks.goal`.
    pub goal: Option<String>,
    /// `tasks.outcome` — free-text closure note.
    pub outcome: Option<String>,
    /// `tasks.outcome_tag` — optional structured tag for the outcome.
    pub outcome_tag: Option<String>,
    /// `tasks.external` — comma-separated list of external references.
    pub external: Option<String>,
}
371
372pub fn task_metadata(conn: &Connection, task_id: &str) -> anyhow::Result<Option<TaskMetadata>> {
373    let mut stmt =
374        conn.prepare("SELECT goal, outcome, outcome_tag, external FROM tasks WHERE task_id = ?1")?;
375    let mut rows = stmt.query(rusqlite::params![task_id])?;
376    Ok(match rows.next()? {
377        Some(r) => Some(TaskMetadata {
378            goal: r.get::<_, Option<String>>(0)?,
379            outcome: r.get::<_, Option<String>>(1)?,
380            outcome_tag: r.get::<_, Option<String>>(2)?,
381            external: r.get::<_, Option<String>>(3)?,
382        }),
383        None => None,
384    })
385}
386
/// One row of the stale-task report: an open task whose last event
/// crossed the inactivity threshold.
#[derive(Debug, Clone)]
pub struct StaleTask {
    /// Primary key of the task in `tasks`.
    pub task_id: String,
    /// Stored task title.
    pub title: String,
    /// Raw `tasks.last_event_at` string, exactly as stored.
    pub last_event_at: String,
    /// Whole days between `last_event_at` and the time the report was
    /// built.
    pub days_idle: i64,
}
396
397/// Find open tasks with no event in the last `days` days. Sorted by
398/// idle time descending so the user sees the most ancient first.
399pub fn stale_tasks(conn: &Connection, days: i64) -> anyhow::Result<Vec<StaleTask>> {
400    let cutoff = chrono::Utc::now() - chrono::Duration::days(days);
401    let cutoff_str = cutoff.to_rfc3339();
402    let mut stmt = conn.prepare(
403        "SELECT task_id, title, last_event_at FROM tasks
404         WHERE status = 'open' AND last_event_at < ?1
405         ORDER BY last_event_at ASC",
406    )?;
407    let rows = stmt.query_map(rusqlite::params![cutoff_str], |r| {
408        Ok((
409            r.get::<_, String>(0)?,
410            r.get::<_, String>(1)?,
411            r.get::<_, String>(2)?,
412        ))
413    })?;
414    let now = chrono::Utc::now();
415    let mut out = Vec::new();
416    for row in rows {
417        let (task_id, title, last_at) = row?;
418        let dt = chrono::DateTime::parse_from_rfc3339(&last_at)
419            .map(|d| d.with_timezone(&chrono::Utc))
420            .unwrap_or(now);
421        let days_idle = (now - dt).num_days();
422        out.push(StaleTask {
423            task_id,
424            title,
425            last_event_at: last_at,
426            days_idle,
427        });
428    }
429    Ok(out)
430}
431
/// Score-weighted relationship between a fresh prompt's artifacts and
/// every prior task's artifacts. Higher score = stronger continuation
/// signal. Threshold tuning is the caller's job; v0.6.0 auto-link
/// keeps anything with score > 0.0.
#[derive(Debug, Clone)]
pub struct RelatedTask {
    /// Id of the prior task that shares at least one artifact.
    pub task_id: String,
    /// That task's current `tasks.status` value.
    pub status: String,
    /// Sum of the weights of every shared artifact (see
    /// `find_related_tasks` for the per-kind weights).
    pub score: f64,
}
442
443/// Find tasks whose events overlap the given artifacts on any
444/// dimension we have a signal for. Weights:
445///   shared linked_issue → +1.0   (strongest, ticket id is unique)
446///   shared commit_hash  → +0.8   (commits are nearly unique)
447///   shared file path    → +0.3   (files churn across tasks)
448///
449/// The scan reads `events_index.artifacts` (JSON) directly with LIKE
450/// substring matches — JSON1 would be cleaner but keeps the codepath
451/// dependency-free. Returns top hits sorted by score desc; ties keep
452/// the most-recent task first.
453pub fn find_related_tasks(
454    conn: &Connection,
455    arts: &crate::artifacts::Artifacts,
456) -> anyhow::Result<Vec<RelatedTask>> {
457    use std::collections::HashMap;
458    if arts.is_empty() {
459        return Ok(Vec::new());
460    }
461    let mut scores: HashMap<String, f64> = HashMap::new();
462    let mut last_seen: HashMap<String, String> = HashMap::new();
463
464    let needles: Vec<(String, f64)> = arts
465        .linked_issues
466        .iter()
467        .map(|s| (s.clone(), 1.0))
468        .chain(arts.commit_hashes.iter().map(|s| (s.clone(), 0.8)))
469        .chain(arts.files.iter().map(|s| (s.clone(), 0.3)))
470        .collect();
471
472    for (needle, weight) in needles {
473        let pattern = format!("%\"{}\"%", needle.replace('%', "\\%"));
474        let mut stmt = conn.prepare(
475            "SELECT DISTINCT task_id, MAX(timestamp) as ts FROM events_index
476             WHERE artifacts LIKE ?1
477             GROUP BY task_id
478             ORDER BY ts DESC",
479        )?;
480        let rows = stmt.query_map(rusqlite::params![pattern], |r| {
481            Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
482        })?;
483        for row in rows {
484            let (id, ts) = row?;
485            *scores.entry(id.clone()).or_insert(0.0) += weight;
486            last_seen.insert(id, ts);
487        }
488    }
489
490    let mut out: Vec<RelatedTask> = Vec::with_capacity(scores.len());
491    for (id, score) in scores {
492        let status: Option<String> = conn
493            .query_row(
494                "SELECT status FROM tasks WHERE task_id = ?1",
495                rusqlite::params![&id],
496                |r| r.get(0),
497            )
498            .ok();
499        if let Some(status) = status {
500            out.push(RelatedTask {
501                task_id: id,
502                status,
503                score,
504            });
505        }
506    }
507    out.sort_by(|a, b| {
508        b.score
509            .partial_cmp(&a.score)
510            .unwrap_or(std::cmp::Ordering::Equal)
511            .then_with(|| {
512                let ts_a = last_seen.get(&a.task_id).cloned().unwrap_or_default();
513                let ts_b = last_seen.get(&b.task_id).cloned().unwrap_or_default();
514                ts_b.cmp(&ts_a)
515            })
516    });
517    Ok(out)
518}
519
520/// Find tasks (open or closed) whose events reference any of the given
521/// issue identifiers (FIN-868, JIRA-123, INC-7…). Looks at the
522/// per-event `artifacts.linked_issues` column populated on ingest.
523/// Returns `(task_id, status)` deduplicated, most-recent first. Used
524/// by the v0.5.0 Phase C auto-link flow to recognise that a fresh
525/// prompt is a continuation of a prior task.
526pub fn find_tasks_by_linked_issues(
527    conn: &Connection,
528    issues: &[String],
529) -> anyhow::Result<Vec<(String, String)>> {
530    if issues.is_empty() {
531        return Ok(Vec::new());
532    }
533    // Stage A: collect candidate task_ids whose events_index.artifacts
534    // contains any of the requested issue strings. JSON1 is overkill
535    // here — a substring LIKE on the raw JSON is correct given the
536    // ticket id format ("FIN-868") never appears outside its own
537    // linked_issues array.
538    let mut candidate_ids: Vec<String> = Vec::new();
539    for issue in issues {
540        let pattern = format!("%\"{}\"%", issue.replace('%', "\\%"));
541        let mut stmt = conn.prepare(
542            "SELECT DISTINCT task_id FROM events_index
543             WHERE artifacts LIKE ?1
544             ORDER BY timestamp DESC",
545        )?;
546        let rows = stmt.query_map(rusqlite::params![pattern], |r| r.get::<_, String>(0))?;
547        for r in rows {
548            let id = r?;
549            if !candidate_ids.contains(&id) {
550                candidate_ids.push(id);
551            }
552        }
553    }
554    // Stage B: hydrate status for each candidate.
555    let mut out = Vec::with_capacity(candidate_ids.len());
556    for id in candidate_ids {
557        let status: Option<String> = conn
558            .query_row(
559                "SELECT status FROM tasks WHERE task_id = ?1",
560                rusqlite::params![&id],
561                |r| r.get(0),
562            )
563            .ok();
564        if let Some(s) = status {
565            out.push((id, s));
566        }
567    }
568    Ok(out)
569}
570
571/// Re-run artifact extraction over every event of a task and write the
572/// result back to `events_index.artifacts`. Used to backfill events
573/// that were ingested before Phase B landed. Returns the number of
574/// events touched. Wipes the pack cache for the task so the next
575/// render reflects the freshly extracted artifacts.
576pub fn reclassify_task_artifacts(conn: &Connection, task_id: &str) -> anyhow::Result<usize> {
577    let mut stmt = conn.prepare(
578        "SELECT ei.event_id, COALESCE(sf.text, '') FROM events_index ei
579         LEFT JOIN search_fts sf ON sf.event_id = ei.event_id
580         WHERE ei.task_id = ?1",
581    )?;
582    let rows: Vec<(String, String)> = stmt
583        .query_map(rusqlite::params![task_id], |r| {
584            Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
585        })?
586        .collect::<Result<_, _>>()?;
587    let count = rows.len();
588    for (event_id, text) in rows {
589        let arts = crate::artifacts::extract(&text);
590        let json = if arts.is_empty() {
591            None
592        } else {
593            Some(serde_json::to_string(&arts)?)
594        };
595        conn.execute(
596            "UPDATE events_index SET artifacts = ?1 WHERE event_id = ?2",
597            rusqlite::params![json, event_id],
598        )?;
599    }
600    conn.execute(
601        "DELETE FROM task_pack_cache WHERE task_id = ?1",
602        rusqlite::params![task_id],
603    )?;
604    Ok(count)
605}
606
607/// Aggregate artifacts (commit hashes, PR URLs, ticket IDs, files,
608/// branches) across every event of a task, deduplicated. Reads the
609/// per-event JSON payload that `ingest_new_events` populated. Skips
610/// events whose `artifacts` column is NULL or unparseable rather than
611/// failing the pack render.
612pub fn task_artifacts(
613    conn: &Connection,
614    task_id: &str,
615) -> anyhow::Result<crate::artifacts::Artifacts> {
616    let mut stmt = conn.prepare(
617        "SELECT artifacts FROM events_index
618         WHERE task_id = ?1 AND artifacts IS NOT NULL
619         ORDER BY timestamp ASC",
620    )?;
621    let rows = stmt.query_map(rusqlite::params![task_id], |r| r.get::<_, String>(0))?;
622    let mut acc = crate::artifacts::Artifacts::default();
623    for row in rows {
624        let json = row?;
625        if let Ok(parsed) = serde_json::from_str::<crate::artifacts::Artifacts>(&json) {
626            acc.merge(parsed);
627        }
628    }
629    Ok(acc)
630}
631
632/// Look up the most recent `event_id` we've ingested for this project.
633/// Returns `None` when the project has never been indexed (first call,
634/// or migration v002 just landed on an existing 0.1.x DB).
635fn last_indexed_event_id(conn: &Connection, project_hash: &str) -> anyhow::Result<Option<String>> {
636    let mut stmt =
637        conn.prepare("SELECT last_indexed_event_id FROM index_state WHERE project_hash = ?1")?;
638    let mut rows = stmt.query(rusqlite::params![project_hash])?;
639    if let Some(row) = rows.next()? {
640        Ok(Some(row.get::<_, String>(0)?))
641    } else {
642        Ok(None)
643    }
644}
645
646fn record_last_indexed(
647    conn: &Connection,
648    project_hash: &str,
649    event_id: &str,
650) -> anyhow::Result<()> {
651    conn.execute(
652        "INSERT INTO index_state(project_hash, last_indexed_event_id, updated_at)
653         VALUES (?1, ?2, ?3)
654         ON CONFLICT(project_hash) DO UPDATE SET
655             last_indexed_event_id = excluded.last_indexed_event_id,
656             updated_at = excluded.updated_at",
657        rusqlite::params![
658            project_hash,
659            event_id,
660            chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
661        ],
662    )?;
663    Ok(())
664}
665
666/// Read only the tail of the JSONL log since the last call. The cheap path
667/// for hot loops (every MCP tool invocation): scan to the marker, ingest
668/// the rest, update the marker.
669///
670/// Falls back to a full [`rebuild_state`] in two cases:
671/// - No marker yet for this project (first call after migration v002 or
672///   on a brand-new install).
673/// - The stored marker is not present in the JSONL (corrupted / truncated
674///   file). A `tracing::warn!` is emitted so the operator notices.
675pub fn ingest_new_events(
676    conn: &Connection,
677    jsonl_path: impl AsRef<Path>,
678    project_hash: &str,
679) -> anyhow::Result<usize> {
680    let marker = match last_indexed_event_id(conn, project_hash)? {
681        Some(id) => id,
682        None => return rebuild_state(conn, jsonl_path, project_hash),
683    };
684
685    let f = std::fs::File::open(&jsonl_path)
686        .with_context(|| format!("open {:?}", jsonl_path.as_ref()))?;
687    let reader = std::io::BufReader::new(f);
688
689    // First pass: confirm the marker still exists in the file. If it does
690    // not, the JSONL has been rewritten under us — we can't trust the
691    // marker, so we fall back to a full rebuild.
692    let tx = conn.unchecked_transaction()?;
693    let mut found_marker = false;
694    let mut count = 0;
695    let mut last_event_id: Option<String> = None;
696    for (i, line) in reader.lines().enumerate() {
697        let line = line.with_context(|| format!("read line {i}"))?;
698        if line.trim().is_empty() {
699            continue;
700        }
701        let event: Event = match serde_json::from_str(&line) {
702            Ok(e) => e,
703            Err(err) => {
704                tracing::warn!(
705                    line_number = i + 1,
706                    error = %err,
707                    "skipping malformed JSONL line in ingest_new_events"
708                );
709                continue;
710            }
711        };
712        if !found_marker {
713            if event.event_id == marker {
714                found_marker = true;
715            }
716            continue;
717        }
718        upsert_task_from_event(&tx, &event, project_hash)?;
719        index_event(&tx, &event)?;
720        last_event_id = Some(event.event_id.clone());
721        count += 1;
722    }
723
724    if !found_marker {
725        // Discard the (empty) tx and rebuild from scratch.
726        drop(tx);
727        tracing::warn!(
728            project_hash = project_hash,
729            marker = marker.as_str(),
730            "last_indexed_event_id not found in JSONL — falling back to full rebuild"
731        );
732        return rebuild_state(conn, jsonl_path, project_hash);
733    }
734
735    if let Some(eid) = last_event_id.as_deref() {
736        record_last_indexed(&tx, project_hash, eid)?;
737    }
738    tx.commit()?;
739    Ok(count)
740}
741
742pub fn index_event(conn: &Connection, event: &Event) -> anyhow::Result<()> {
743    let type_str = serde_json::to_value(event.event_type)?
744        .as_str()
745        .unwrap()
746        .to_string();
747    let status_str = serde_json::to_value(event.status)?
748        .as_str()
749        .unwrap()
750        .to_string();
751    // v0.5.0 Phase B: scrape artifacts (commit hashes, PR URLs, ticket
752    // IDs, file paths, branch names) out of the event text. Storing
753    // per-event so reclassify can recompute without touching foreign
754    // events; pack aggregates and dedupes across events at render time.
755    let artifacts = crate::artifacts::extract(&event.text);
756    let artifacts_json = if artifacts.is_empty() {
757        None
758    } else {
759        Some(serde_json::to_string(&artifacts)?)
760    };
761    conn.execute(
762        "INSERT OR REPLACE INTO events_index(event_id, task_id, type, timestamp, confidence, status, artifacts)
763         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
764        rusqlite::params![
765            event.event_id, event.task_id, type_str,
766            event.timestamp, event.confidence, status_str, artifacts_json
767        ],
768    )?;
769    // search_fts has no PK; clear then insert to keep idempotent across rebuild_state replays.
770    conn.execute(
771        "DELETE FROM search_fts WHERE event_id=?1",
772        rusqlite::params![event.event_id],
773    )?;
774    conn.execute(
775        "INSERT INTO search_fts(task_id, event_id, text, type) VALUES (?1, ?2, ?3, ?4)",
776        rusqlite::params![event.task_id, event.event_id, event.text, type_str],
777    )?;
778
779    if event.event_type == EventType::Decision {
780        conn.execute(
781            "INSERT OR REPLACE INTO decisions(decision_id, task_id, text, status)
782             VALUES (?1, ?2, ?3, 'active')",
783            rusqlite::params![event.event_id, event.task_id, event.text],
784        )?;
785    }
786
787    if event.event_type == EventType::Supersede {
788        if let Some(target) = &event.supersedes {
789            conn.execute(
790                "UPDATE decisions SET status='superseded', superseded_by=?1 WHERE decision_id=?2",
791                rusqlite::params![event.event_id, target],
792            )?;
793        }
794    }
795
796    if event.event_type == EventType::Evidence {
797        let strength_str = event
798            .evidence_strength
799            .map(|s| {
800                serde_json::to_value(s)
801                    .unwrap()
802                    .as_str()
803                    .unwrap()
804                    .to_string()
805            })
806            .unwrap_or_else(|| "medium".into());
807        conn.execute(
808            "INSERT OR REPLACE INTO evidence(evidence_id, task_id, text, strength)
809             VALUES (?1, ?2, ?3, ?4)",
810            rusqlite::params![event.event_id, event.task_id, event.text, strength_str],
811        )?;
812    }
813
814    // Invalidate any cached pack for this task.
815    conn.execute(
816        "DELETE FROM task_pack_cache WHERE task_id=?1",
817        rusqlite::params![event.task_id],
818    )?;
819
820    Ok(())
821}
822
823pub fn open(path: impl AsRef<Path>) -> anyhow::Result<Connection> {
824    if let Some(parent) = path.as_ref().parent() {
825        std::fs::create_dir_all(parent).with_context(|| format!("create dir {parent:?}"))?;
826    }
827    let conn =
828        Connection::open(&path).with_context(|| format!("open SQLite at {:?}", path.as_ref()))?;
829    conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;")?;
830    apply_migrations(&conn).context("apply schema migrations")?;
831    Ok(conn)
832}
833
/// One row of the task list rendered by the TUI: enough to render the
/// list view without round-tripping for each task. `event_count` comes
/// from a join against `events_index`, so no second query per row is
/// needed.
#[derive(Debug, Clone)]
pub struct TaskRow {
    /// Primary key of the task in `tasks`.
    pub task_id: String,
    /// Stored task title.
    pub title: String,
    /// Stored status string (e.g. "open", "closed").
    pub status: String,
    /// Raw `tasks.last_event_at` string, exactly as stored.
    pub last_event_at: String,
    /// Number of `events_index` rows for this task; 0 when there are none.
    pub event_count: usize,
}
845
846/// All tasks for a project, ordered with open ones first (by recency)
847/// then closed ones. The TUI list view binds directly to this — there
848/// is no other consumer, so the shape is tuned for that callsite.
849pub fn list_tasks_by_project(
850    conn: &Connection,
851    project_hash: &str,
852) -> anyhow::Result<Vec<TaskRow>> {
853    let mut stmt = conn.prepare(
854        "SELECT t.task_id, t.title, t.status, t.last_event_at,
855                COALESCE(c.cnt, 0) AS event_count
856         FROM tasks t
857         LEFT JOIN (
858             SELECT task_id, COUNT(*) AS cnt FROM events_index GROUP BY task_id
859         ) c ON c.task_id = t.task_id
860         WHERE t.project_hash = ?1
861         ORDER BY (t.status = 'open') DESC, t.last_event_at DESC",
862    )?;
863    let rows = stmt
864        .query_map(rusqlite::params![project_hash], |r| {
865            Ok(TaskRow {
866                task_id: r.get::<_, String>(0)?,
867                title: r.get::<_, String>(1)?,
868                status: r.get::<_, String>(2)?,
869                last_event_at: r.get::<_, String>(3)?,
870                event_count: r.get::<_, i64>(4)? as usize,
871            })
872        })?
873        .collect::<Result<Vec<_>, _>>()?;
874    Ok(rows)
875}
876
877#[cfg(test)]
878mod tests {
879    use super::*;
880    use tempfile::TempDir;
881
882    #[test]
883    fn task_exists_returns_true_for_known_id_false_otherwise() {
884        let d = TempDir::new().unwrap();
885        let conn = open(d.path().join("s.sqlite")).unwrap();
886
887        assert!(!task_exists(&conn, "tj-nope").unwrap());
888
889        let e = make_open_event("tj-yes", "Hello");
890        upsert_task_from_event(&conn, &e, "feedfacefeedface").unwrap();
891        index_event(&conn, &e).unwrap();
892
893        assert!(task_exists(&conn, "tj-yes").unwrap());
894        assert!(!task_exists(&conn, "tj-nope").unwrap());
895    }
896
897    #[test]
898    fn fresh_db_runs_all_migrations() {
899        let d = TempDir::new().unwrap();
900        let p = d.path().join("state.sqlite");
901        let conn = open(&p).unwrap();
902
903        let applied: Vec<i64> = conn
904            .prepare("SELECT version FROM schema_migrations ORDER BY version")
905            .unwrap()
906            .query_map([], |r| r.get::<_, i64>(0))
907            .unwrap()
908            .collect::<Result<_, _>>()
909            .unwrap();
910        assert_eq!(
911            applied,
912            (1..=MIGRATIONS.len() as i64).collect::<Vec<_>>(),
913            "every declared migration must be recorded"
914        );
915    }
916
917    #[test]
918    fn apply_migrations_is_idempotent_across_reopens() {
919        let d = TempDir::new().unwrap();
920        let p = d.path().join("state.sqlite");
921        let _ = open(&p).unwrap();
922        let _ = open(&p).unwrap();
923
924        let count: i64 = open(&p)
925            .unwrap()
926            .query_row("SELECT COUNT(*) FROM schema_migrations", [], |r| r.get(0))
927            .unwrap();
928        assert_eq!(
929            count,
930            MIGRATIONS.len() as i64,
931            "schema_migrations must contain exactly one row per declared migration after repeated opens"
932        );
933    }
934
935    #[test]
936    fn open_creates_all_tables() {
937        let d = TempDir::new().unwrap();
938        let p = d.path().join("state.sqlite");
939        let conn = open(&p).unwrap();
940
941        let names: Vec<String> = conn
942            .prepare("SELECT name FROM sqlite_master WHERE type='table' OR type='virtual table' ORDER BY name")
943            .unwrap()
944            .query_map([], |r| r.get::<_, String>(0))
945            .unwrap()
946            .collect::<Result<_, _>>()
947            .unwrap();
948
949        for required in [
950            "decisions",
951            "events_index",
952            "evidence",
953            "task_pack_cache",
954            "tasks",
955            "search_fts",
956        ] {
957            assert!(
958                names.iter().any(|n| n == required),
959                "missing table {required}, have {names:?}"
960            );
961        }
962    }
963
964    #[test]
965    fn open_is_idempotent() {
966        let d = TempDir::new().unwrap();
967        let p = d.path().join("state.sqlite");
968        let _ = open(&p).unwrap();
969        let _ = open(&p).unwrap();
970    }
971
972    #[test]
973    fn index_event_projects_evidence() {
974        let d = TempDir::new().unwrap();
975        let conn = open(d.path().join("s.sqlite")).unwrap();
976        let mut open_e = crate::event::Event::new(
977            "tj-e",
978            crate::event::EventType::Open,
979            crate::event::Author::User,
980            crate::event::Source::Cli,
981            "x".into(),
982        );
983        open_e.meta = serde_json::json!({"title": "T"});
984        upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
985        index_event(&conn, &open_e).unwrap();
986
987        let mut ev = crate::event::Event::new(
988            "tj-e",
989            crate::event::EventType::Evidence,
990            crate::event::Author::Agent,
991            crate::event::Source::Chat,
992            "Hook startup measured at 12ms".into(),
993        );
994        ev.evidence_strength = Some(crate::event::EvidenceStrength::Strong);
995        upsert_task_from_event(&conn, &ev, "feedface").unwrap();
996        index_event(&conn, &ev).unwrap();
997
998        let (text, strength): (String, String) = conn
999            .query_row(
1000                "SELECT text, strength FROM evidence WHERE task_id=?1",
1001                rusqlite::params!["tj-e"],
1002                |r| Ok((r.get(0)?, r.get(1)?)),
1003            )
1004            .unwrap();
1005        assert!(text.contains("12ms"));
1006        assert_eq!(strength, "strong");
1007    }
1008
1009    #[test]
1010    fn supersede_event_marks_decision_superseded() {
1011        let d = TempDir::new().unwrap();
1012        let conn = open(d.path().join("s.sqlite")).unwrap();
1013        let mut open_e = crate::event::Event::new(
1014            "tj-s",
1015            crate::event::EventType::Open,
1016            crate::event::Author::User,
1017            crate::event::Source::Cli,
1018            "x".into(),
1019        );
1020        open_e.meta = serde_json::json!({"title": "T"});
1021        upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
1022        index_event(&conn, &open_e).unwrap();
1023
1024        let dec = crate::event::Event::new(
1025            "tj-s",
1026            crate::event::EventType::Decision,
1027            crate::event::Author::Agent,
1028            crate::event::Source::Chat,
1029            "Use TS".into(),
1030        );
1031        upsert_task_from_event(&conn, &dec, "feedface").unwrap();
1032        index_event(&conn, &dec).unwrap();
1033
1034        let mut sup = crate::event::Event::new(
1035            "tj-s",
1036            crate::event::EventType::Supersede,
1037            crate::event::Author::Agent,
1038            crate::event::Source::Chat,
1039            "Replaced by Rust decision".into(),
1040        );
1041        sup.supersedes = Some(dec.event_id.clone());
1042        upsert_task_from_event(&conn, &sup, "feedface").unwrap();
1043        index_event(&conn, &sup).unwrap();
1044
1045        let (status, by): (String, Option<String>) = conn
1046            .query_row(
1047                "SELECT status, superseded_by FROM decisions WHERE decision_id=?1",
1048                rusqlite::params![dec.event_id],
1049                |r| Ok((r.get(0)?, r.get(1)?)),
1050            )
1051            .unwrap();
1052        assert_eq!(status, "superseded");
1053        assert_eq!(by.as_deref(), Some(sup.event_id.as_str()));
1054    }
1055
1056    #[test]
1057    fn index_event_projects_decision_to_decisions_table() {
1058        let d = TempDir::new().unwrap();
1059        let conn = open(d.path().join("s.sqlite")).unwrap();
1060
1061        let mut open_e = crate::event::Event::new(
1062            "tj-d",
1063            crate::event::EventType::Open,
1064            crate::event::Author::User,
1065            crate::event::Source::Cli,
1066            "x".into(),
1067        );
1068        open_e.meta = serde_json::json!({"title": "T"});
1069        upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
1070        index_event(&conn, &open_e).unwrap();
1071
1072        let dec = crate::event::Event::new(
1073            "tj-d",
1074            crate::event::EventType::Decision,
1075            crate::event::Author::Agent,
1076            crate::event::Source::Chat,
1077            "Adopt Rust".into(),
1078        );
1079        upsert_task_from_event(&conn, &dec, "feedface").unwrap();
1080        index_event(&conn, &dec).unwrap();
1081
1082        let (id, text, status): (String, String, String) = conn
1083            .query_row(
1084                "SELECT decision_id, text, status FROM decisions WHERE task_id=?1",
1085                rusqlite::params!["tj-d"],
1086                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1087            )
1088            .unwrap();
1089        assert_eq!(id, dec.event_id);
1090        assert_eq!(text, "Adopt Rust");
1091        assert_eq!(status, "active");
1092    }
1093
1094    #[test]
1095    fn index_event_is_idempotent_no_search_fts_duplicates() {
1096        let d = TempDir::new().unwrap();
1097        let conn = open(d.path().join("s.sqlite")).unwrap();
1098        let mut open_e = crate::event::Event::new(
1099            "tj-id",
1100            crate::event::EventType::Open,
1101            crate::event::Author::User,
1102            crate::event::Source::Cli,
1103            "x".into(),
1104        );
1105        open_e.meta = serde_json::json!({"title": "Idempotent"});
1106        upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
1107
1108        // Index three times — simulates rebuild_state replays.
1109        index_event(&conn, &open_e).unwrap();
1110        index_event(&conn, &open_e).unwrap();
1111        index_event(&conn, &open_e).unwrap();
1112
1113        let n: i64 = conn
1114            .query_row(
1115                "SELECT COUNT(*) FROM search_fts WHERE event_id=?1",
1116                rusqlite::params![open_e.event_id],
1117                |r| r.get(0),
1118            )
1119            .unwrap();
1120        assert_eq!(n, 1, "search_fts must hold exactly one row per event_id");
1121    }
1122
1123    #[test]
1124    fn list_all_projects_returns_hashes_from_state_dir() {
1125        use std::fs::File;
1126        let d = TempDir::new().unwrap();
1127        let state_dir = d.path().join("state");
1128        std::fs::create_dir_all(&state_dir).unwrap();
1129        File::create(state_dir.join("aaaa1111aaaa1111.sqlite")).unwrap();
1130        File::create(state_dir.join("bbbb2222bbbb2222.sqlite")).unwrap();
1131        File::create(state_dir.join("not-a-project.txt")).unwrap();
1132
1133        let mut hashes = list_all_projects(&state_dir).unwrap();
1134        hashes.sort();
1135        assert_eq!(hashes, vec!["aaaa1111aaaa1111", "bbbb2222bbbb2222"]);
1136    }
1137
1138    fn write_event_line(f: &mut std::fs::File, e: &crate::event::Event) {
1139        use std::io::Write;
1140        writeln!(f, "{}", serde_json::to_string(e).unwrap()).unwrap();
1141    }
1142
1143    fn make_open_event(task_id: &str, title: &str) -> crate::event::Event {
1144        let mut e = crate::event::Event::new(
1145            task_id,
1146            crate::event::EventType::Open,
1147            crate::event::Author::User,
1148            crate::event::Source::Cli,
1149            "x".into(),
1150        );
1151        e.meta = serde_json::json!({"title": title});
1152        e
1153    }
1154
1155    #[test]
1156    fn ingest_new_events_picks_up_only_new_lines() {
1157        let d = TempDir::new().unwrap();
1158        let jsonl = d.path().join("events.jsonl");
1159        let db = d.path().join("s.sqlite");
1160        let project = "deadbeefdeadbeef";
1161
1162        let e1 = make_open_event("tj-i1", "first");
1163        let e2 = make_open_event("tj-i2", "second");
1164        let e3 = make_open_event("tj-i3", "third");
1165
1166        let mut f = std::fs::File::create(&jsonl).unwrap();
1167        write_event_line(&mut f, &e1);
1168        write_event_line(&mut f, &e2);
1169        write_event_line(&mut f, &e3);
1170        drop(f);
1171
1172        // First pass — no marker yet, falls back to a full rebuild.
1173        let conn = open(&db).unwrap();
1174        let n_first = ingest_new_events(&conn, &jsonl, project).unwrap();
1175        assert_eq!(n_first, 3);
1176
1177        // Append two more events.
1178        let e4 = make_open_event("tj-i4", "fourth");
1179        let e5 = make_open_event("tj-i5", "fifth");
1180        let mut f = std::fs::OpenOptions::new()
1181            .append(true)
1182            .open(&jsonl)
1183            .unwrap();
1184        write_event_line(&mut f, &e4);
1185        write_event_line(&mut f, &e5);
1186        drop(f);
1187
1188        // Second pass — marker = e3, only e4 + e5 must be processed.
1189        let n_second = ingest_new_events(&conn, &jsonl, project).unwrap();
1190        assert_eq!(n_second, 2, "incremental ingest must read only the tail");
1191
1192        let total: i64 = conn
1193            .query_row("SELECT COUNT(*) FROM events_index", [], |r| r.get(0))
1194            .unwrap();
1195        assert_eq!(total, 5);
1196
1197        let marker: String = conn
1198            .query_row(
1199                "SELECT last_indexed_event_id FROM index_state WHERE project_hash=?1",
1200                rusqlite::params![project],
1201                |r| r.get(0),
1202            )
1203            .unwrap();
1204        assert_eq!(marker, e5.event_id);
1205    }
1206
1207    #[test]
1208    fn ingest_new_events_falls_back_to_full_rebuild_when_marker_vanishes() {
1209        let d = TempDir::new().unwrap();
1210        let jsonl = d.path().join("events.jsonl");
1211        let db = d.path().join("s.sqlite");
1212        let project = "feedfacefeedface";
1213
1214        let e1 = make_open_event("tj-r1", "first");
1215        let mut f = std::fs::File::create(&jsonl).unwrap();
1216        write_event_line(&mut f, &e1);
1217        drop(f);
1218
1219        let conn = open(&db).unwrap();
1220        ingest_new_events(&conn, &jsonl, project).unwrap();
1221
1222        // Replace the file entirely so the marker (e1.event_id) no longer
1223        // appears anywhere — simulates corruption / hand-edit.
1224        let e2 = make_open_event("tj-r2", "after-corruption");
1225        let e3 = make_open_event("tj-r3", "after-corruption-2");
1226        let mut f = std::fs::File::create(&jsonl).unwrap();
1227        write_event_line(&mut f, &e2);
1228        write_event_line(&mut f, &e3);
1229        drop(f);
1230
1231        let n = ingest_new_events(&conn, &jsonl, project).unwrap();
1232        assert_eq!(n, 2, "missing marker must trigger full rebuild");
1233    }
1234
1235    #[test]
1236    fn rebuild_state_and_ingest_new_events_produce_same_state() {
1237        let d = TempDir::new().unwrap();
1238        let jsonl_a = d.path().join("a.jsonl");
1239        let jsonl_b = d.path().join("b.jsonl");
1240        let db_a = d.path().join("a.sqlite");
1241        let db_b = d.path().join("b.sqlite");
1242
1243        let events: Vec<_> = (0..5)
1244            .map(|i| make_open_event(&format!("tj-eq{i}"), &format!("title {i}")))
1245            .collect();
1246        for path in [&jsonl_a, &jsonl_b] {
1247            let mut f = std::fs::File::create(path).unwrap();
1248            for e in &events {
1249                write_event_line(&mut f, e);
1250            }
1251        }
1252
1253        let conn_a = open(&db_a).unwrap();
1254        let n_a = rebuild_state(&conn_a, &jsonl_a, "abcd1234abcd1234").unwrap();
1255
1256        let conn_b = open(&db_b).unwrap();
1257        let n_b = ingest_new_events(&conn_b, &jsonl_b, "abcd1234abcd1234").unwrap();
1258
1259        assert_eq!(n_a, n_b);
1260        assert_eq!(n_a, 5);
1261
1262        for table in ["tasks", "events_index"] {
1263            let q = format!("SELECT COUNT(*) FROM {table}");
1264            let cnt_a: i64 = conn_a.query_row(&q, [], |r| r.get(0)).unwrap();
1265            let cnt_b: i64 = conn_b.query_row(&q, [], |r| r.get(0)).unwrap();
1266            assert_eq!(cnt_a, cnt_b, "row count mismatch in {table}");
1267        }
1268    }
1269
1270    #[test]
1271    fn rebuild_state_skips_malformed_jsonl_lines() {
1272        use std::io::Write;
1273        let d = TempDir::new().unwrap();
1274        let events_path = d.path().join("events.jsonl");
1275        let db_path = d.path().join("s.sqlite");
1276
1277        let mut f = std::fs::File::create(&events_path).unwrap();
1278
1279        let mut e1 = crate::event::Event::new(
1280            "tj-skip",
1281            crate::event::EventType::Open,
1282            crate::event::Author::User,
1283            crate::event::Source::Cli,
1284            "x".into(),
1285        );
1286        e1.meta = serde_json::json!({"title": "Skip test"});
1287        writeln!(f, "{}", serde_json::to_string(&e1).unwrap()).unwrap();
1288
1289        // Garbage that is not even JSON.
1290        writeln!(f, "this is not a json event line").unwrap();
1291
1292        // Valid JSON but not a valid Event (missing required fields).
1293        writeln!(f, "{{\"foo\": 1}}").unwrap();
1294
1295        let e3 = crate::event::Event::new(
1296            "tj-skip",
1297            crate::event::EventType::Decision,
1298            crate::event::Author::Agent,
1299            crate::event::Source::Chat,
1300            "Adopt Rust".into(),
1301        );
1302        writeln!(f, "{}", serde_json::to_string(&e3).unwrap()).unwrap();
1303        drop(f);
1304
1305        let conn = open(&db_path).unwrap();
1306        let n = rebuild_state(&conn, &events_path, "deadbeefdeadbeef")
1307            .expect("rebuild_state must succeed despite malformed lines");
1308        assert_eq!(
1309            n, 2,
1310            "expected 2 valid events indexed (2 malformed skipped)"
1311        );
1312
1313        let indexed: i64 = conn
1314            .query_row("SELECT COUNT(*) FROM events_index", [], |r| r.get(0))
1315            .unwrap();
1316        assert_eq!(indexed, 2);
1317    }
1318
1319    #[test]
1320    fn rebuild_state_reads_jsonl_and_populates_db() {
1321        use std::io::Write;
1322        let d = TempDir::new().unwrap();
1323        let events_path = d.path().join("events.jsonl");
1324        let db_path = d.path().join("s.sqlite");
1325
1326        let mut f = std::fs::File::create(&events_path).unwrap();
1327        let mut e1 = crate::event::Event::new(
1328            "tj-9",
1329            crate::event::EventType::Open,
1330            crate::event::Author::User,
1331            crate::event::Source::Cli,
1332            "x".into(),
1333        );
1334        e1.meta = serde_json::json!({"title": "Nine"});
1335        let e2 = crate::event::Event::new(
1336            "tj-9",
1337            crate::event::EventType::Decision,
1338            crate::event::Author::Agent,
1339            crate::event::Source::Chat,
1340            "Adopt Rust".into(),
1341        );
1342        writeln!(f, "{}", serde_json::to_string(&e1).unwrap()).unwrap();
1343        writeln!(f, "{}", serde_json::to_string(&e2).unwrap()).unwrap();
1344        drop(f);
1345
1346        let conn = open(&db_path).unwrap();
1347        let n = rebuild_state(&conn, &events_path, "deadbeefdeadbeef").unwrap();
1348        assert_eq!(n, 2);
1349
1350        let n: i64 = conn
1351            .query_row("SELECT COUNT(*) FROM tasks", [], |r| r.get(0))
1352            .unwrap();
1353        assert_eq!(n, 1);
1354        let n: i64 = conn
1355            .query_row("SELECT COUNT(*) FROM events_index", [], |r| r.get(0))
1356            .unwrap();
1357        assert_eq!(n, 2);
1358    }
1359
1360    #[test]
1361    fn index_event_writes_index_and_fts() {
1362        let d = TempDir::new().unwrap();
1363        let conn = open(d.path().join("s.sqlite")).unwrap();
1364        let mut open_e = crate::event::Event::new(
1365            "tj-1",
1366            crate::event::EventType::Open,
1367            crate::event::Author::User,
1368            crate::event::Source::Cli,
1369            "Title".into(),
1370        );
1371        open_e.meta = serde_json::json!({"title": "Title"});
1372        upsert_task_from_event(&conn, &open_e, "deadbeefdeadbeef").unwrap();
1373        index_event(&conn, &open_e).unwrap();
1374
1375        let mut decision = crate::event::Event::new(
1376            "tj-1",
1377            crate::event::EventType::Decision,
1378            crate::event::Author::Agent,
1379            crate::event::Source::Chat,
1380            "Adopt Rust".into(),
1381        );
1382        decision.confidence = Some(0.92);
1383        upsert_task_from_event(&conn, &decision, "deadbeefdeadbeef").unwrap();
1384        index_event(&conn, &decision).unwrap();
1385
1386        let count: i64 = conn
1387            .query_row(
1388                "SELECT COUNT(*) FROM events_index WHERE task_id=?1",
1389                rusqlite::params!["tj-1"],
1390                |r| r.get(0),
1391            )
1392            .unwrap();
1393        assert_eq!(count, 2);
1394
1395        let mut stmt = conn
1396            .prepare("SELECT event_id FROM search_fts WHERE search_fts MATCH ?1")
1397            .unwrap();
1398        let hits: Vec<String> = stmt
1399            .query_map(rusqlite::params!["Rust"], |r| {
1400                let s: String = r.get(0)?;
1401                Ok(s)
1402            })
1403            .unwrap()
1404            .collect::<Result<Vec<_>, _>>()
1405            .unwrap();
1406        assert_eq!(hits.len(), 1);
1407        assert_eq!(hits[0], decision.event_id);
1408    }
1409
1410    #[test]
1411    fn upsert_task_from_open_event_inserts_row() {
1412        let d = TempDir::new().unwrap();
1413        let conn = open(d.path().join("s.sqlite")).unwrap();
1414
1415        let mut e = crate::event::Event::new(
1416            "tj-7f3a",
1417            crate::event::EventType::Open,
1418            crate::event::Author::User,
1419            crate::event::Source::Cli,
1420            "Add OAuth".into(),
1421        );
1422        e.meta = serde_json::json!({ "title": "Add OAuth login" });
1423
1424        upsert_task_from_event(&conn, &e, "abcd1234abcd1234").unwrap();
1425
1426        let (id, title, status): (String, String, String) = conn
1427            .query_row(
1428                "SELECT task_id, title, status FROM tasks WHERE task_id = ?1",
1429                ["tj-7f3a"],
1430                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1431            )
1432            .unwrap();
1433
1434        assert_eq!(id, "tj-7f3a");
1435        assert_eq!(title, "Add OAuth login");
1436        assert_eq!(status, "open");
1437    }
1438}