// tj_core/db.rs

1use anyhow::Context;
2use rusqlite::Connection;
3use std::collections::HashSet;
4use std::path::Path;
5
/// One forward-only schema migration.
///
/// Migrations are applied in `version` order; each is recorded in
/// `schema_migrations` so re-running `open()` is idempotent.
struct Migration {
    /// Version number written to `schema_migrations.version` once applied.
    version: i64,
    /// SQL batch executed to apply this migration.
    sql: &'static str,
}
13
/// v001 — initial schema. Creates the `tasks`, `events_index`, `decisions`,
/// `evidence` and `task_pack_cache` tables, the two lookup indexes on
/// `tasks` and `events_index`, and the `search_fts` FTS5 virtual table.
/// Every statement uses `IF NOT EXISTS`.
const MIGRATION_001: &str = r#"
CREATE TABLE IF NOT EXISTS tasks (
  task_id        TEXT PRIMARY KEY,
  title          TEXT NOT NULL,
  status         TEXT NOT NULL,
  project_hash   TEXT NOT NULL,
  opened_at      TEXT NOT NULL,
  closed_at      TEXT,
  last_event_at  TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_tasks_project ON tasks(project_hash, last_event_at DESC);

CREATE TABLE IF NOT EXISTS events_index (
  event_id    TEXT PRIMARY KEY,
  task_id     TEXT NOT NULL,
  type        TEXT NOT NULL,
  timestamp   TEXT NOT NULL,
  confidence  REAL,
  status      TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_events_task_time ON events_index(task_id, timestamp DESC);

CREATE TABLE IF NOT EXISTS decisions (
  decision_id    TEXT PRIMARY KEY,
  task_id        TEXT NOT NULL,
  text           TEXT NOT NULL,
  status         TEXT NOT NULL,
  superseded_by  TEXT
);

CREATE TABLE IF NOT EXISTS evidence (
  evidence_id           TEXT PRIMARY KEY,
  task_id               TEXT NOT NULL,
  text                  TEXT NOT NULL,
  strength              TEXT NOT NULL,
  refers_to_decision_id TEXT
);

CREATE TABLE IF NOT EXISTS task_pack_cache (
  task_id             TEXT NOT NULL,
  mode                TEXT NOT NULL,
  text                TEXT NOT NULL,
  generated_at        TEXT NOT NULL,
  source_event_count  INTEGER NOT NULL,
  PRIMARY KEY (task_id, mode)
);

CREATE VIRTUAL TABLE IF NOT EXISTS search_fts USING fts5(
  task_id UNINDEXED,
  event_id UNINDEXED,
  text,
  type
);
"#;
68
/// v002 — per-project ingest marker.
///
/// Tracks how far we've ingested the JSONL log per project so subsequent
/// `ingest_new_events` calls can ingest only the events after the marker
/// instead of replaying the whole log. `last_indexed_event_id` is the
/// `event_id` of the most recent event written to `events_index`.
const MIGRATION_002: &str = r#"
CREATE TABLE IF NOT EXISTS index_state (
  project_hash          TEXT PRIMARY KEY,
  last_indexed_event_id TEXT NOT NULL,
  updated_at            TEXT NOT NULL
);
"#;
80
/// v003 — v0.4.0 task-as-goal redesign: explicit goal/outcome (plus an
/// outcome tag and external references) on tasks, and typed artifacts on
/// events. All new columns are NULLable so existing rows survive without
/// backfill. Also wipes the pack cache so old packs (rendered without
/// Goal/Outcome blocks) regenerate on next view.
const MIGRATION_003: &str = r#"
ALTER TABLE tasks ADD COLUMN goal        TEXT;
ALTER TABLE tasks ADD COLUMN outcome     TEXT;
ALTER TABLE tasks ADD COLUMN outcome_tag TEXT;
ALTER TABLE tasks ADD COLUMN external    TEXT;
ALTER TABLE events_index ADD COLUMN artifacts TEXT;
DELETE FROM task_pack_cache;
"#;
93
/// v004 — v0.5.0 Phase B, artifacts auto-extract on ingest. The
/// `artifacts` column was added in v003 but stayed NULL for everyone;
/// v004 makes no schema change and only wipes the pack cache so that
/// newly-extracted artifacts surface in the next pack render. Existing
/// events stay NULL until `reclassify` (Phase B+) or `rebuild-state`
/// is run.
const MIGRATION_004: &str = r#"
DELETE FROM task_pack_cache;
"#;
102
/// All schema migrations in version order. Append new entries here; never
/// edit a published migration's `sql` — write a new one instead.
///
/// `apply_migrations` walks this slice front to back and skips every
/// entry whose `version` is already recorded in `schema_migrations`.
const MIGRATIONS: &[Migration] = &[
    Migration {
        version: 1,
        sql: MIGRATION_001,
    },
    Migration {
        version: 2,
        sql: MIGRATION_002,
    },
    Migration {
        version: 3,
        sql: MIGRATION_003,
    },
    Migration {
        version: 4,
        sql: MIGRATION_004,
    },
];
123
124fn apply_migrations(conn: &Connection) -> anyhow::Result<()> {
125    conn.execute_batch(
126        "CREATE TABLE IF NOT EXISTS schema_migrations (
127            version    INTEGER PRIMARY KEY,
128            applied_at TEXT NOT NULL
129        )",
130    )
131    .context("create schema_migrations table")?;
132
133    let applied: HashSet<i64> = {
134        let mut stmt = conn
135            .prepare("SELECT version FROM schema_migrations")
136            .context("select applied versions")?;
137        let rows = stmt
138            .query_map([], |r| r.get::<_, i64>(0))
139            .context("iterate schema_migrations")?;
140        rows.collect::<rusqlite::Result<HashSet<_>>>()
141            .context("collect applied versions")?
142    };
143
144    for migration in MIGRATIONS {
145        if applied.contains(&migration.version) {
146            continue;
147        }
148        conn.execute_batch(migration.sql)
149            .with_context(|| format!("apply schema migration v{:03}", migration.version))?;
150        conn.execute(
151            "INSERT INTO schema_migrations(version, applied_at) VALUES (?1, ?2)",
152            rusqlite::params![
153                migration.version,
154                chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
155            ],
156        )
157        .with_context(|| {
158            format!(
159                "record schema migration v{:03} as applied",
160                migration.version
161            )
162        })?;
163    }
164    Ok(())
165}
166
167use crate::event::{Event, EventType};
168
169pub fn upsert_task_from_event(
170    conn: &Connection,
171    event: &Event,
172    project_hash: &str,
173) -> anyhow::Result<()> {
174    match event.event_type {
175        EventType::Open => {
176            let title = event
177                .meta
178                .get("title")
179                .and_then(|v| v.as_str())
180                .unwrap_or(&event.text)
181                .to_string();
182            conn.execute(
183                "INSERT INTO tasks(task_id, title, status, project_hash, opened_at, last_event_at)
184                 VALUES (?1, ?2, 'open', ?3, ?4, ?4)
185                 ON CONFLICT(task_id) DO UPDATE SET last_event_at = ?4",
186                rusqlite::params![event.task_id, title, project_hash, event.timestamp],
187            )?;
188        }
189        EventType::Close => {
190            conn.execute(
191                "UPDATE tasks SET status='closed', closed_at=?2, last_event_at=?2 WHERE task_id=?1",
192                rusqlite::params![event.task_id, event.timestamp],
193            )?;
194        }
195        EventType::Reopen => {
196            conn.execute(
197                "UPDATE tasks SET status='open', closed_at=NULL, last_event_at=?2 WHERE task_id=?1",
198                rusqlite::params![event.task_id, event.timestamp],
199            )?;
200        }
201        _ => {
202            conn.execute(
203                "UPDATE tasks SET last_event_at=?2 WHERE task_id=?1",
204                rusqlite::params![event.task_id, event.timestamp],
205            )?;
206        }
207    }
208    Ok(())
209}
210
211use std::io::BufRead;
212
213pub fn list_all_projects(state_dir: impl AsRef<Path>) -> anyhow::Result<Vec<String>> {
214    let dir = state_dir.as_ref();
215    if !dir.exists() {
216        return Ok(vec![]);
217    }
218    let mut out = Vec::new();
219    for entry in std::fs::read_dir(dir)? {
220        let entry = entry?;
221        let path = entry.path();
222        if path.extension().and_then(|e| e.to_str()) == Some("sqlite") {
223            if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) {
224                out.push(stem.to_string());
225            }
226        }
227    }
228    Ok(out)
229}
230
231pub fn rebuild_state(
232    conn: &Connection,
233    jsonl_path: impl AsRef<Path>,
234    project_hash: &str,
235) -> anyhow::Result<usize> {
236    let f = std::fs::File::open(&jsonl_path)
237        .with_context(|| format!("open {:?}", jsonl_path.as_ref()))?;
238    let reader = std::io::BufReader::new(f);
239
240    let tx = conn.unchecked_transaction()?;
241    let mut count = 0;
242    let mut last_event_id: Option<String> = None;
243    for (i, line) in reader.lines().enumerate() {
244        let line = line.with_context(|| format!("read line {i}"))?;
245        if line.trim().is_empty() {
246            continue;
247        }
248        // Malformed JSONL lines are skipped with a warning so that one bad
249        // event cannot abort an otherwise-recoverable rebuild. SQL errors
250        // still propagate — those indicate schema/integrity problems.
251        let event: Event = match serde_json::from_str(&line) {
252            Ok(e) => e,
253            Err(err) => {
254                tracing::warn!(
255                    line_number = i + 1,
256                    error = %err,
257                    "skipping malformed JSONL line in rebuild_state"
258                );
259                continue;
260            }
261        };
262        upsert_task_from_event(&tx, &event, project_hash)?;
263        index_event(&tx, &event)?;
264        last_event_id = Some(event.event_id.clone());
265        count += 1;
266    }
267    if let Some(eid) = last_event_id.as_deref() {
268        record_last_indexed(&tx, project_hash, eid)?;
269    }
270    tx.commit()?;
271    Ok(count)
272}
273
274/// Returns whether a task with this id has been recorded in the derived
275/// state. Cheap O(1) lookup against the `tasks` primary key. Callers
276/// should run [`ingest_new_events`] first if they want to see the latest
277/// JSONL state.
278pub fn task_exists(conn: &Connection, task_id: &str) -> anyhow::Result<bool> {
279    let count: i64 = conn.query_row(
280        "SELECT COUNT(*) FROM tasks WHERE task_id = ?1",
281        rusqlite::params![task_id],
282        |r| r.get(0),
283    )?;
284    Ok(count > 0)
285}
286
287/// Status string for an existing task (e.g. "open", "closed"). Returns
288/// `None` when the task is unknown — caller decides whether that's a
289/// hard error or a route-to-pending case.
290pub fn task_status(conn: &Connection, task_id: &str) -> anyhow::Result<Option<String>> {
291    let mut stmt = conn.prepare("SELECT status FROM tasks WHERE task_id = ?1")?;
292    let mut rows = stmt.query(rusqlite::params![task_id])?;
293    Ok(rows.next()?.map(|r| r.get::<_, String>(0)).transpose()?)
294}
295
296/// Set or replace `tasks.goal` for an existing task. Caller is
297/// expected to have validated the task exists (via `task_exists`); we
298/// don't error on no-op rows so the upsert pattern is uniform.
299pub fn set_task_goal(conn: &Connection, task_id: &str, goal: &str) -> anyhow::Result<()> {
300    conn.execute(
301        "UPDATE tasks SET goal = ?1 WHERE task_id = ?2",
302        rusqlite::params![goal, task_id],
303    )
304    .with_context(|| format!("set goal for {task_id}"))?;
305    // Pack cache is now stale for this task — drop the entry so the
306    // next render picks up the new goal.
307    conn.execute(
308        "DELETE FROM task_pack_cache WHERE task_id = ?1",
309        rusqlite::params![task_id],
310    )?;
311    Ok(())
312}
313
314/// Set or replace the closure metadata. Pass `None` for `outcome_tag`
315/// to leave it unset; pass `Some("done"|"abandoned"|"superseded")`
316/// for a structured tag. Free-text `outcome` is the primary field.
317pub fn set_task_outcome(
318    conn: &Connection,
319    task_id: &str,
320    outcome: &str,
321    outcome_tag: Option<&str>,
322) -> anyhow::Result<()> {
323    conn.execute(
324        "UPDATE tasks SET outcome = ?1, outcome_tag = ?2 WHERE task_id = ?3",
325        rusqlite::params![outcome, outcome_tag, task_id],
326    )
327    .with_context(|| format!("set outcome for {task_id}"))?;
328    conn.execute(
329        "DELETE FROM task_pack_cache WHERE task_id = ?1",
330        rusqlite::params![task_id],
331    )?;
332    Ok(())
333}
334
335/// Append an external reference to `tasks.external`. The column is
336/// stored as a comma-separated list — small, append-mostly, no
337/// uniqueness constraint. Acceptable shapes (loose, not enforced):
338/// `beads:claude-memory-rsw`, `github:#42`, `jira:PROJ-1234`.
339pub fn add_task_external(conn: &Connection, task_id: &str, reference: &str) -> anyhow::Result<()> {
340    let current: Option<String> = conn
341        .query_row(
342            "SELECT external FROM tasks WHERE task_id = ?1",
343            rusqlite::params![task_id],
344            |r| r.get::<_, Option<String>>(0),
345        )
346        .with_context(|| format!("read external for {task_id}"))?;
347    let next = match current {
348        Some(s) if !s.is_empty() => format!("{s},{reference}"),
349        _ => reference.to_string(),
350    };
351    conn.execute(
352        "UPDATE tasks SET external = ?1 WHERE task_id = ?2",
353        rusqlite::params![next, task_id],
354    )?;
355    conn.execute(
356        "DELETE FROM task_pack_cache WHERE task_id = ?1",
357        rusqlite::params![task_id],
358    )?;
359    Ok(())
360}
361
/// Read-only metadata bundle used by pack rendering (and TUI list
/// teasers in v0.4.0+). Each field mirrors the nullable `tasks` column of
/// the same name. [`task_metadata`] returns `None` for unknown tasks.
#[derive(Debug, Clone, Default)]
pub struct TaskMetadata {
    /// `tasks.goal`, written by `set_task_goal`.
    pub goal: Option<String>,
    /// `tasks.outcome`, the free-text closure note.
    pub outcome: Option<String>,
    /// `tasks.outcome_tag`, the optional structured closure tag.
    pub outcome_tag: Option<String>,
    /// `tasks.external`, a comma-separated list of external references.
    pub external: Option<String>,
}
371
372pub fn task_metadata(conn: &Connection, task_id: &str) -> anyhow::Result<Option<TaskMetadata>> {
373    let mut stmt =
374        conn.prepare("SELECT goal, outcome, outcome_tag, external FROM tasks WHERE task_id = ?1")?;
375    let mut rows = stmt.query(rusqlite::params![task_id])?;
376    Ok(match rows.next()? {
377        Some(r) => Some(TaskMetadata {
378            goal: r.get::<_, Option<String>>(0)?,
379            outcome: r.get::<_, Option<String>>(1)?,
380            outcome_tag: r.get::<_, Option<String>>(2)?,
381            external: r.get::<_, Option<String>>(3)?,
382        }),
383        None => None,
384    })
385}
386
387/// Find tasks (open or closed) whose events reference any of the given
388/// issue identifiers (FIN-868, JIRA-123, INC-7…). Looks at the
389/// per-event `artifacts.linked_issues` column populated on ingest.
390/// Returns `(task_id, status)` deduplicated, most-recent first. Used
391/// by the v0.5.0 Phase C auto-link flow to recognise that a fresh
392/// prompt is a continuation of a prior task.
393pub fn find_tasks_by_linked_issues(
394    conn: &Connection,
395    issues: &[String],
396) -> anyhow::Result<Vec<(String, String)>> {
397    if issues.is_empty() {
398        return Ok(Vec::new());
399    }
400    // Stage A: collect candidate task_ids whose events_index.artifacts
401    // contains any of the requested issue strings. JSON1 is overkill
402    // here — a substring LIKE on the raw JSON is correct given the
403    // ticket id format ("FIN-868") never appears outside its own
404    // linked_issues array.
405    let mut candidate_ids: Vec<String> = Vec::new();
406    for issue in issues {
407        let pattern = format!("%\"{}\"%", issue.replace('%', "\\%"));
408        let mut stmt = conn.prepare(
409            "SELECT DISTINCT task_id FROM events_index
410             WHERE artifacts LIKE ?1
411             ORDER BY timestamp DESC",
412        )?;
413        let rows = stmt.query_map(rusqlite::params![pattern], |r| r.get::<_, String>(0))?;
414        for r in rows {
415            let id = r?;
416            if !candidate_ids.contains(&id) {
417                candidate_ids.push(id);
418            }
419        }
420    }
421    // Stage B: hydrate status for each candidate.
422    let mut out = Vec::with_capacity(candidate_ids.len());
423    for id in candidate_ids {
424        let status: Option<String> = conn
425            .query_row(
426                "SELECT status FROM tasks WHERE task_id = ?1",
427                rusqlite::params![&id],
428                |r| r.get(0),
429            )
430            .ok();
431        if let Some(s) = status {
432            out.push((id, s));
433        }
434    }
435    Ok(out)
436}
437
438/// Re-run artifact extraction over every event of a task and write the
439/// result back to `events_index.artifacts`. Used to backfill events
440/// that were ingested before Phase B landed. Returns the number of
441/// events touched. Wipes the pack cache for the task so the next
442/// render reflects the freshly extracted artifacts.
443pub fn reclassify_task_artifacts(conn: &Connection, task_id: &str) -> anyhow::Result<usize> {
444    let mut stmt = conn.prepare(
445        "SELECT ei.event_id, COALESCE(sf.text, '') FROM events_index ei
446         LEFT JOIN search_fts sf ON sf.event_id = ei.event_id
447         WHERE ei.task_id = ?1",
448    )?;
449    let rows: Vec<(String, String)> = stmt
450        .query_map(rusqlite::params![task_id], |r| {
451            Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
452        })?
453        .collect::<Result<_, _>>()?;
454    let count = rows.len();
455    for (event_id, text) in rows {
456        let arts = crate::artifacts::extract(&text);
457        let json = if arts.is_empty() {
458            None
459        } else {
460            Some(serde_json::to_string(&arts)?)
461        };
462        conn.execute(
463            "UPDATE events_index SET artifacts = ?1 WHERE event_id = ?2",
464            rusqlite::params![json, event_id],
465        )?;
466    }
467    conn.execute(
468        "DELETE FROM task_pack_cache WHERE task_id = ?1",
469        rusqlite::params![task_id],
470    )?;
471    Ok(count)
472}
473
474/// Aggregate artifacts (commit hashes, PR URLs, ticket IDs, files,
475/// branches) across every event of a task, deduplicated. Reads the
476/// per-event JSON payload that `ingest_new_events` populated. Skips
477/// events whose `artifacts` column is NULL or unparseable rather than
478/// failing the pack render.
479pub fn task_artifacts(
480    conn: &Connection,
481    task_id: &str,
482) -> anyhow::Result<crate::artifacts::Artifacts> {
483    let mut stmt = conn.prepare(
484        "SELECT artifacts FROM events_index
485         WHERE task_id = ?1 AND artifacts IS NOT NULL
486         ORDER BY timestamp ASC",
487    )?;
488    let rows = stmt.query_map(rusqlite::params![task_id], |r| r.get::<_, String>(0))?;
489    let mut acc = crate::artifacts::Artifacts::default();
490    for row in rows {
491        let json = row?;
492        if let Ok(parsed) = serde_json::from_str::<crate::artifacts::Artifacts>(&json) {
493            acc.merge(parsed);
494        }
495    }
496    Ok(acc)
497}
498
499/// Look up the most recent `event_id` we've ingested for this project.
500/// Returns `None` when the project has never been indexed (first call,
501/// or migration v002 just landed on an existing 0.1.x DB).
502fn last_indexed_event_id(conn: &Connection, project_hash: &str) -> anyhow::Result<Option<String>> {
503    let mut stmt =
504        conn.prepare("SELECT last_indexed_event_id FROM index_state WHERE project_hash = ?1")?;
505    let mut rows = stmt.query(rusqlite::params![project_hash])?;
506    if let Some(row) = rows.next()? {
507        Ok(Some(row.get::<_, String>(0)?))
508    } else {
509        Ok(None)
510    }
511}
512
513fn record_last_indexed(
514    conn: &Connection,
515    project_hash: &str,
516    event_id: &str,
517) -> anyhow::Result<()> {
518    conn.execute(
519        "INSERT INTO index_state(project_hash, last_indexed_event_id, updated_at)
520         VALUES (?1, ?2, ?3)
521         ON CONFLICT(project_hash) DO UPDATE SET
522             last_indexed_event_id = excluded.last_indexed_event_id,
523             updated_at = excluded.updated_at",
524        rusqlite::params![
525            project_hash,
526            event_id,
527            chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
528        ],
529    )?;
530    Ok(())
531}
532
533/// Read only the tail of the JSONL log since the last call. The cheap path
534/// for hot loops (every MCP tool invocation): scan to the marker, ingest
535/// the rest, update the marker.
536///
537/// Falls back to a full [`rebuild_state`] in two cases:
538/// - No marker yet for this project (first call after migration v002 or
539///   on a brand-new install).
540/// - The stored marker is not present in the JSONL (corrupted / truncated
541///   file). A `tracing::warn!` is emitted so the operator notices.
542pub fn ingest_new_events(
543    conn: &Connection,
544    jsonl_path: impl AsRef<Path>,
545    project_hash: &str,
546) -> anyhow::Result<usize> {
547    let marker = match last_indexed_event_id(conn, project_hash)? {
548        Some(id) => id,
549        None => return rebuild_state(conn, jsonl_path, project_hash),
550    };
551
552    let f = std::fs::File::open(&jsonl_path)
553        .with_context(|| format!("open {:?}", jsonl_path.as_ref()))?;
554    let reader = std::io::BufReader::new(f);
555
556    // First pass: confirm the marker still exists in the file. If it does
557    // not, the JSONL has been rewritten under us — we can't trust the
558    // marker, so we fall back to a full rebuild.
559    let tx = conn.unchecked_transaction()?;
560    let mut found_marker = false;
561    let mut count = 0;
562    let mut last_event_id: Option<String> = None;
563    for (i, line) in reader.lines().enumerate() {
564        let line = line.with_context(|| format!("read line {i}"))?;
565        if line.trim().is_empty() {
566            continue;
567        }
568        let event: Event = match serde_json::from_str(&line) {
569            Ok(e) => e,
570            Err(err) => {
571                tracing::warn!(
572                    line_number = i + 1,
573                    error = %err,
574                    "skipping malformed JSONL line in ingest_new_events"
575                );
576                continue;
577            }
578        };
579        if !found_marker {
580            if event.event_id == marker {
581                found_marker = true;
582            }
583            continue;
584        }
585        upsert_task_from_event(&tx, &event, project_hash)?;
586        index_event(&tx, &event)?;
587        last_event_id = Some(event.event_id.clone());
588        count += 1;
589    }
590
591    if !found_marker {
592        // Discard the (empty) tx and rebuild from scratch.
593        drop(tx);
594        tracing::warn!(
595            project_hash = project_hash,
596            marker = marker.as_str(),
597            "last_indexed_event_id not found in JSONL — falling back to full rebuild"
598        );
599        return rebuild_state(conn, jsonl_path, project_hash);
600    }
601
602    if let Some(eid) = last_event_id.as_deref() {
603        record_last_indexed(&tx, project_hash, eid)?;
604    }
605    tx.commit()?;
606    Ok(count)
607}
608
609pub fn index_event(conn: &Connection, event: &Event) -> anyhow::Result<()> {
610    let type_str = serde_json::to_value(event.event_type)?
611        .as_str()
612        .unwrap()
613        .to_string();
614    let status_str = serde_json::to_value(event.status)?
615        .as_str()
616        .unwrap()
617        .to_string();
618    // v0.5.0 Phase B: scrape artifacts (commit hashes, PR URLs, ticket
619    // IDs, file paths, branch names) out of the event text. Storing
620    // per-event so reclassify can recompute without touching foreign
621    // events; pack aggregates and dedupes across events at render time.
622    let artifacts = crate::artifacts::extract(&event.text);
623    let artifacts_json = if artifacts.is_empty() {
624        None
625    } else {
626        Some(serde_json::to_string(&artifacts)?)
627    };
628    conn.execute(
629        "INSERT OR REPLACE INTO events_index(event_id, task_id, type, timestamp, confidence, status, artifacts)
630         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
631        rusqlite::params![
632            event.event_id, event.task_id, type_str,
633            event.timestamp, event.confidence, status_str, artifacts_json
634        ],
635    )?;
636    // search_fts has no PK; clear then insert to keep idempotent across rebuild_state replays.
637    conn.execute(
638        "DELETE FROM search_fts WHERE event_id=?1",
639        rusqlite::params![event.event_id],
640    )?;
641    conn.execute(
642        "INSERT INTO search_fts(task_id, event_id, text, type) VALUES (?1, ?2, ?3, ?4)",
643        rusqlite::params![event.task_id, event.event_id, event.text, type_str],
644    )?;
645
646    if event.event_type == EventType::Decision {
647        conn.execute(
648            "INSERT OR REPLACE INTO decisions(decision_id, task_id, text, status)
649             VALUES (?1, ?2, ?3, 'active')",
650            rusqlite::params![event.event_id, event.task_id, event.text],
651        )?;
652    }
653
654    if event.event_type == EventType::Supersede {
655        if let Some(target) = &event.supersedes {
656            conn.execute(
657                "UPDATE decisions SET status='superseded', superseded_by=?1 WHERE decision_id=?2",
658                rusqlite::params![event.event_id, target],
659            )?;
660        }
661    }
662
663    if event.event_type == EventType::Evidence {
664        let strength_str = event
665            .evidence_strength
666            .map(|s| {
667                serde_json::to_value(s)
668                    .unwrap()
669                    .as_str()
670                    .unwrap()
671                    .to_string()
672            })
673            .unwrap_or_else(|| "medium".into());
674        conn.execute(
675            "INSERT OR REPLACE INTO evidence(evidence_id, task_id, text, strength)
676             VALUES (?1, ?2, ?3, ?4)",
677            rusqlite::params![event.event_id, event.task_id, event.text, strength_str],
678        )?;
679    }
680
681    // Invalidate any cached pack for this task.
682    conn.execute(
683        "DELETE FROM task_pack_cache WHERE task_id=?1",
684        rusqlite::params![event.task_id],
685    )?;
686
687    Ok(())
688}
689
690pub fn open(path: impl AsRef<Path>) -> anyhow::Result<Connection> {
691    if let Some(parent) = path.as_ref().parent() {
692        std::fs::create_dir_all(parent).with_context(|| format!("create dir {parent:?}"))?;
693    }
694    let conn =
695        Connection::open(&path).with_context(|| format!("open SQLite at {:?}", path.as_ref()))?;
696    conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;")?;
697    apply_migrations(&conn).context("apply schema migrations")?;
698    Ok(conn)
699}
700
/// One row of the task list rendered by the TUI: enough to render the
/// list view without round-tripping for each task. `event_count` joins
/// `events_index` so we don't need a second query per row.
#[derive(Debug, Clone)]
pub struct TaskRow {
    /// `tasks.task_id`.
    pub task_id: String,
    /// `tasks.title`.
    pub title: String,
    /// `tasks.status`, e.g. "open" or "closed".
    pub status: String,
    /// `tasks.last_event_at`, as stored.
    pub last_event_at: String,
    /// Number of `events_index` rows for the task; 0 when it has none.
    pub event_count: usize,
}
712
713/// All tasks for a project, ordered with open ones first (by recency)
714/// then closed ones. The TUI list view binds directly to this — there
715/// is no other consumer, so the shape is tuned for that callsite.
716pub fn list_tasks_by_project(
717    conn: &Connection,
718    project_hash: &str,
719) -> anyhow::Result<Vec<TaskRow>> {
720    let mut stmt = conn.prepare(
721        "SELECT t.task_id, t.title, t.status, t.last_event_at,
722                COALESCE(c.cnt, 0) AS event_count
723         FROM tasks t
724         LEFT JOIN (
725             SELECT task_id, COUNT(*) AS cnt FROM events_index GROUP BY task_id
726         ) c ON c.task_id = t.task_id
727         WHERE t.project_hash = ?1
728         ORDER BY (t.status = 'open') DESC, t.last_event_at DESC",
729    )?;
730    let rows = stmt
731        .query_map(rusqlite::params![project_hash], |r| {
732            Ok(TaskRow {
733                task_id: r.get::<_, String>(0)?,
734                title: r.get::<_, String>(1)?,
735                status: r.get::<_, String>(2)?,
736                last_event_at: r.get::<_, String>(3)?,
737                event_count: r.get::<_, i64>(4)? as usize,
738            })
739        })?
740        .collect::<Result<Vec<_>, _>>()?;
741    Ok(rows)
742}
743
744#[cfg(test)]
745mod tests {
746    use super::*;
747    use tempfile::TempDir;
748
749    #[test]
750    fn task_exists_returns_true_for_known_id_false_otherwise() {
751        let d = TempDir::new().unwrap();
752        let conn = open(d.path().join("s.sqlite")).unwrap();
753
754        assert!(!task_exists(&conn, "tj-nope").unwrap());
755
756        let e = make_open_event("tj-yes", "Hello");
757        upsert_task_from_event(&conn, &e, "feedfacefeedface").unwrap();
758        index_event(&conn, &e).unwrap();
759
760        assert!(task_exists(&conn, "tj-yes").unwrap());
761        assert!(!task_exists(&conn, "tj-nope").unwrap());
762    }
763
764    #[test]
765    fn fresh_db_runs_all_migrations() {
766        let d = TempDir::new().unwrap();
767        let p = d.path().join("state.sqlite");
768        let conn = open(&p).unwrap();
769
770        let applied: Vec<i64> = conn
771            .prepare("SELECT version FROM schema_migrations ORDER BY version")
772            .unwrap()
773            .query_map([], |r| r.get::<_, i64>(0))
774            .unwrap()
775            .collect::<Result<_, _>>()
776            .unwrap();
777        assert_eq!(
778            applied,
779            (1..=MIGRATIONS.len() as i64).collect::<Vec<_>>(),
780            "every declared migration must be recorded"
781        );
782    }
783
784    #[test]
785    fn apply_migrations_is_idempotent_across_reopens() {
786        let d = TempDir::new().unwrap();
787        let p = d.path().join("state.sqlite");
788        let _ = open(&p).unwrap();
789        let _ = open(&p).unwrap();
790
791        let count: i64 = open(&p)
792            .unwrap()
793            .query_row("SELECT COUNT(*) FROM schema_migrations", [], |r| r.get(0))
794            .unwrap();
795        assert_eq!(
796            count,
797            MIGRATIONS.len() as i64,
798            "schema_migrations must contain exactly one row per declared migration after repeated opens"
799        );
800    }
801
802    #[test]
803    fn open_creates_all_tables() {
804        let d = TempDir::new().unwrap();
805        let p = d.path().join("state.sqlite");
806        let conn = open(&p).unwrap();
807
808        let names: Vec<String> = conn
809            .prepare("SELECT name FROM sqlite_master WHERE type='table' OR type='virtual table' ORDER BY name")
810            .unwrap()
811            .query_map([], |r| r.get::<_, String>(0))
812            .unwrap()
813            .collect::<Result<_, _>>()
814            .unwrap();
815
816        for required in [
817            "decisions",
818            "events_index",
819            "evidence",
820            "task_pack_cache",
821            "tasks",
822            "search_fts",
823        ] {
824            assert!(
825                names.iter().any(|n| n == required),
826                "missing table {required}, have {names:?}"
827            );
828        }
829    }
830
831    #[test]
832    fn open_is_idempotent() {
833        let d = TempDir::new().unwrap();
834        let p = d.path().join("state.sqlite");
835        let _ = open(&p).unwrap();
836        let _ = open(&p).unwrap();
837    }
838
/// Indexing an `Evidence` event must produce a row in the `evidence` table
/// carrying the event text and the strength rendered as `"strong"`.
#[test]
fn index_event_projects_evidence() {
    use crate::event::{Author, Event, EventType, EvidenceStrength, Source};

    const TASK: &str = "tj-e";
    const PROJECT: &str = "feedface";

    let tmp = TempDir::new().unwrap();
    let conn = open(tmp.path().join("s.sqlite")).unwrap();

    // Open the task first, then record one strong piece of evidence on it.
    let opened = make_open_event(TASK, "T");
    upsert_task_from_event(&conn, &opened, PROJECT).unwrap();
    index_event(&conn, &opened).unwrap();

    let mut evidence = Event::new(
        TASK,
        EventType::Evidence,
        Author::Agent,
        Source::Chat,
        "Hook startup measured at 12ms".into(),
    );
    evidence.evidence_strength = Some(EvidenceStrength::Strong);
    upsert_task_from_event(&conn, &evidence, PROJECT).unwrap();
    index_event(&conn, &evidence).unwrap();

    let row: (String, String) = conn
        .query_row(
            "SELECT text, strength FROM evidence WHERE task_id=?1",
            rusqlite::params![TASK],
            |r| Ok((r.get(0)?, r.get(1)?)),
        )
        .unwrap();
    let (stored_text, stored_strength) = row;
    assert!(stored_text.contains("12ms"));
    assert_eq!(stored_strength, "strong");
}
875
/// A `Supersede` event whose `supersedes` field names an earlier decision
/// must mark that decision row as `superseded` and store the superseding
/// event's id in `superseded_by`.
#[test]
fn supersede_event_marks_decision_superseded() {
    use crate::event::{Author, Event, EventType, Source};

    let tmp = TempDir::new().unwrap();
    let conn = open(tmp.path().join("s.sqlite")).unwrap();

    // Runs one event through both projection steps, in the same order the
    // individual calls would be made.
    let apply = |e: &Event| {
        upsert_task_from_event(&conn, e, "feedface").unwrap();
        index_event(&conn, e).unwrap();
    };

    let opened = make_open_event("tj-s", "T");
    apply(&opened);

    let decision = Event::new(
        "tj-s",
        EventType::Decision,
        Author::Agent,
        Source::Chat,
        "Use TS".into(),
    );
    apply(&decision);

    let mut supersede = Event::new(
        "tj-s",
        EventType::Supersede,
        Author::Agent,
        Source::Chat,
        "Replaced by Rust decision".into(),
    );
    supersede.supersedes = Some(decision.event_id.clone());
    apply(&supersede);

    let row: (String, Option<String>) = conn
        .query_row(
            "SELECT status, superseded_by FROM decisions WHERE decision_id=?1",
            rusqlite::params![decision.event_id],
            |r| Ok((r.get(0)?, r.get(1)?)),
        )
        .unwrap();
    let (status, superseded_by) = row;
    assert_eq!(status, "superseded");
    assert_eq!(superseded_by.as_deref(), Some(supersede.event_id.as_str()));
}
922
/// Indexing a `Decision` event must create a `decisions` row keyed by the
/// event id, holding the decision text with status `active`.
#[test]
fn index_event_projects_decision_to_decisions_table() {
    use crate::event::{Author, Event, EventType, Source};

    const TASK: &str = "tj-d";
    const PROJECT: &str = "feedface";

    let tmp = TempDir::new().unwrap();
    let conn = open(tmp.path().join("s.sqlite")).unwrap();

    let opened = make_open_event(TASK, "T");
    upsert_task_from_event(&conn, &opened, PROJECT).unwrap();
    index_event(&conn, &opened).unwrap();

    let decision = Event::new(
        TASK,
        EventType::Decision,
        Author::Agent,
        Source::Chat,
        "Adopt Rust".into(),
    );
    upsert_task_from_event(&conn, &decision, PROJECT).unwrap();
    index_event(&conn, &decision).unwrap();

    let row: (String, String, String) = conn
        .query_row(
            "SELECT decision_id, text, status FROM decisions WHERE task_id=?1",
            rusqlite::params![TASK],
            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
        )
        .unwrap();
    let (decision_id, text, status) = row;
    assert_eq!(decision_id, decision.event_id);
    assert_eq!(text, "Adopt Rust");
    assert_eq!(status, "active");
}
960
/// Indexing the same event repeatedly must leave exactly one `search_fts`
/// row for its event id.
#[test]
fn index_event_is_idempotent_no_search_fts_duplicates() {
    let tmp = TempDir::new().unwrap();
    let conn = open(tmp.path().join("s.sqlite")).unwrap();

    let opened = make_open_event("tj-id", "Idempotent");
    upsert_task_from_event(&conn, &opened, "feedface").unwrap();

    // Three passes over the same event, as repeated rebuild_state replays
    // would do.
    for _ in 0..3 {
        index_event(&conn, &opened).unwrap();
    }

    let fts_rows: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM search_fts WHERE event_id=?1",
            rusqlite::params![opened.event_id],
            |r| r.get(0),
        )
        .unwrap();
    assert_eq!(
        fts_rows, 1,
        "search_fts must hold exactly one row per event_id"
    );
}
989
/// `list_all_projects` must return the stems of the `.sqlite` files in the
/// state directory and leave out the unrelated `.txt` file.
#[test]
fn list_all_projects_returns_hashes_from_state_dir() {
    let tmp = TempDir::new().unwrap();
    let state_dir = tmp.path().join("state");
    std::fs::create_dir_all(&state_dir).unwrap();

    // Two project databases plus one file that is not a project database.
    for file_name in [
        "aaaa1111aaaa1111.sqlite",
        "bbbb2222bbbb2222.sqlite",
        "not-a-project.txt",
    ] {
        std::fs::File::create(state_dir.join(file_name)).unwrap();
    }

    let mut found = list_all_projects(&state_dir).unwrap();
    // Sorted before comparing so the check does not depend on listing order.
    found.sort();
    assert_eq!(found, vec!["aaaa1111aaaa1111", "bbbb2222bbbb2222"]);
}
1004
1005    fn write_event_line(f: &mut std::fs::File, e: &crate::event::Event) {
1006        use std::io::Write;
1007        writeln!(f, "{}", serde_json::to_string(e).unwrap()).unwrap();
1008    }
1009
1010    fn make_open_event(task_id: &str, title: &str) -> crate::event::Event {
1011        let mut e = crate::event::Event::new(
1012            task_id,
1013            crate::event::EventType::Open,
1014            crate::event::Author::User,
1015            crate::event::Source::Cli,
1016            "x".into(),
1017        );
1018        e.meta = serde_json::json!({"title": title});
1019        e
1020    }
1021
/// After an initial ingest, a second `ingest_new_events` call must process
/// only the lines appended since, and the stored marker must end up at the
/// last appended event.
#[test]
fn ingest_new_events_picks_up_only_new_lines() {
    let tmp = TempDir::new().unwrap();
    let log_path = tmp.path().join("events.jsonl");
    let db_path = tmp.path().join("s.sqlite");
    let project = "deadbeefdeadbeef";

    let initial = [
        make_open_event("tj-i1", "first"),
        make_open_event("tj-i2", "second"),
        make_open_event("tj-i3", "third"),
    ];
    {
        let mut log = std::fs::File::create(&log_path).unwrap();
        for event in &initial {
            write_event_line(&mut log, event);
        }
    }

    // First pass: no marker exists yet, so everything in the log is read.
    let conn = open(&db_path).unwrap();
    let first_pass = ingest_new_events(&conn, &log_path, project).unwrap();
    assert_eq!(first_pass, 3);

    // Grow the log by two events.
    let appended = [
        make_open_event("tj-i4", "fourth"),
        make_open_event("tj-i5", "fifth"),
    ];
    {
        let mut log = std::fs::OpenOptions::new()
            .append(true)
            .open(&log_path)
            .unwrap();
        for event in &appended {
            write_event_line(&mut log, event);
        }
    }

    // Second pass: the marker sits at the third event, so only the two
    // appended events may be processed.
    let second_pass = ingest_new_events(&conn, &log_path, project).unwrap();
    assert_eq!(
        second_pass, 2,
        "incremental ingest must read only the tail"
    );

    let indexed: i64 = conn
        .query_row("SELECT COUNT(*) FROM events_index", [], |r| r.get(0))
        .unwrap();
    assert_eq!(indexed, 5);

    let marker: String = conn
        .query_row(
            "SELECT last_indexed_event_id FROM index_state WHERE project_hash=?1",
            rusqlite::params![project],
            |r| r.get(0),
        )
        .unwrap();
    assert_eq!(marker, appended[1].event_id);
}
1073
/// When the previously recorded marker event is no longer present in the
/// log, `ingest_new_events` must still process every event in the file.
#[test]
fn ingest_new_events_falls_back_to_full_rebuild_when_marker_vanishes() {
    let tmp = TempDir::new().unwrap();
    let log_path = tmp.path().join("events.jsonl");
    let db_path = tmp.path().join("s.sqlite");
    let project = "feedfacefeedface";

    let original = make_open_event("tj-r1", "first");
    {
        let mut log = std::fs::File::create(&log_path).unwrap();
        write_event_line(&mut log, &original);
    }

    let conn = open(&db_path).unwrap();
    ingest_new_events(&conn, &log_path, project).unwrap();

    // Overwrite the whole log so the id of `original` appears nowhere in it,
    // as after corruption or a hand edit.
    let replacements = [
        make_open_event("tj-r2", "after-corruption"),
        make_open_event("tj-r3", "after-corruption-2"),
    ];
    {
        let mut log = std::fs::File::create(&log_path).unwrap();
        for event in &replacements {
            write_event_line(&mut log, event);
        }
    }

    let processed = ingest_new_events(&conn, &log_path, project).unwrap();
    assert_eq!(processed, 2, "missing marker must trigger full rebuild");
}
1101
/// Feeding identical logs to `rebuild_state` and to `ingest_new_events` on
/// fresh databases must report the same event count and leave the same
/// number of rows in `tasks` and `events_index`.
#[test]
fn rebuild_state_and_ingest_new_events_produce_same_state() {
    const PROJECT: &str = "abcd1234abcd1234";

    let tmp = TempDir::new().unwrap();
    let jsonl_a = tmp.path().join("a.jsonl");
    let jsonl_b = tmp.path().join("b.jsonl");
    let db_a = tmp.path().join("a.sqlite");
    let db_b = tmp.path().join("b.sqlite");

    let mut events = Vec::with_capacity(5);
    for i in 0..5 {
        events.push(make_open_event(
            &format!("tj-eq{i}"),
            &format!("title {i}"),
        ));
    }

    // Both logs receive the very same events in the same order.
    for path in [&jsonl_a, &jsonl_b] {
        let mut log = std::fs::File::create(path).unwrap();
        events.iter().for_each(|e| write_event_line(&mut log, e));
    }

    let conn_a = open(&db_a).unwrap();
    let rebuilt = rebuild_state(&conn_a, &jsonl_a, PROJECT).unwrap();

    let conn_b = open(&db_b).unwrap();
    let ingested = ingest_new_events(&conn_b, &jsonl_b, PROJECT).unwrap();

    assert_eq!(rebuilt, ingested);
    assert_eq!(rebuilt, 5);

    for table in ["tasks", "events_index"] {
        let sql = format!("SELECT COUNT(*) FROM {table}");
        let rows_a: i64 = conn_a.query_row(&sql, [], |r| r.get(0)).unwrap();
        let rows_b: i64 = conn_b.query_row(&sql, [], |r| r.get(0)).unwrap();
        assert_eq!(rows_a, rows_b, "row count mismatch in {table}");
    }
}
1136
/// `rebuild_state` must succeed on a log containing unparsable lines and
/// index only the two valid events around them.
#[test]
fn rebuild_state_skips_malformed_jsonl_lines() {
    use crate::event::{Author, Event, EventType, Source};
    use std::io::Write;

    let tmp = TempDir::new().unwrap();
    let events_path = tmp.path().join("events.jsonl");
    let db_path = tmp.path().join("s.sqlite");

    let mut log = std::fs::File::create(&events_path).unwrap();

    // Line 1: a valid open event.
    let opened = make_open_event("tj-skip", "Skip test");
    write_event_line(&mut log, &opened);

    // Line 2: text that is not JSON at all.
    writeln!(log, "this is not a json event line").unwrap();

    // Line 3: well-formed JSON that lacks the fields an Event requires.
    writeln!(log, "{}", r#"{"foo": 1}"#).unwrap();

    // Line 4: a valid decision event.
    let decision = Event::new(
        "tj-skip",
        EventType::Decision,
        Author::Agent,
        Source::Chat,
        "Adopt Rust".into(),
    );
    write_event_line(&mut log, &decision);
    drop(log);

    let conn = open(&db_path).unwrap();
    let processed = rebuild_state(&conn, &events_path, "deadbeefdeadbeef")
        .expect("rebuild_state must succeed despite malformed lines");
    assert_eq!(
        processed, 2,
        "expected 2 valid events indexed (2 malformed skipped)"
    );

    let indexed: i64 = conn
        .query_row("SELECT COUNT(*) FROM events_index", [], |r| r.get(0))
        .unwrap();
    assert_eq!(indexed, 2);
}
1185
/// `rebuild_state` over a two-event log for one task must report two events
/// and leave one `tasks` row and two `events_index` rows.
#[test]
fn rebuild_state_reads_jsonl_and_populates_db() {
    use crate::event::{Author, Event, EventType, Source};

    let tmp = TempDir::new().unwrap();
    let events_path = tmp.path().join("events.jsonl");
    let db_path = tmp.path().join("s.sqlite");

    let mut log = std::fs::File::create(&events_path).unwrap();
    let opened = make_open_event("tj-9", "Nine");
    let decision = Event::new(
        "tj-9",
        EventType::Decision,
        Author::Agent,
        Source::Chat,
        "Adopt Rust".into(),
    );
    for event in [&opened, &decision] {
        write_event_line(&mut log, event);
    }
    drop(log);

    let conn = open(&db_path).unwrap();
    let processed = rebuild_state(&conn, &events_path, "deadbeefdeadbeef").unwrap();
    assert_eq!(processed, 2);

    // Both events belong to the same task, so only one task row is expected.
    let task_rows: i64 = conn
        .query_row("SELECT COUNT(*) FROM tasks", [], |r| r.get(0))
        .unwrap();
    assert_eq!(task_rows, 1);

    let event_rows: i64 = conn
        .query_row("SELECT COUNT(*) FROM events_index", [], |r| r.get(0))
        .unwrap();
    assert_eq!(event_rows, 2);
}
1226
/// `index_event` must add one `events_index` row per event and make the
/// event text searchable through `search_fts`.
#[test]
fn index_event_writes_index_and_fts() {
    use crate::event::{Author, Event, EventType, Source};

    const TASK: &str = "tj-1";
    const PROJECT: &str = "deadbeefdeadbeef";

    let tmp = TempDir::new().unwrap();
    let conn = open(tmp.path().join("s.sqlite")).unwrap();

    let mut opened = Event::new(
        TASK,
        EventType::Open,
        Author::User,
        Source::Cli,
        "Title".into(),
    );
    opened.meta = serde_json::json!({ "title": "Title" });
    upsert_task_from_event(&conn, &opened, PROJECT).unwrap();
    index_event(&conn, &opened).unwrap();

    let mut decision = Event::new(
        TASK,
        EventType::Decision,
        Author::Agent,
        Source::Chat,
        "Adopt Rust".into(),
    );
    decision.confidence = Some(0.92);
    upsert_task_from_event(&conn, &decision, PROJECT).unwrap();
    index_event(&conn, &decision).unwrap();

    let indexed: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM events_index WHERE task_id=?1",
            rusqlite::params![TASK],
            |r| r.get(0),
        )
        .unwrap();
    assert_eq!(indexed, 2);

    // Only the decision text contains "Rust", so the full-text query must
    // return that event and nothing else.
    let mut search = conn
        .prepare("SELECT event_id FROM search_fts WHERE search_fts MATCH ?1")
        .unwrap();
    let hits: Vec<String> = search
        .query_map(rusqlite::params!["Rust"], |r| r.get::<_, String>(0))
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert_eq!(hits.len(), 1);
    assert_eq!(hits[0], decision.event_id);
}
1276
/// `upsert_task_from_event` on an `Open` event must insert a `tasks` row
/// whose title comes from `meta.title` and whose status is `open`.
#[test]
fn upsert_task_from_open_event_inserts_row() {
    use crate::event::{Author, Event, EventType, Source};

    let tmp = TempDir::new().unwrap();
    let conn = open(tmp.path().join("s.sqlite")).unwrap();

    // The event text and the meta title differ on purpose: the assertion
    // below expects the title from `meta`.
    let mut opened = Event::new(
        "tj-7f3a",
        EventType::Open,
        Author::User,
        Source::Cli,
        "Add OAuth".into(),
    );
    opened.meta = serde_json::json!({ "title": "Add OAuth login" });

    upsert_task_from_event(&conn, &opened, "abcd1234abcd1234").unwrap();

    let row: (String, String, String) = conn
        .query_row(
            "SELECT task_id, title, status FROM tasks WHERE task_id = ?1",
            ["tj-7f3a"],
            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
        )
        .unwrap();
    let (task_id, title, status) = row;

    assert_eq!(task_id, "tj-7f3a");
    assert_eq!(title, "Add OAuth login");
    assert_eq!(status, "open");
}
1305}