1use anyhow::Context;
2use rusqlite::Connection;
3use std::collections::HashSet;
4use std::path::Path;
5
/// One schema migration: a version number plus the SQL batch that implements it.
struct Migration {
    /// Version recorded in `schema_migrations` once the migration has been applied.
    version: i64,
    /// SQL batch run through `execute_batch` when the migration is applied.
    sql: &'static str,
}
13
// Migration v1: creates the base tables (`tasks`, `events_index`, `decisions`,
// `evidence`, `task_pack_cache`), two lookup indexes, and the `search_fts`
// FTS5 virtual table. Every statement uses `IF NOT EXISTS`.
const MIGRATION_001: &str = r#"
CREATE TABLE IF NOT EXISTS tasks (
    task_id TEXT PRIMARY KEY,
    title TEXT NOT NULL,
    status TEXT NOT NULL,
    project_hash TEXT NOT NULL,
    opened_at TEXT NOT NULL,
    closed_at TEXT,
    last_event_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_tasks_project ON tasks(project_hash, last_event_at DESC);

CREATE TABLE IF NOT EXISTS events_index (
    event_id TEXT PRIMARY KEY,
    task_id TEXT NOT NULL,
    type TEXT NOT NULL,
    timestamp TEXT NOT NULL,
    confidence REAL,
    status TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_events_task_time ON events_index(task_id, timestamp DESC);

CREATE TABLE IF NOT EXISTS decisions (
    decision_id TEXT PRIMARY KEY,
    task_id TEXT NOT NULL,
    text TEXT NOT NULL,
    status TEXT NOT NULL,
    superseded_by TEXT
);

CREATE TABLE IF NOT EXISTS evidence (
    evidence_id TEXT PRIMARY KEY,
    task_id TEXT NOT NULL,
    text TEXT NOT NULL,
    strength TEXT NOT NULL,
    refers_to_decision_id TEXT
);

CREATE TABLE IF NOT EXISTS task_pack_cache (
    task_id TEXT NOT NULL,
    mode TEXT NOT NULL,
    text TEXT NOT NULL,
    generated_at TEXT NOT NULL,
    source_event_count INTEGER NOT NULL,
    PRIMARY KEY (task_id, mode)
);

CREATE VIRTUAL TABLE IF NOT EXISTS search_fts USING fts5(
    task_id UNINDEXED,
    event_id UNINDEXED,
    text,
    type
);
"#;
68
// Migration v2: adds `index_state`, which stores one row per project holding
// the id of the last event that was indexed for it.
const MIGRATION_002: &str = r#"
CREATE TABLE IF NOT EXISTS index_state (
    project_hash TEXT PRIMARY KEY,
    last_indexed_event_id TEXT NOT NULL,
    updated_at TEXT NOT NULL
);
"#;
80
// Migration v3: adds the `goal`, `outcome`, `outcome_tag` and `external`
// columns to `tasks` and an `artifacts` column to `events_index`, then empties
// `task_pack_cache`.
// NOTE(review): `ALTER TABLE ... ADD COLUMN` is not re-runnable; this batch
// must only ever execute once per database.
const MIGRATION_003: &str = r#"
ALTER TABLE tasks ADD COLUMN goal TEXT;
ALTER TABLE tasks ADD COLUMN outcome TEXT;
ALTER TABLE tasks ADD COLUMN outcome_tag TEXT;
ALTER TABLE tasks ADD COLUMN external TEXT;
ALTER TABLE events_index ADD COLUMN artifacts TEXT;
DELETE FROM task_pack_cache;
"#;
93
// Migration v4: empties `task_pack_cache`; it makes no schema change.
// NOTE(review): presumably this invalidates packs rendered by an older
// format — confirm against the change that introduced it.
const MIGRATION_004: &str = r#"
DELETE FROM task_pack_cache;
"#;
102
// All known migrations. `apply_migrations` walks this slice in declaration
// order and applies every entry whose version is not yet recorded in
// `schema_migrations`, so new entries belong at the end with the next version.
const MIGRATIONS: &[Migration] = &[
    Migration {
        version: 1,
        sql: MIGRATION_001,
    },
    Migration {
        version: 2,
        sql: MIGRATION_002,
    },
    Migration {
        version: 3,
        sql: MIGRATION_003,
    },
    Migration {
        version: 4,
        sql: MIGRATION_004,
    },
];
123
124fn apply_migrations(conn: &Connection) -> anyhow::Result<()> {
125 conn.execute_batch(
126 "CREATE TABLE IF NOT EXISTS schema_migrations (
127 version INTEGER PRIMARY KEY,
128 applied_at TEXT NOT NULL
129 )",
130 )
131 .context("create schema_migrations table")?;
132
133 let applied: HashSet<i64> = {
134 let mut stmt = conn
135 .prepare("SELECT version FROM schema_migrations")
136 .context("select applied versions")?;
137 let rows = stmt
138 .query_map([], |r| r.get::<_, i64>(0))
139 .context("iterate schema_migrations")?;
140 rows.collect::<rusqlite::Result<HashSet<_>>>()
141 .context("collect applied versions")?
142 };
143
144 for migration in MIGRATIONS {
145 if applied.contains(&migration.version) {
146 continue;
147 }
148 conn.execute_batch(migration.sql)
149 .with_context(|| format!("apply schema migration v{:03}", migration.version))?;
150 conn.execute(
151 "INSERT INTO schema_migrations(version, applied_at) VALUES (?1, ?2)",
152 rusqlite::params![
153 migration.version,
154 chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
155 ],
156 )
157 .with_context(|| {
158 format!(
159 "record schema migration v{:03} as applied",
160 migration.version
161 )
162 })?;
163 }
164 Ok(())
165}
166
167use crate::event::{Event, EventType};
168
169pub fn upsert_task_from_event(
170 conn: &Connection,
171 event: &Event,
172 project_hash: &str,
173) -> anyhow::Result<()> {
174 match event.event_type {
175 EventType::Open => {
176 let title = event
177 .meta
178 .get("title")
179 .and_then(|v| v.as_str())
180 .unwrap_or(&event.text)
181 .to_string();
182 conn.execute(
183 "INSERT INTO tasks(task_id, title, status, project_hash, opened_at, last_event_at)
184 VALUES (?1, ?2, 'open', ?3, ?4, ?4)
185 ON CONFLICT(task_id) DO UPDATE SET last_event_at = ?4",
186 rusqlite::params![event.task_id, title, project_hash, event.timestamp],
187 )?;
188 }
189 EventType::Close => {
190 conn.execute(
191 "UPDATE tasks SET status='closed', closed_at=?2, last_event_at=?2 WHERE task_id=?1",
192 rusqlite::params![event.task_id, event.timestamp],
193 )?;
194 }
195 EventType::Reopen => {
196 conn.execute(
197 "UPDATE tasks SET status='open', closed_at=NULL, last_event_at=?2 WHERE task_id=?1",
198 rusqlite::params![event.task_id, event.timestamp],
199 )?;
200 }
201 _ => {
202 conn.execute(
203 "UPDATE tasks SET last_event_at=?2 WHERE task_id=?1",
204 rusqlite::params![event.task_id, event.timestamp],
205 )?;
206 }
207 }
208 Ok(())
209}
210
211use std::io::BufRead;
212
213pub fn list_all_projects(state_dir: impl AsRef<Path>) -> anyhow::Result<Vec<String>> {
214 let dir = state_dir.as_ref();
215 if !dir.exists() {
216 return Ok(vec![]);
217 }
218 let mut out = Vec::new();
219 for entry in std::fs::read_dir(dir)? {
220 let entry = entry?;
221 let path = entry.path();
222 if path.extension().and_then(|e| e.to_str()) == Some("sqlite") {
223 if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) {
224 out.push(stem.to_string());
225 }
226 }
227 }
228 Ok(out)
229}
230
231pub fn rebuild_state(
232 conn: &Connection,
233 jsonl_path: impl AsRef<Path>,
234 project_hash: &str,
235) -> anyhow::Result<usize> {
236 let f = std::fs::File::open(&jsonl_path)
237 .with_context(|| format!("open {:?}", jsonl_path.as_ref()))?;
238 let reader = std::io::BufReader::new(f);
239
240 let tx = conn.unchecked_transaction()?;
241 let mut count = 0;
242 let mut last_event_id: Option<String> = None;
243 for (i, line) in reader.lines().enumerate() {
244 let line = line.with_context(|| format!("read line {i}"))?;
245 if line.trim().is_empty() {
246 continue;
247 }
248 let event: Event = match serde_json::from_str(&line) {
252 Ok(e) => e,
253 Err(err) => {
254 tracing::warn!(
255 line_number = i + 1,
256 error = %err,
257 "skipping malformed JSONL line in rebuild_state"
258 );
259 continue;
260 }
261 };
262 upsert_task_from_event(&tx, &event, project_hash)?;
263 index_event(&tx, &event)?;
264 last_event_id = Some(event.event_id.clone());
265 count += 1;
266 }
267 if let Some(eid) = last_event_id.as_deref() {
268 record_last_indexed(&tx, project_hash, eid)?;
269 }
270 tx.commit()?;
271 Ok(count)
272}
273
274pub fn task_exists(conn: &Connection, task_id: &str) -> anyhow::Result<bool> {
279 let count: i64 = conn.query_row(
280 "SELECT COUNT(*) FROM tasks WHERE task_id = ?1",
281 rusqlite::params![task_id],
282 |r| r.get(0),
283 )?;
284 Ok(count > 0)
285}
286
287pub fn task_status(conn: &Connection, task_id: &str) -> anyhow::Result<Option<String>> {
291 let mut stmt = conn.prepare("SELECT status FROM tasks WHERE task_id = ?1")?;
292 let mut rows = stmt.query(rusqlite::params![task_id])?;
293 Ok(rows.next()?.map(|r| r.get::<_, String>(0)).transpose()?)
294}
295
296pub fn set_task_goal(conn: &Connection, task_id: &str, goal: &str) -> anyhow::Result<()> {
300 conn.execute(
301 "UPDATE tasks SET goal = ?1 WHERE task_id = ?2",
302 rusqlite::params![goal, task_id],
303 )
304 .with_context(|| format!("set goal for {task_id}"))?;
305 conn.execute(
308 "DELETE FROM task_pack_cache WHERE task_id = ?1",
309 rusqlite::params![task_id],
310 )?;
311 Ok(())
312}
313
314pub fn set_task_outcome(
318 conn: &Connection,
319 task_id: &str,
320 outcome: &str,
321 outcome_tag: Option<&str>,
322) -> anyhow::Result<()> {
323 conn.execute(
324 "UPDATE tasks SET outcome = ?1, outcome_tag = ?2 WHERE task_id = ?3",
325 rusqlite::params![outcome, outcome_tag, task_id],
326 )
327 .with_context(|| format!("set outcome for {task_id}"))?;
328 conn.execute(
329 "DELETE FROM task_pack_cache WHERE task_id = ?1",
330 rusqlite::params![task_id],
331 )?;
332 Ok(())
333}
334
335pub fn add_task_external(conn: &Connection, task_id: &str, reference: &str) -> anyhow::Result<()> {
340 let current: Option<String> = conn
341 .query_row(
342 "SELECT external FROM tasks WHERE task_id = ?1",
343 rusqlite::params![task_id],
344 |r| r.get::<_, Option<String>>(0),
345 )
346 .with_context(|| format!("read external for {task_id}"))?;
347 let next = match current {
348 Some(s) if !s.is_empty() => format!("{s},{reference}"),
349 _ => reference.to_string(),
350 };
351 conn.execute(
352 "UPDATE tasks SET external = ?1 WHERE task_id = ?2",
353 rusqlite::params![next, task_id],
354 )?;
355 conn.execute(
356 "DELETE FROM task_pack_cache WHERE task_id = ?1",
357 rusqlite::params![task_id],
358 )?;
359 Ok(())
360}
361
/// Optional descriptive columns of a `tasks` row, as returned by `task_metadata`.
///
/// Each field is `None` when the corresponding column is `NULL`.
#[derive(Debug, Clone, Default)]
pub struct TaskMetadata {
    /// Value of the `goal` column.
    pub goal: Option<String>,
    /// Value of the `outcome` column.
    pub outcome: Option<String>,
    /// Value of the `outcome_tag` column.
    pub outcome_tag: Option<String>,
    /// Value of the `external` column; `add_task_external` builds it as a
    /// comma-separated list of references.
    pub external: Option<String>,
}
371
372pub fn task_metadata(conn: &Connection, task_id: &str) -> anyhow::Result<Option<TaskMetadata>> {
373 let mut stmt =
374 conn.prepare("SELECT goal, outcome, outcome_tag, external FROM tasks WHERE task_id = ?1")?;
375 let mut rows = stmt.query(rusqlite::params![task_id])?;
376 Ok(match rows.next()? {
377 Some(r) => Some(TaskMetadata {
378 goal: r.get::<_, Option<String>>(0)?,
379 outcome: r.get::<_, Option<String>>(1)?,
380 outcome_tag: r.get::<_, Option<String>>(2)?,
381 external: r.get::<_, Option<String>>(3)?,
382 }),
383 None => None,
384 })
385}
386
387pub fn find_tasks_by_linked_issues(
394 conn: &Connection,
395 issues: &[String],
396) -> anyhow::Result<Vec<(String, String)>> {
397 if issues.is_empty() {
398 return Ok(Vec::new());
399 }
400 let mut candidate_ids: Vec<String> = Vec::new();
406 for issue in issues {
407 let pattern = format!("%\"{}\"%", issue.replace('%', "\\%"));
408 let mut stmt = conn.prepare(
409 "SELECT DISTINCT task_id FROM events_index
410 WHERE artifacts LIKE ?1
411 ORDER BY timestamp DESC",
412 )?;
413 let rows = stmt.query_map(rusqlite::params![pattern], |r| r.get::<_, String>(0))?;
414 for r in rows {
415 let id = r?;
416 if !candidate_ids.contains(&id) {
417 candidate_ids.push(id);
418 }
419 }
420 }
421 let mut out = Vec::with_capacity(candidate_ids.len());
423 for id in candidate_ids {
424 let status: Option<String> = conn
425 .query_row(
426 "SELECT status FROM tasks WHERE task_id = ?1",
427 rusqlite::params![&id],
428 |r| r.get(0),
429 )
430 .ok();
431 if let Some(s) = status {
432 out.push((id, s));
433 }
434 }
435 Ok(out)
436}
437
438pub fn reclassify_task_artifacts(conn: &Connection, task_id: &str) -> anyhow::Result<usize> {
444 let mut stmt = conn.prepare(
445 "SELECT ei.event_id, COALESCE(sf.text, '') FROM events_index ei
446 LEFT JOIN search_fts sf ON sf.event_id = ei.event_id
447 WHERE ei.task_id = ?1",
448 )?;
449 let rows: Vec<(String, String)> = stmt
450 .query_map(rusqlite::params![task_id], |r| {
451 Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
452 })?
453 .collect::<Result<_, _>>()?;
454 let count = rows.len();
455 for (event_id, text) in rows {
456 let arts = crate::artifacts::extract(&text);
457 let json = if arts.is_empty() {
458 None
459 } else {
460 Some(serde_json::to_string(&arts)?)
461 };
462 conn.execute(
463 "UPDATE events_index SET artifacts = ?1 WHERE event_id = ?2",
464 rusqlite::params![json, event_id],
465 )?;
466 }
467 conn.execute(
468 "DELETE FROM task_pack_cache WHERE task_id = ?1",
469 rusqlite::params![task_id],
470 )?;
471 Ok(count)
472}
473
474pub fn task_artifacts(
480 conn: &Connection,
481 task_id: &str,
482) -> anyhow::Result<crate::artifacts::Artifacts> {
483 let mut stmt = conn.prepare(
484 "SELECT artifacts FROM events_index
485 WHERE task_id = ?1 AND artifacts IS NOT NULL
486 ORDER BY timestamp ASC",
487 )?;
488 let rows = stmt.query_map(rusqlite::params![task_id], |r| r.get::<_, String>(0))?;
489 let mut acc = crate::artifacts::Artifacts::default();
490 for row in rows {
491 let json = row?;
492 if let Ok(parsed) = serde_json::from_str::<crate::artifacts::Artifacts>(&json) {
493 acc.merge(parsed);
494 }
495 }
496 Ok(acc)
497}
498
499fn last_indexed_event_id(conn: &Connection, project_hash: &str) -> anyhow::Result<Option<String>> {
503 let mut stmt =
504 conn.prepare("SELECT last_indexed_event_id FROM index_state WHERE project_hash = ?1")?;
505 let mut rows = stmt.query(rusqlite::params![project_hash])?;
506 if let Some(row) = rows.next()? {
507 Ok(Some(row.get::<_, String>(0)?))
508 } else {
509 Ok(None)
510 }
511}
512
513fn record_last_indexed(
514 conn: &Connection,
515 project_hash: &str,
516 event_id: &str,
517) -> anyhow::Result<()> {
518 conn.execute(
519 "INSERT INTO index_state(project_hash, last_indexed_event_id, updated_at)
520 VALUES (?1, ?2, ?3)
521 ON CONFLICT(project_hash) DO UPDATE SET
522 last_indexed_event_id = excluded.last_indexed_event_id,
523 updated_at = excluded.updated_at",
524 rusqlite::params![
525 project_hash,
526 event_id,
527 chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
528 ],
529 )?;
530 Ok(())
531}
532
533pub fn ingest_new_events(
543 conn: &Connection,
544 jsonl_path: impl AsRef<Path>,
545 project_hash: &str,
546) -> anyhow::Result<usize> {
547 let marker = match last_indexed_event_id(conn, project_hash)? {
548 Some(id) => id,
549 None => return rebuild_state(conn, jsonl_path, project_hash),
550 };
551
552 let f = std::fs::File::open(&jsonl_path)
553 .with_context(|| format!("open {:?}", jsonl_path.as_ref()))?;
554 let reader = std::io::BufReader::new(f);
555
556 let tx = conn.unchecked_transaction()?;
560 let mut found_marker = false;
561 let mut count = 0;
562 let mut last_event_id: Option<String> = None;
563 for (i, line) in reader.lines().enumerate() {
564 let line = line.with_context(|| format!("read line {i}"))?;
565 if line.trim().is_empty() {
566 continue;
567 }
568 let event: Event = match serde_json::from_str(&line) {
569 Ok(e) => e,
570 Err(err) => {
571 tracing::warn!(
572 line_number = i + 1,
573 error = %err,
574 "skipping malformed JSONL line in ingest_new_events"
575 );
576 continue;
577 }
578 };
579 if !found_marker {
580 if event.event_id == marker {
581 found_marker = true;
582 }
583 continue;
584 }
585 upsert_task_from_event(&tx, &event, project_hash)?;
586 index_event(&tx, &event)?;
587 last_event_id = Some(event.event_id.clone());
588 count += 1;
589 }
590
591 if !found_marker {
592 drop(tx);
594 tracing::warn!(
595 project_hash = project_hash,
596 marker = marker.as_str(),
597 "last_indexed_event_id not found in JSONL — falling back to full rebuild"
598 );
599 return rebuild_state(conn, jsonl_path, project_hash);
600 }
601
602 if let Some(eid) = last_event_id.as_deref() {
603 record_last_indexed(&tx, project_hash, eid)?;
604 }
605 tx.commit()?;
606 Ok(count)
607}
608
609pub fn index_event(conn: &Connection, event: &Event) -> anyhow::Result<()> {
610 let type_str = serde_json::to_value(event.event_type)?
611 .as_str()
612 .unwrap()
613 .to_string();
614 let status_str = serde_json::to_value(event.status)?
615 .as_str()
616 .unwrap()
617 .to_string();
618 let artifacts = crate::artifacts::extract(&event.text);
623 let artifacts_json = if artifacts.is_empty() {
624 None
625 } else {
626 Some(serde_json::to_string(&artifacts)?)
627 };
628 conn.execute(
629 "INSERT OR REPLACE INTO events_index(event_id, task_id, type, timestamp, confidence, status, artifacts)
630 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
631 rusqlite::params![
632 event.event_id, event.task_id, type_str,
633 event.timestamp, event.confidence, status_str, artifacts_json
634 ],
635 )?;
636 conn.execute(
638 "DELETE FROM search_fts WHERE event_id=?1",
639 rusqlite::params![event.event_id],
640 )?;
641 conn.execute(
642 "INSERT INTO search_fts(task_id, event_id, text, type) VALUES (?1, ?2, ?3, ?4)",
643 rusqlite::params![event.task_id, event.event_id, event.text, type_str],
644 )?;
645
646 if event.event_type == EventType::Decision {
647 conn.execute(
648 "INSERT OR REPLACE INTO decisions(decision_id, task_id, text, status)
649 VALUES (?1, ?2, ?3, 'active')",
650 rusqlite::params![event.event_id, event.task_id, event.text],
651 )?;
652 }
653
654 if event.event_type == EventType::Supersede {
655 if let Some(target) = &event.supersedes {
656 conn.execute(
657 "UPDATE decisions SET status='superseded', superseded_by=?1 WHERE decision_id=?2",
658 rusqlite::params![event.event_id, target],
659 )?;
660 }
661 }
662
663 if event.event_type == EventType::Evidence {
664 let strength_str = event
665 .evidence_strength
666 .map(|s| {
667 serde_json::to_value(s)
668 .unwrap()
669 .as_str()
670 .unwrap()
671 .to_string()
672 })
673 .unwrap_or_else(|| "medium".into());
674 conn.execute(
675 "INSERT OR REPLACE INTO evidence(evidence_id, task_id, text, strength)
676 VALUES (?1, ?2, ?3, ?4)",
677 rusqlite::params![event.event_id, event.task_id, event.text, strength_str],
678 )?;
679 }
680
681 conn.execute(
683 "DELETE FROM task_pack_cache WHERE task_id=?1",
684 rusqlite::params![event.task_id],
685 )?;
686
687 Ok(())
688}
689
690pub fn open(path: impl AsRef<Path>) -> anyhow::Result<Connection> {
691 if let Some(parent) = path.as_ref().parent() {
692 std::fs::create_dir_all(parent).with_context(|| format!("create dir {parent:?}"))?;
693 }
694 let conn =
695 Connection::open(&path).with_context(|| format!("open SQLite at {:?}", path.as_ref()))?;
696 conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;")?;
697 apply_migrations(&conn).context("apply schema migrations")?;
698 Ok(conn)
699}
700
/// One task as returned by `list_tasks_by_project`.
#[derive(Debug, Clone)]
pub struct TaskRow {
    /// Value of `tasks.task_id`.
    pub task_id: String,
    /// Value of `tasks.title`.
    pub title: String,
    /// Value of `tasks.status`.
    pub status: String,
    /// Value of `tasks.last_event_at`.
    pub last_event_at: String,
    /// Number of `events_index` rows for this task (0 when there are none).
    pub event_count: usize,
}
712
713pub fn list_tasks_by_project(
717 conn: &Connection,
718 project_hash: &str,
719) -> anyhow::Result<Vec<TaskRow>> {
720 let mut stmt = conn.prepare(
721 "SELECT t.task_id, t.title, t.status, t.last_event_at,
722 COALESCE(c.cnt, 0) AS event_count
723 FROM tasks t
724 LEFT JOIN (
725 SELECT task_id, COUNT(*) AS cnt FROM events_index GROUP BY task_id
726 ) c ON c.task_id = t.task_id
727 WHERE t.project_hash = ?1
728 ORDER BY (t.status = 'open') DESC, t.last_event_at DESC",
729 )?;
730 let rows = stmt
731 .query_map(rusqlite::params![project_hash], |r| {
732 Ok(TaskRow {
733 task_id: r.get::<_, String>(0)?,
734 title: r.get::<_, String>(1)?,
735 status: r.get::<_, String>(2)?,
736 last_event_at: r.get::<_, String>(3)?,
737 event_count: r.get::<_, i64>(4)? as usize,
738 })
739 })?
740 .collect::<Result<Vec<_>, _>>()?;
741 Ok(rows)
742}
743
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::{Author, Event, EventType, EvidenceStrength, Source};
    use tempfile::TempDir;

    // ---- shared fixtures -------------------------------------------------

    /// Appends `e` to `f` as a single JSON line.
    fn write_event_line(f: &mut std::fs::File, e: &Event) {
        use std::io::Write;
        writeln!(f, "{}", serde_json::to_string(e).unwrap()).unwrap();
    }

    /// Builds an `Open` event with text `"x"` and `meta.title` set to `title`.
    fn make_open_event(task_id: &str, title: &str) -> Event {
        let mut e = Event::new(
            task_id,
            EventType::Open,
            Author::User,
            Source::Cli,
            "x".into(),
        );
        e.meta = serde_json::json!({"title": title});
        e
    }

    /// Opens a migrated database at `s.sqlite` inside a fresh temp dir.
    /// The `TempDir` is returned so the directory outlives the connection.
    fn fresh_db() -> (TempDir, Connection) {
        let dir = TempDir::new().unwrap();
        let conn = open(dir.path().join("s.sqlite")).unwrap();
        (dir, conn)
    }

    /// Runs both projection steps for `event`, panicking on failure.
    fn project_event(conn: &Connection, event: &Event, project_hash: &str) {
        upsert_task_from_event(conn, event, project_hash).unwrap();
        index_event(conn, event).unwrap();
    }

    /// Runs a parameterless single-value integer query.
    fn scalar(conn: &Connection, sql: &str) -> i64 {
        conn.query_row(sql, [], |r| r.get(0)).unwrap()
    }

    // ---- tasks -----------------------------------------------------------

    #[test]
    fn task_exists_returns_true_for_known_id_false_otherwise() {
        let (_dir, conn) = fresh_db();

        assert!(!task_exists(&conn, "tj-nope").unwrap());

        let opened = make_open_event("tj-yes", "Hello");
        project_event(&conn, &opened, "feedfacefeedface");

        assert!(task_exists(&conn, "tj-yes").unwrap());
        assert!(!task_exists(&conn, "tj-nope").unwrap());
    }

    // ---- migrations / open -------------------------------------------------

    #[test]
    fn fresh_db_runs_all_migrations() {
        let dir = TempDir::new().unwrap();
        let conn = open(dir.path().join("state.sqlite")).unwrap();

        let mut stmt = conn
            .prepare("SELECT version FROM schema_migrations ORDER BY version")
            .unwrap();
        let applied: Vec<i64> = stmt
            .query_map([], |r| r.get::<_, i64>(0))
            .unwrap()
            .collect::<Result<_, _>>()
            .unwrap();

        let expected: Vec<i64> = (1..=MIGRATIONS.len() as i64).collect();
        assert_eq!(
            applied, expected,
            "every declared migration must be recorded"
        );
    }

    #[test]
    fn apply_migrations_is_idempotent_across_reopens() {
        let dir = TempDir::new().unwrap();
        let db = dir.path().join("state.sqlite");
        let _ = open(&db).unwrap();
        let _ = open(&db).unwrap();

        let third = open(&db).unwrap();
        let count = scalar(&third, "SELECT COUNT(*) FROM schema_migrations");
        assert_eq!(
            count,
            MIGRATIONS.len() as i64,
            "schema_migrations must contain exactly one row per declared migration after repeated opens"
        );
    }

    #[test]
    fn open_creates_all_tables() {
        let dir = TempDir::new().unwrap();
        let conn = open(dir.path().join("state.sqlite")).unwrap();

        let mut stmt = conn
            .prepare("SELECT name FROM sqlite_master WHERE type='table' OR type='virtual table' ORDER BY name")
            .unwrap();
        let names: Vec<String> = stmt
            .query_map([], |r| r.get::<_, String>(0))
            .unwrap()
            .collect::<Result<_, _>>()
            .unwrap();

        let required_tables = [
            "decisions",
            "events_index",
            "evidence",
            "task_pack_cache",
            "tasks",
            "search_fts",
        ];
        for required in required_tables {
            assert!(
                names.iter().any(|n| n == required),
                "missing table {required}, have {names:?}"
            );
        }
    }

    #[test]
    fn open_is_idempotent() {
        let dir = TempDir::new().unwrap();
        let db = dir.path().join("state.sqlite");
        let _ = open(&db).unwrap();
        let _ = open(&db).unwrap();
    }

    // ---- index_event projections -----------------------------------------

    #[test]
    fn index_event_projects_evidence() {
        let (_dir, conn) = fresh_db();
        project_event(&conn, &make_open_event("tj-e", "T"), "feedface");

        let mut evidence = Event::new(
            "tj-e",
            EventType::Evidence,
            Author::Agent,
            Source::Chat,
            "Hook startup measured at 12ms".into(),
        );
        evidence.evidence_strength = Some(EvidenceStrength::Strong);
        project_event(&conn, &evidence, "feedface");

        let (text, strength): (String, String) = conn
            .query_row(
                "SELECT text, strength FROM evidence WHERE task_id=?1",
                rusqlite::params!["tj-e"],
                |r| Ok((r.get(0)?, r.get(1)?)),
            )
            .unwrap();
        assert!(text.contains("12ms"));
        assert_eq!(strength, "strong");
    }

    #[test]
    fn supersede_event_marks_decision_superseded() {
        let (_dir, conn) = fresh_db();
        project_event(&conn, &make_open_event("tj-s", "T"), "feedface");

        let decision = Event::new(
            "tj-s",
            EventType::Decision,
            Author::Agent,
            Source::Chat,
            "Use TS".into(),
        );
        project_event(&conn, &decision, "feedface");

        let mut supersede = Event::new(
            "tj-s",
            EventType::Supersede,
            Author::Agent,
            Source::Chat,
            "Replaced by Rust decision".into(),
        );
        supersede.supersedes = Some(decision.event_id.clone());
        project_event(&conn, &supersede, "feedface");

        let (status, by): (String, Option<String>) = conn
            .query_row(
                "SELECT status, superseded_by FROM decisions WHERE decision_id=?1",
                rusqlite::params![decision.event_id],
                |r| Ok((r.get(0)?, r.get(1)?)),
            )
            .unwrap();
        assert_eq!(status, "superseded");
        assert_eq!(by.as_deref(), Some(supersede.event_id.as_str()));
    }

    #[test]
    fn index_event_projects_decision_to_decisions_table() {
        let (_dir, conn) = fresh_db();
        project_event(&conn, &make_open_event("tj-d", "T"), "feedface");

        let decision = Event::new(
            "tj-d",
            EventType::Decision,
            Author::Agent,
            Source::Chat,
            "Adopt Rust".into(),
        );
        project_event(&conn, &decision, "feedface");

        let (id, text, status): (String, String, String) = conn
            .query_row(
                "SELECT decision_id, text, status FROM decisions WHERE task_id=?1",
                rusqlite::params!["tj-d"],
                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
            )
            .unwrap();
        assert_eq!(id, decision.event_id);
        assert_eq!(text, "Adopt Rust");
        assert_eq!(status, "active");
    }

    #[test]
    fn index_event_is_idempotent_no_search_fts_duplicates() {
        let (_dir, conn) = fresh_db();
        let opened = make_open_event("tj-id", "Idempotent");
        upsert_task_from_event(&conn, &opened, "feedface").unwrap();

        // Index the very same event three times in a row.
        for _ in 0..3 {
            index_event(&conn, &opened).unwrap();
        }

        let n: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM search_fts WHERE event_id=?1",
                rusqlite::params![opened.event_id],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(n, 1, "search_fts must hold exactly one row per event_id");
    }

    // ---- project discovery -------------------------------------------------

    #[test]
    fn list_all_projects_returns_hashes_from_state_dir() {
        use std::fs::File;
        let dir = TempDir::new().unwrap();
        let state_dir = dir.path().join("state");
        std::fs::create_dir_all(&state_dir).unwrap();
        for file_name in [
            "aaaa1111aaaa1111.sqlite",
            "bbbb2222bbbb2222.sqlite",
            "not-a-project.txt",
        ] {
            File::create(state_dir.join(file_name)).unwrap();
        }

        let mut hashes = list_all_projects(&state_dir).unwrap();
        hashes.sort();
        assert_eq!(hashes, vec!["aaaa1111aaaa1111", "bbbb2222bbbb2222"]);
    }

    // ---- incremental ingest ------------------------------------------------

    #[test]
    fn ingest_new_events_picks_up_only_new_lines() {
        let (dir, conn) = fresh_db();
        let jsonl = dir.path().join("events.jsonl");
        let project = "deadbeefdeadbeef";

        let initial = [
            make_open_event("tj-i1", "first"),
            make_open_event("tj-i2", "second"),
            make_open_event("tj-i3", "third"),
        ];
        let mut f = std::fs::File::create(&jsonl).unwrap();
        for e in &initial {
            write_event_line(&mut f, e);
        }
        drop(f);

        let n_first = ingest_new_events(&conn, &jsonl, project).unwrap();
        assert_eq!(n_first, 3);

        let e4 = make_open_event("tj-i4", "fourth");
        let e5 = make_open_event("tj-i5", "fifth");
        let mut f = std::fs::OpenOptions::new()
            .append(true)
            .open(&jsonl)
            .unwrap();
        write_event_line(&mut f, &e4);
        write_event_line(&mut f, &e5);
        drop(f);

        let n_second = ingest_new_events(&conn, &jsonl, project).unwrap();
        assert_eq!(n_second, 2, "incremental ingest must read only the tail");

        let total = scalar(&conn, "SELECT COUNT(*) FROM events_index");
        assert_eq!(total, 5);

        let marker: String = conn
            .query_row(
                "SELECT last_indexed_event_id FROM index_state WHERE project_hash=?1",
                rusqlite::params![project],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(marker, e5.event_id);
    }

    #[test]
    fn ingest_new_events_falls_back_to_full_rebuild_when_marker_vanishes() {
        let (dir, conn) = fresh_db();
        let jsonl = dir.path().join("events.jsonl");
        let project = "feedfacefeedface";

        let e1 = make_open_event("tj-r1", "first");
        let mut f = std::fs::File::create(&jsonl).unwrap();
        write_event_line(&mut f, &e1);
        drop(f);

        ingest_new_events(&conn, &jsonl, project).unwrap();

        // Truncate and rewrite the log so the stored marker no longer appears in it.
        let e2 = make_open_event("tj-r2", "after-corruption");
        let e3 = make_open_event("tj-r3", "after-corruption-2");
        let mut f = std::fs::File::create(&jsonl).unwrap();
        write_event_line(&mut f, &e2);
        write_event_line(&mut f, &e3);
        drop(f);

        let n = ingest_new_events(&conn, &jsonl, project).unwrap();
        assert_eq!(n, 2, "missing marker must trigger full rebuild");
    }

    #[test]
    fn rebuild_state_and_ingest_new_events_produce_same_state() {
        let dir = TempDir::new().unwrap();
        let jsonl_a = dir.path().join("a.jsonl");
        let jsonl_b = dir.path().join("b.jsonl");
        let db_a = dir.path().join("a.sqlite");
        let db_b = dir.path().join("b.sqlite");

        let events: Vec<_> = (0..5)
            .map(|i| make_open_event(&format!("tj-eq{i}"), &format!("title {i}")))
            .collect();
        for path in [&jsonl_a, &jsonl_b] {
            let mut f = std::fs::File::create(path).unwrap();
            for e in &events {
                write_event_line(&mut f, e);
            }
        }

        let conn_a = open(&db_a).unwrap();
        let n_a = rebuild_state(&conn_a, &jsonl_a, "abcd1234abcd1234").unwrap();

        let conn_b = open(&db_b).unwrap();
        let n_b = ingest_new_events(&conn_b, &jsonl_b, "abcd1234abcd1234").unwrap();

        assert_eq!(n_a, n_b);
        assert_eq!(n_a, 5);

        for table in ["tasks", "events_index"] {
            let q = format!("SELECT COUNT(*) FROM {table}");
            let cnt_a = scalar(&conn_a, &q);
            let cnt_b = scalar(&conn_b, &q);
            assert_eq!(cnt_a, cnt_b, "row count mismatch in {table}");
        }
    }

    // ---- full rebuild ------------------------------------------------------

    #[test]
    fn rebuild_state_skips_malformed_jsonl_lines() {
        use std::io::Write;
        let (dir, conn) = fresh_db();
        let events_path = dir.path().join("events.jsonl");

        let mut f = std::fs::File::create(&events_path).unwrap();

        // Line 1: a valid open event.
        write_event_line(&mut f, &make_open_event("tj-skip", "Skip test"));

        // Line 2: not JSON at all.
        writeln!(f, "this is not a json event line").unwrap();

        // Line 3: JSON, but not an event.
        writeln!(f, "{{\"foo\": 1}}").unwrap();

        // Line 4: a valid decision event.
        let decision = Event::new(
            "tj-skip",
            EventType::Decision,
            Author::Agent,
            Source::Chat,
            "Adopt Rust".into(),
        );
        write_event_line(&mut f, &decision);
        drop(f);

        let n = rebuild_state(&conn, &events_path, "deadbeefdeadbeef")
            .expect("rebuild_state must succeed despite malformed lines");
        assert_eq!(
            n, 2,
            "expected 2 valid events indexed (2 malformed skipped)"
        );

        let indexed = scalar(&conn, "SELECT COUNT(*) FROM events_index");
        assert_eq!(indexed, 2);
    }

    #[test]
    fn rebuild_state_reads_jsonl_and_populates_db() {
        let (dir, conn) = fresh_db();
        let events_path = dir.path().join("events.jsonl");

        let opened = make_open_event("tj-9", "Nine");
        let decision = Event::new(
            "tj-9",
            EventType::Decision,
            Author::Agent,
            Source::Chat,
            "Adopt Rust".into(),
        );
        let mut f = std::fs::File::create(&events_path).unwrap();
        write_event_line(&mut f, &opened);
        write_event_line(&mut f, &decision);
        drop(f);

        let n = rebuild_state(&conn, &events_path, "deadbeefdeadbeef").unwrap();
        assert_eq!(n, 2);

        let task_rows = scalar(&conn, "SELECT COUNT(*) FROM tasks");
        assert_eq!(task_rows, 1);
        let event_rows = scalar(&conn, "SELECT COUNT(*) FROM events_index");
        assert_eq!(event_rows, 2);
    }

    #[test]
    fn index_event_writes_index_and_fts() {
        let (_dir, conn) = fresh_db();

        let mut opened = Event::new(
            "tj-1",
            EventType::Open,
            Author::User,
            Source::Cli,
            "Title".into(),
        );
        opened.meta = serde_json::json!({"title": "Title"});
        project_event(&conn, &opened, "deadbeefdeadbeef");

        let mut decision = Event::new(
            "tj-1",
            EventType::Decision,
            Author::Agent,
            Source::Chat,
            "Adopt Rust".into(),
        );
        decision.confidence = Some(0.92);
        project_event(&conn, &decision, "deadbeefdeadbeef");

        let count: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM events_index WHERE task_id=?1",
                rusqlite::params!["tj-1"],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(count, 2);

        let mut stmt = conn
            .prepare("SELECT event_id FROM search_fts WHERE search_fts MATCH ?1")
            .unwrap();
        let hits: Vec<String> = stmt
            .query_map(rusqlite::params!["Rust"], |r| r.get::<_, String>(0))
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0], decision.event_id);
    }

    #[test]
    fn upsert_task_from_open_event_inserts_row() {
        let (_dir, conn) = fresh_db();

        let mut opened = Event::new(
            "tj-7f3a",
            EventType::Open,
            Author::User,
            Source::Cli,
            "Add OAuth".into(),
        );
        opened.meta = serde_json::json!({ "title": "Add OAuth login" });

        upsert_task_from_event(&conn, &opened, "abcd1234abcd1234").unwrap();

        let (id, title, status): (String, String, String) = conn
            .query_row(
                "SELECT task_id, title, status FROM tasks WHERE task_id = ?1",
                ["tj-7f3a"],
                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
            )
            .unwrap();

        assert_eq!(id, "tj-7f3a");
        assert_eq!(title, "Add OAuth login");
        assert_eq!(status, "open");
    }
}