1use anyhow::Context;
2use rusqlite::Connection;
3use std::collections::HashSet;
4use std::path::Path;
5
6struct Migration {
10 version: i64,
11 sql: &'static str,
12}
13
14const MIGRATION_001: &str = r#"
15CREATE TABLE IF NOT EXISTS tasks (
16 task_id TEXT PRIMARY KEY,
17 title TEXT NOT NULL,
18 status TEXT NOT NULL,
19 project_hash TEXT NOT NULL,
20 opened_at TEXT NOT NULL,
21 closed_at TEXT,
22 last_event_at TEXT NOT NULL
23);
24CREATE INDEX IF NOT EXISTS idx_tasks_project ON tasks(project_hash, last_event_at DESC);
25
26CREATE TABLE IF NOT EXISTS events_index (
27 event_id TEXT PRIMARY KEY,
28 task_id TEXT NOT NULL,
29 type TEXT NOT NULL,
30 timestamp TEXT NOT NULL,
31 confidence REAL,
32 status TEXT NOT NULL
33);
34CREATE INDEX IF NOT EXISTS idx_events_task_time ON events_index(task_id, timestamp DESC);
35
36CREATE TABLE IF NOT EXISTS decisions (
37 decision_id TEXT PRIMARY KEY,
38 task_id TEXT NOT NULL,
39 text TEXT NOT NULL,
40 status TEXT NOT NULL,
41 superseded_by TEXT
42);
43
44CREATE TABLE IF NOT EXISTS evidence (
45 evidence_id TEXT PRIMARY KEY,
46 task_id TEXT NOT NULL,
47 text TEXT NOT NULL,
48 strength TEXT NOT NULL,
49 refers_to_decision_id TEXT
50);
51
52CREATE TABLE IF NOT EXISTS task_pack_cache (
53 task_id TEXT NOT NULL,
54 mode TEXT NOT NULL,
55 text TEXT NOT NULL,
56 generated_at TEXT NOT NULL,
57 source_event_count INTEGER NOT NULL,
58 PRIMARY KEY (task_id, mode)
59);
60
61CREATE VIRTUAL TABLE IF NOT EXISTS search_fts USING fts5(
62 task_id UNINDEXED,
63 event_id UNINDEXED,
64 text,
65 type
66);
67"#;
68
69const MIGRATION_002: &str = r#"
74CREATE TABLE IF NOT EXISTS index_state (
75 project_hash TEXT PRIMARY KEY,
76 last_indexed_event_id TEXT NOT NULL,
77 updated_at TEXT NOT NULL
78);
79"#;
80
81const MIGRATION_003: &str = r#"
86ALTER TABLE tasks ADD COLUMN goal TEXT;
87ALTER TABLE tasks ADD COLUMN outcome TEXT;
88ALTER TABLE tasks ADD COLUMN outcome_tag TEXT;
89ALTER TABLE tasks ADD COLUMN external TEXT;
90ALTER TABLE events_index ADD COLUMN artifacts TEXT;
91DELETE FROM task_pack_cache;
92"#;
93
94const MIGRATIONS: &[Migration] = &[
97 Migration {
98 version: 1,
99 sql: MIGRATION_001,
100 },
101 Migration {
102 version: 2,
103 sql: MIGRATION_002,
104 },
105 Migration {
106 version: 3,
107 sql: MIGRATION_003,
108 },
109];
110
111fn apply_migrations(conn: &Connection) -> anyhow::Result<()> {
112 conn.execute_batch(
113 "CREATE TABLE IF NOT EXISTS schema_migrations (
114 version INTEGER PRIMARY KEY,
115 applied_at TEXT NOT NULL
116 )",
117 )
118 .context("create schema_migrations table")?;
119
120 let applied: HashSet<i64> = {
121 let mut stmt = conn
122 .prepare("SELECT version FROM schema_migrations")
123 .context("select applied versions")?;
124 let rows = stmt
125 .query_map([], |r| r.get::<_, i64>(0))
126 .context("iterate schema_migrations")?;
127 rows.collect::<rusqlite::Result<HashSet<_>>>()
128 .context("collect applied versions")?
129 };
130
131 for migration in MIGRATIONS {
132 if applied.contains(&migration.version) {
133 continue;
134 }
135 conn.execute_batch(migration.sql)
136 .with_context(|| format!("apply schema migration v{:03}", migration.version))?;
137 conn.execute(
138 "INSERT INTO schema_migrations(version, applied_at) VALUES (?1, ?2)",
139 rusqlite::params![
140 migration.version,
141 chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
142 ],
143 )
144 .with_context(|| {
145 format!(
146 "record schema migration v{:03} as applied",
147 migration.version
148 )
149 })?;
150 }
151 Ok(())
152}
153
154use crate::event::{Event, EventType};
155
156pub fn upsert_task_from_event(
157 conn: &Connection,
158 event: &Event,
159 project_hash: &str,
160) -> anyhow::Result<()> {
161 match event.event_type {
162 EventType::Open => {
163 let title = event
164 .meta
165 .get("title")
166 .and_then(|v| v.as_str())
167 .unwrap_or(&event.text)
168 .to_string();
169 conn.execute(
170 "INSERT INTO tasks(task_id, title, status, project_hash, opened_at, last_event_at)
171 VALUES (?1, ?2, 'open', ?3, ?4, ?4)
172 ON CONFLICT(task_id) DO UPDATE SET last_event_at = ?4",
173 rusqlite::params![event.task_id, title, project_hash, event.timestamp],
174 )?;
175 }
176 EventType::Close => {
177 conn.execute(
178 "UPDATE tasks SET status='closed', closed_at=?2, last_event_at=?2 WHERE task_id=?1",
179 rusqlite::params![event.task_id, event.timestamp],
180 )?;
181 }
182 EventType::Reopen => {
183 conn.execute(
184 "UPDATE tasks SET status='open', closed_at=NULL, last_event_at=?2 WHERE task_id=?1",
185 rusqlite::params![event.task_id, event.timestamp],
186 )?;
187 }
188 _ => {
189 conn.execute(
190 "UPDATE tasks SET last_event_at=?2 WHERE task_id=?1",
191 rusqlite::params![event.task_id, event.timestamp],
192 )?;
193 }
194 }
195 Ok(())
196}
197
198use std::io::BufRead;
199
200pub fn list_all_projects(state_dir: impl AsRef<Path>) -> anyhow::Result<Vec<String>> {
201 let dir = state_dir.as_ref();
202 if !dir.exists() {
203 return Ok(vec![]);
204 }
205 let mut out = Vec::new();
206 for entry in std::fs::read_dir(dir)? {
207 let entry = entry?;
208 let path = entry.path();
209 if path.extension().and_then(|e| e.to_str()) == Some("sqlite") {
210 if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) {
211 out.push(stem.to_string());
212 }
213 }
214 }
215 Ok(out)
216}
217
218pub fn rebuild_state(
219 conn: &Connection,
220 jsonl_path: impl AsRef<Path>,
221 project_hash: &str,
222) -> anyhow::Result<usize> {
223 let f = std::fs::File::open(&jsonl_path)
224 .with_context(|| format!("open {:?}", jsonl_path.as_ref()))?;
225 let reader = std::io::BufReader::new(f);
226
227 let tx = conn.unchecked_transaction()?;
228 let mut count = 0;
229 let mut last_event_id: Option<String> = None;
230 for (i, line) in reader.lines().enumerate() {
231 let line = line.with_context(|| format!("read line {i}"))?;
232 if line.trim().is_empty() {
233 continue;
234 }
235 let event: Event = match serde_json::from_str(&line) {
239 Ok(e) => e,
240 Err(err) => {
241 tracing::warn!(
242 line_number = i + 1,
243 error = %err,
244 "skipping malformed JSONL line in rebuild_state"
245 );
246 continue;
247 }
248 };
249 upsert_task_from_event(&tx, &event, project_hash)?;
250 index_event(&tx, &event)?;
251 last_event_id = Some(event.event_id.clone());
252 count += 1;
253 }
254 if let Some(eid) = last_event_id.as_deref() {
255 record_last_indexed(&tx, project_hash, eid)?;
256 }
257 tx.commit()?;
258 Ok(count)
259}
260
261pub fn task_exists(conn: &Connection, task_id: &str) -> anyhow::Result<bool> {
266 let count: i64 = conn.query_row(
267 "SELECT COUNT(*) FROM tasks WHERE task_id = ?1",
268 rusqlite::params![task_id],
269 |r| r.get(0),
270 )?;
271 Ok(count > 0)
272}
273
274pub fn task_status(conn: &Connection, task_id: &str) -> anyhow::Result<Option<String>> {
278 let mut stmt = conn.prepare("SELECT status FROM tasks WHERE task_id = ?1")?;
279 let mut rows = stmt.query(rusqlite::params![task_id])?;
280 Ok(rows.next()?.map(|r| r.get::<_, String>(0)).transpose()?)
281}
282
283pub fn set_task_goal(conn: &Connection, task_id: &str, goal: &str) -> anyhow::Result<()> {
287 conn.execute(
288 "UPDATE tasks SET goal = ?1 WHERE task_id = ?2",
289 rusqlite::params![goal, task_id],
290 )
291 .with_context(|| format!("set goal for {task_id}"))?;
292 conn.execute(
295 "DELETE FROM task_pack_cache WHERE task_id = ?1",
296 rusqlite::params![task_id],
297 )?;
298 Ok(())
299}
300
301pub fn set_task_outcome(
305 conn: &Connection,
306 task_id: &str,
307 outcome: &str,
308 outcome_tag: Option<&str>,
309) -> anyhow::Result<()> {
310 conn.execute(
311 "UPDATE tasks SET outcome = ?1, outcome_tag = ?2 WHERE task_id = ?3",
312 rusqlite::params![outcome, outcome_tag, task_id],
313 )
314 .with_context(|| format!("set outcome for {task_id}"))?;
315 conn.execute(
316 "DELETE FROM task_pack_cache WHERE task_id = ?1",
317 rusqlite::params![task_id],
318 )?;
319 Ok(())
320}
321
322pub fn add_task_external(conn: &Connection, task_id: &str, reference: &str) -> anyhow::Result<()> {
327 let current: Option<String> = conn
328 .query_row(
329 "SELECT external FROM tasks WHERE task_id = ?1",
330 rusqlite::params![task_id],
331 |r| r.get::<_, Option<String>>(0),
332 )
333 .with_context(|| format!("read external for {task_id}"))?;
334 let next = match current {
335 Some(s) if !s.is_empty() => format!("{s},{reference}"),
336 _ => reference.to_string(),
337 };
338 conn.execute(
339 "UPDATE tasks SET external = ?1 WHERE task_id = ?2",
340 rusqlite::params![next, task_id],
341 )?;
342 conn.execute(
343 "DELETE FROM task_pack_cache WHERE task_id = ?1",
344 rusqlite::params![task_id],
345 )?;
346 Ok(())
347}
348
349#[derive(Debug, Clone, Default)]
352pub struct TaskMetadata {
353 pub goal: Option<String>,
354 pub outcome: Option<String>,
355 pub outcome_tag: Option<String>,
356 pub external: Option<String>,
357}
358
359pub fn task_metadata(conn: &Connection, task_id: &str) -> anyhow::Result<Option<TaskMetadata>> {
360 let mut stmt =
361 conn.prepare("SELECT goal, outcome, outcome_tag, external FROM tasks WHERE task_id = ?1")?;
362 let mut rows = stmt.query(rusqlite::params![task_id])?;
363 Ok(match rows.next()? {
364 Some(r) => Some(TaskMetadata {
365 goal: r.get::<_, Option<String>>(0)?,
366 outcome: r.get::<_, Option<String>>(1)?,
367 outcome_tag: r.get::<_, Option<String>>(2)?,
368 external: r.get::<_, Option<String>>(3)?,
369 }),
370 None => None,
371 })
372}
373
374fn last_indexed_event_id(conn: &Connection, project_hash: &str) -> anyhow::Result<Option<String>> {
378 let mut stmt =
379 conn.prepare("SELECT last_indexed_event_id FROM index_state WHERE project_hash = ?1")?;
380 let mut rows = stmt.query(rusqlite::params![project_hash])?;
381 if let Some(row) = rows.next()? {
382 Ok(Some(row.get::<_, String>(0)?))
383 } else {
384 Ok(None)
385 }
386}
387
388fn record_last_indexed(
389 conn: &Connection,
390 project_hash: &str,
391 event_id: &str,
392) -> anyhow::Result<()> {
393 conn.execute(
394 "INSERT INTO index_state(project_hash, last_indexed_event_id, updated_at)
395 VALUES (?1, ?2, ?3)
396 ON CONFLICT(project_hash) DO UPDATE SET
397 last_indexed_event_id = excluded.last_indexed_event_id,
398 updated_at = excluded.updated_at",
399 rusqlite::params![
400 project_hash,
401 event_id,
402 chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
403 ],
404 )?;
405 Ok(())
406}
407
408pub fn ingest_new_events(
418 conn: &Connection,
419 jsonl_path: impl AsRef<Path>,
420 project_hash: &str,
421) -> anyhow::Result<usize> {
422 let marker = match last_indexed_event_id(conn, project_hash)? {
423 Some(id) => id,
424 None => return rebuild_state(conn, jsonl_path, project_hash),
425 };
426
427 let f = std::fs::File::open(&jsonl_path)
428 .with_context(|| format!("open {:?}", jsonl_path.as_ref()))?;
429 let reader = std::io::BufReader::new(f);
430
431 let tx = conn.unchecked_transaction()?;
435 let mut found_marker = false;
436 let mut count = 0;
437 let mut last_event_id: Option<String> = None;
438 for (i, line) in reader.lines().enumerate() {
439 let line = line.with_context(|| format!("read line {i}"))?;
440 if line.trim().is_empty() {
441 continue;
442 }
443 let event: Event = match serde_json::from_str(&line) {
444 Ok(e) => e,
445 Err(err) => {
446 tracing::warn!(
447 line_number = i + 1,
448 error = %err,
449 "skipping malformed JSONL line in ingest_new_events"
450 );
451 continue;
452 }
453 };
454 if !found_marker {
455 if event.event_id == marker {
456 found_marker = true;
457 }
458 continue;
459 }
460 upsert_task_from_event(&tx, &event, project_hash)?;
461 index_event(&tx, &event)?;
462 last_event_id = Some(event.event_id.clone());
463 count += 1;
464 }
465
466 if !found_marker {
467 drop(tx);
469 tracing::warn!(
470 project_hash = project_hash,
471 marker = marker.as_str(),
472 "last_indexed_event_id not found in JSONL — falling back to full rebuild"
473 );
474 return rebuild_state(conn, jsonl_path, project_hash);
475 }
476
477 if let Some(eid) = last_event_id.as_deref() {
478 record_last_indexed(&tx, project_hash, eid)?;
479 }
480 tx.commit()?;
481 Ok(count)
482}
483
484pub fn index_event(conn: &Connection, event: &Event) -> anyhow::Result<()> {
485 let type_str = serde_json::to_value(event.event_type)?
486 .as_str()
487 .unwrap()
488 .to_string();
489 let status_str = serde_json::to_value(event.status)?
490 .as_str()
491 .unwrap()
492 .to_string();
493 conn.execute(
494 "INSERT OR REPLACE INTO events_index(event_id, task_id, type, timestamp, confidence, status)
495 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
496 rusqlite::params![
497 event.event_id, event.task_id, type_str,
498 event.timestamp, event.confidence, status_str
499 ],
500 )?;
501 conn.execute(
503 "DELETE FROM search_fts WHERE event_id=?1",
504 rusqlite::params![event.event_id],
505 )?;
506 conn.execute(
507 "INSERT INTO search_fts(task_id, event_id, text, type) VALUES (?1, ?2, ?3, ?4)",
508 rusqlite::params![event.task_id, event.event_id, event.text, type_str],
509 )?;
510
511 if event.event_type == EventType::Decision {
512 conn.execute(
513 "INSERT OR REPLACE INTO decisions(decision_id, task_id, text, status)
514 VALUES (?1, ?2, ?3, 'active')",
515 rusqlite::params![event.event_id, event.task_id, event.text],
516 )?;
517 }
518
519 if event.event_type == EventType::Supersede {
520 if let Some(target) = &event.supersedes {
521 conn.execute(
522 "UPDATE decisions SET status='superseded', superseded_by=?1 WHERE decision_id=?2",
523 rusqlite::params![event.event_id, target],
524 )?;
525 }
526 }
527
528 if event.event_type == EventType::Evidence {
529 let strength_str = event
530 .evidence_strength
531 .map(|s| {
532 serde_json::to_value(s)
533 .unwrap()
534 .as_str()
535 .unwrap()
536 .to_string()
537 })
538 .unwrap_or_else(|| "medium".into());
539 conn.execute(
540 "INSERT OR REPLACE INTO evidence(evidence_id, task_id, text, strength)
541 VALUES (?1, ?2, ?3, ?4)",
542 rusqlite::params![event.event_id, event.task_id, event.text, strength_str],
543 )?;
544 }
545
546 conn.execute(
548 "DELETE FROM task_pack_cache WHERE task_id=?1",
549 rusqlite::params![event.task_id],
550 )?;
551
552 Ok(())
553}
554
555pub fn open(path: impl AsRef<Path>) -> anyhow::Result<Connection> {
556 if let Some(parent) = path.as_ref().parent() {
557 std::fs::create_dir_all(parent).with_context(|| format!("create dir {parent:?}"))?;
558 }
559 let conn =
560 Connection::open(&path).with_context(|| format!("open SQLite at {:?}", path.as_ref()))?;
561 conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;")?;
562 apply_migrations(&conn).context("apply schema migrations")?;
563 Ok(conn)
564}
565
566#[derive(Debug, Clone)]
570pub struct TaskRow {
571 pub task_id: String,
572 pub title: String,
573 pub status: String,
574 pub last_event_at: String,
575 pub event_count: usize,
576}
577
578pub fn list_tasks_by_project(
582 conn: &Connection,
583 project_hash: &str,
584) -> anyhow::Result<Vec<TaskRow>> {
585 let mut stmt = conn.prepare(
586 "SELECT t.task_id, t.title, t.status, t.last_event_at,
587 COALESCE(c.cnt, 0) AS event_count
588 FROM tasks t
589 LEFT JOIN (
590 SELECT task_id, COUNT(*) AS cnt FROM events_index GROUP BY task_id
591 ) c ON c.task_id = t.task_id
592 WHERE t.project_hash = ?1
593 ORDER BY (t.status = 'open') DESC, t.last_event_at DESC",
594 )?;
595 let rows = stmt
596 .query_map(rusqlite::params![project_hash], |r| {
597 Ok(TaskRow {
598 task_id: r.get::<_, String>(0)?,
599 title: r.get::<_, String>(1)?,
600 status: r.get::<_, String>(2)?,
601 last_event_at: r.get::<_, String>(3)?,
602 event_count: r.get::<_, i64>(4)? as usize,
603 })
604 })?
605 .collect::<Result<Vec<_>, _>>()?;
606 Ok(rows)
607}
608
609#[cfg(test)]
610mod tests {
611 use super::*;
612 use tempfile::TempDir;
613
614 #[test]
615 fn task_exists_returns_true_for_known_id_false_otherwise() {
616 let d = TempDir::new().unwrap();
617 let conn = open(d.path().join("s.sqlite")).unwrap();
618
619 assert!(!task_exists(&conn, "tj-nope").unwrap());
620
621 let e = make_open_event("tj-yes", "Hello");
622 upsert_task_from_event(&conn, &e, "feedfacefeedface").unwrap();
623 index_event(&conn, &e).unwrap();
624
625 assert!(task_exists(&conn, "tj-yes").unwrap());
626 assert!(!task_exists(&conn, "tj-nope").unwrap());
627 }
628
629 #[test]
630 fn fresh_db_runs_all_migrations() {
631 let d = TempDir::new().unwrap();
632 let p = d.path().join("state.sqlite");
633 let conn = open(&p).unwrap();
634
635 let applied: Vec<i64> = conn
636 .prepare("SELECT version FROM schema_migrations ORDER BY version")
637 .unwrap()
638 .query_map([], |r| r.get::<_, i64>(0))
639 .unwrap()
640 .collect::<Result<_, _>>()
641 .unwrap();
642 assert_eq!(
643 applied,
644 (1..=MIGRATIONS.len() as i64).collect::<Vec<_>>(),
645 "every declared migration must be recorded"
646 );
647 }
648
649 #[test]
650 fn apply_migrations_is_idempotent_across_reopens() {
651 let d = TempDir::new().unwrap();
652 let p = d.path().join("state.sqlite");
653 let _ = open(&p).unwrap();
654 let _ = open(&p).unwrap();
655
656 let count: i64 = open(&p)
657 .unwrap()
658 .query_row("SELECT COUNT(*) FROM schema_migrations", [], |r| r.get(0))
659 .unwrap();
660 assert_eq!(
661 count,
662 MIGRATIONS.len() as i64,
663 "schema_migrations must contain exactly one row per declared migration after repeated opens"
664 );
665 }
666
667 #[test]
668 fn open_creates_all_tables() {
669 let d = TempDir::new().unwrap();
670 let p = d.path().join("state.sqlite");
671 let conn = open(&p).unwrap();
672
673 let names: Vec<String> = conn
674 .prepare("SELECT name FROM sqlite_master WHERE type='table' OR type='virtual table' ORDER BY name")
675 .unwrap()
676 .query_map([], |r| r.get::<_, String>(0))
677 .unwrap()
678 .collect::<Result<_, _>>()
679 .unwrap();
680
681 for required in [
682 "decisions",
683 "events_index",
684 "evidence",
685 "task_pack_cache",
686 "tasks",
687 "search_fts",
688 ] {
689 assert!(
690 names.iter().any(|n| n == required),
691 "missing table {required}, have {names:?}"
692 );
693 }
694 }
695
696 #[test]
697 fn open_is_idempotent() {
698 let d = TempDir::new().unwrap();
699 let p = d.path().join("state.sqlite");
700 let _ = open(&p).unwrap();
701 let _ = open(&p).unwrap();
702 }
703
704 #[test]
705 fn index_event_projects_evidence() {
706 let d = TempDir::new().unwrap();
707 let conn = open(d.path().join("s.sqlite")).unwrap();
708 let mut open_e = crate::event::Event::new(
709 "tj-e",
710 crate::event::EventType::Open,
711 crate::event::Author::User,
712 crate::event::Source::Cli,
713 "x".into(),
714 );
715 open_e.meta = serde_json::json!({"title": "T"});
716 upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
717 index_event(&conn, &open_e).unwrap();
718
719 let mut ev = crate::event::Event::new(
720 "tj-e",
721 crate::event::EventType::Evidence,
722 crate::event::Author::Agent,
723 crate::event::Source::Chat,
724 "Hook startup measured at 12ms".into(),
725 );
726 ev.evidence_strength = Some(crate::event::EvidenceStrength::Strong);
727 upsert_task_from_event(&conn, &ev, "feedface").unwrap();
728 index_event(&conn, &ev).unwrap();
729
730 let (text, strength): (String, String) = conn
731 .query_row(
732 "SELECT text, strength FROM evidence WHERE task_id=?1",
733 rusqlite::params!["tj-e"],
734 |r| Ok((r.get(0)?, r.get(1)?)),
735 )
736 .unwrap();
737 assert!(text.contains("12ms"));
738 assert_eq!(strength, "strong");
739 }
740
741 #[test]
742 fn supersede_event_marks_decision_superseded() {
743 let d = TempDir::new().unwrap();
744 let conn = open(d.path().join("s.sqlite")).unwrap();
745 let mut open_e = crate::event::Event::new(
746 "tj-s",
747 crate::event::EventType::Open,
748 crate::event::Author::User,
749 crate::event::Source::Cli,
750 "x".into(),
751 );
752 open_e.meta = serde_json::json!({"title": "T"});
753 upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
754 index_event(&conn, &open_e).unwrap();
755
756 let dec = crate::event::Event::new(
757 "tj-s",
758 crate::event::EventType::Decision,
759 crate::event::Author::Agent,
760 crate::event::Source::Chat,
761 "Use TS".into(),
762 );
763 upsert_task_from_event(&conn, &dec, "feedface").unwrap();
764 index_event(&conn, &dec).unwrap();
765
766 let mut sup = crate::event::Event::new(
767 "tj-s",
768 crate::event::EventType::Supersede,
769 crate::event::Author::Agent,
770 crate::event::Source::Chat,
771 "Replaced by Rust decision".into(),
772 );
773 sup.supersedes = Some(dec.event_id.clone());
774 upsert_task_from_event(&conn, &sup, "feedface").unwrap();
775 index_event(&conn, &sup).unwrap();
776
777 let (status, by): (String, Option<String>) = conn
778 .query_row(
779 "SELECT status, superseded_by FROM decisions WHERE decision_id=?1",
780 rusqlite::params![dec.event_id],
781 |r| Ok((r.get(0)?, r.get(1)?)),
782 )
783 .unwrap();
784 assert_eq!(status, "superseded");
785 assert_eq!(by.as_deref(), Some(sup.event_id.as_str()));
786 }
787
788 #[test]
789 fn index_event_projects_decision_to_decisions_table() {
790 let d = TempDir::new().unwrap();
791 let conn = open(d.path().join("s.sqlite")).unwrap();
792
793 let mut open_e = crate::event::Event::new(
794 "tj-d",
795 crate::event::EventType::Open,
796 crate::event::Author::User,
797 crate::event::Source::Cli,
798 "x".into(),
799 );
800 open_e.meta = serde_json::json!({"title": "T"});
801 upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
802 index_event(&conn, &open_e).unwrap();
803
804 let dec = crate::event::Event::new(
805 "tj-d",
806 crate::event::EventType::Decision,
807 crate::event::Author::Agent,
808 crate::event::Source::Chat,
809 "Adopt Rust".into(),
810 );
811 upsert_task_from_event(&conn, &dec, "feedface").unwrap();
812 index_event(&conn, &dec).unwrap();
813
814 let (id, text, status): (String, String, String) = conn
815 .query_row(
816 "SELECT decision_id, text, status FROM decisions WHERE task_id=?1",
817 rusqlite::params!["tj-d"],
818 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
819 )
820 .unwrap();
821 assert_eq!(id, dec.event_id);
822 assert_eq!(text, "Adopt Rust");
823 assert_eq!(status, "active");
824 }
825
826 #[test]
827 fn index_event_is_idempotent_no_search_fts_duplicates() {
828 let d = TempDir::new().unwrap();
829 let conn = open(d.path().join("s.sqlite")).unwrap();
830 let mut open_e = crate::event::Event::new(
831 "tj-id",
832 crate::event::EventType::Open,
833 crate::event::Author::User,
834 crate::event::Source::Cli,
835 "x".into(),
836 );
837 open_e.meta = serde_json::json!({"title": "Idempotent"});
838 upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
839
840 index_event(&conn, &open_e).unwrap();
842 index_event(&conn, &open_e).unwrap();
843 index_event(&conn, &open_e).unwrap();
844
845 let n: i64 = conn
846 .query_row(
847 "SELECT COUNT(*) FROM search_fts WHERE event_id=?1",
848 rusqlite::params![open_e.event_id],
849 |r| r.get(0),
850 )
851 .unwrap();
852 assert_eq!(n, 1, "search_fts must hold exactly one row per event_id");
853 }
854
855 #[test]
856 fn list_all_projects_returns_hashes_from_state_dir() {
857 use std::fs::File;
858 let d = TempDir::new().unwrap();
859 let state_dir = d.path().join("state");
860 std::fs::create_dir_all(&state_dir).unwrap();
861 File::create(state_dir.join("aaaa1111aaaa1111.sqlite")).unwrap();
862 File::create(state_dir.join("bbbb2222bbbb2222.sqlite")).unwrap();
863 File::create(state_dir.join("not-a-project.txt")).unwrap();
864
865 let mut hashes = list_all_projects(&state_dir).unwrap();
866 hashes.sort();
867 assert_eq!(hashes, vec!["aaaa1111aaaa1111", "bbbb2222bbbb2222"]);
868 }
869
870 fn write_event_line(f: &mut std::fs::File, e: &crate::event::Event) {
871 use std::io::Write;
872 writeln!(f, "{}", serde_json::to_string(e).unwrap()).unwrap();
873 }
874
875 fn make_open_event(task_id: &str, title: &str) -> crate::event::Event {
876 let mut e = crate::event::Event::new(
877 task_id,
878 crate::event::EventType::Open,
879 crate::event::Author::User,
880 crate::event::Source::Cli,
881 "x".into(),
882 );
883 e.meta = serde_json::json!({"title": title});
884 e
885 }
886
887 #[test]
888 fn ingest_new_events_picks_up_only_new_lines() {
889 let d = TempDir::new().unwrap();
890 let jsonl = d.path().join("events.jsonl");
891 let db = d.path().join("s.sqlite");
892 let project = "deadbeefdeadbeef";
893
894 let e1 = make_open_event("tj-i1", "first");
895 let e2 = make_open_event("tj-i2", "second");
896 let e3 = make_open_event("tj-i3", "third");
897
898 let mut f = std::fs::File::create(&jsonl).unwrap();
899 write_event_line(&mut f, &e1);
900 write_event_line(&mut f, &e2);
901 write_event_line(&mut f, &e3);
902 drop(f);
903
904 let conn = open(&db).unwrap();
906 let n_first = ingest_new_events(&conn, &jsonl, project).unwrap();
907 assert_eq!(n_first, 3);
908
909 let e4 = make_open_event("tj-i4", "fourth");
911 let e5 = make_open_event("tj-i5", "fifth");
912 let mut f = std::fs::OpenOptions::new()
913 .append(true)
914 .open(&jsonl)
915 .unwrap();
916 write_event_line(&mut f, &e4);
917 write_event_line(&mut f, &e5);
918 drop(f);
919
920 let n_second = ingest_new_events(&conn, &jsonl, project).unwrap();
922 assert_eq!(n_second, 2, "incremental ingest must read only the tail");
923
924 let total: i64 = conn
925 .query_row("SELECT COUNT(*) FROM events_index", [], |r| r.get(0))
926 .unwrap();
927 assert_eq!(total, 5);
928
929 let marker: String = conn
930 .query_row(
931 "SELECT last_indexed_event_id FROM index_state WHERE project_hash=?1",
932 rusqlite::params![project],
933 |r| r.get(0),
934 )
935 .unwrap();
936 assert_eq!(marker, e5.event_id);
937 }
938
939 #[test]
940 fn ingest_new_events_falls_back_to_full_rebuild_when_marker_vanishes() {
941 let d = TempDir::new().unwrap();
942 let jsonl = d.path().join("events.jsonl");
943 let db = d.path().join("s.sqlite");
944 let project = "feedfacefeedface";
945
946 let e1 = make_open_event("tj-r1", "first");
947 let mut f = std::fs::File::create(&jsonl).unwrap();
948 write_event_line(&mut f, &e1);
949 drop(f);
950
951 let conn = open(&db).unwrap();
952 ingest_new_events(&conn, &jsonl, project).unwrap();
953
954 let e2 = make_open_event("tj-r2", "after-corruption");
957 let e3 = make_open_event("tj-r3", "after-corruption-2");
958 let mut f = std::fs::File::create(&jsonl).unwrap();
959 write_event_line(&mut f, &e2);
960 write_event_line(&mut f, &e3);
961 drop(f);
962
963 let n = ingest_new_events(&conn, &jsonl, project).unwrap();
964 assert_eq!(n, 2, "missing marker must trigger full rebuild");
965 }
966
967 #[test]
968 fn rebuild_state_and_ingest_new_events_produce_same_state() {
969 let d = TempDir::new().unwrap();
970 let jsonl_a = d.path().join("a.jsonl");
971 let jsonl_b = d.path().join("b.jsonl");
972 let db_a = d.path().join("a.sqlite");
973 let db_b = d.path().join("b.sqlite");
974
975 let events: Vec<_> = (0..5)
976 .map(|i| make_open_event(&format!("tj-eq{i}"), &format!("title {i}")))
977 .collect();
978 for path in [&jsonl_a, &jsonl_b] {
979 let mut f = std::fs::File::create(path).unwrap();
980 for e in &events {
981 write_event_line(&mut f, e);
982 }
983 }
984
985 let conn_a = open(&db_a).unwrap();
986 let n_a = rebuild_state(&conn_a, &jsonl_a, "abcd1234abcd1234").unwrap();
987
988 let conn_b = open(&db_b).unwrap();
989 let n_b = ingest_new_events(&conn_b, &jsonl_b, "abcd1234abcd1234").unwrap();
990
991 assert_eq!(n_a, n_b);
992 assert_eq!(n_a, 5);
993
994 for table in ["tasks", "events_index"] {
995 let q = format!("SELECT COUNT(*) FROM {table}");
996 let cnt_a: i64 = conn_a.query_row(&q, [], |r| r.get(0)).unwrap();
997 let cnt_b: i64 = conn_b.query_row(&q, [], |r| r.get(0)).unwrap();
998 assert_eq!(cnt_a, cnt_b, "row count mismatch in {table}");
999 }
1000 }
1001
1002 #[test]
1003 fn rebuild_state_skips_malformed_jsonl_lines() {
1004 use std::io::Write;
1005 let d = TempDir::new().unwrap();
1006 let events_path = d.path().join("events.jsonl");
1007 let db_path = d.path().join("s.sqlite");
1008
1009 let mut f = std::fs::File::create(&events_path).unwrap();
1010
1011 let mut e1 = crate::event::Event::new(
1012 "tj-skip",
1013 crate::event::EventType::Open,
1014 crate::event::Author::User,
1015 crate::event::Source::Cli,
1016 "x".into(),
1017 );
1018 e1.meta = serde_json::json!({"title": "Skip test"});
1019 writeln!(f, "{}", serde_json::to_string(&e1).unwrap()).unwrap();
1020
1021 writeln!(f, "this is not a json event line").unwrap();
1023
1024 writeln!(f, "{{\"foo\": 1}}").unwrap();
1026
1027 let e3 = crate::event::Event::new(
1028 "tj-skip",
1029 crate::event::EventType::Decision,
1030 crate::event::Author::Agent,
1031 crate::event::Source::Chat,
1032 "Adopt Rust".into(),
1033 );
1034 writeln!(f, "{}", serde_json::to_string(&e3).unwrap()).unwrap();
1035 drop(f);
1036
1037 let conn = open(&db_path).unwrap();
1038 let n = rebuild_state(&conn, &events_path, "deadbeefdeadbeef")
1039 .expect("rebuild_state must succeed despite malformed lines");
1040 assert_eq!(
1041 n, 2,
1042 "expected 2 valid events indexed (2 malformed skipped)"
1043 );
1044
1045 let indexed: i64 = conn
1046 .query_row("SELECT COUNT(*) FROM events_index", [], |r| r.get(0))
1047 .unwrap();
1048 assert_eq!(indexed, 2);
1049 }
1050
1051 #[test]
1052 fn rebuild_state_reads_jsonl_and_populates_db() {
1053 use std::io::Write;
1054 let d = TempDir::new().unwrap();
1055 let events_path = d.path().join("events.jsonl");
1056 let db_path = d.path().join("s.sqlite");
1057
1058 let mut f = std::fs::File::create(&events_path).unwrap();
1059 let mut e1 = crate::event::Event::new(
1060 "tj-9",
1061 crate::event::EventType::Open,
1062 crate::event::Author::User,
1063 crate::event::Source::Cli,
1064 "x".into(),
1065 );
1066 e1.meta = serde_json::json!({"title": "Nine"});
1067 let e2 = crate::event::Event::new(
1068 "tj-9",
1069 crate::event::EventType::Decision,
1070 crate::event::Author::Agent,
1071 crate::event::Source::Chat,
1072 "Adopt Rust".into(),
1073 );
1074 writeln!(f, "{}", serde_json::to_string(&e1).unwrap()).unwrap();
1075 writeln!(f, "{}", serde_json::to_string(&e2).unwrap()).unwrap();
1076 drop(f);
1077
1078 let conn = open(&db_path).unwrap();
1079 let n = rebuild_state(&conn, &events_path, "deadbeefdeadbeef").unwrap();
1080 assert_eq!(n, 2);
1081
1082 let n: i64 = conn
1083 .query_row("SELECT COUNT(*) FROM tasks", [], |r| r.get(0))
1084 .unwrap();
1085 assert_eq!(n, 1);
1086 let n: i64 = conn
1087 .query_row("SELECT COUNT(*) FROM events_index", [], |r| r.get(0))
1088 .unwrap();
1089 assert_eq!(n, 2);
1090 }
1091
1092 #[test]
1093 fn index_event_writes_index_and_fts() {
1094 let d = TempDir::new().unwrap();
1095 let conn = open(d.path().join("s.sqlite")).unwrap();
1096 let mut open_e = crate::event::Event::new(
1097 "tj-1",
1098 crate::event::EventType::Open,
1099 crate::event::Author::User,
1100 crate::event::Source::Cli,
1101 "Title".into(),
1102 );
1103 open_e.meta = serde_json::json!({"title": "Title"});
1104 upsert_task_from_event(&conn, &open_e, "deadbeefdeadbeef").unwrap();
1105 index_event(&conn, &open_e).unwrap();
1106
1107 let mut decision = crate::event::Event::new(
1108 "tj-1",
1109 crate::event::EventType::Decision,
1110 crate::event::Author::Agent,
1111 crate::event::Source::Chat,
1112 "Adopt Rust".into(),
1113 );
1114 decision.confidence = Some(0.92);
1115 upsert_task_from_event(&conn, &decision, "deadbeefdeadbeef").unwrap();
1116 index_event(&conn, &decision).unwrap();
1117
1118 let count: i64 = conn
1119 .query_row(
1120 "SELECT COUNT(*) FROM events_index WHERE task_id=?1",
1121 rusqlite::params!["tj-1"],
1122 |r| r.get(0),
1123 )
1124 .unwrap();
1125 assert_eq!(count, 2);
1126
1127 let mut stmt = conn
1128 .prepare("SELECT event_id FROM search_fts WHERE search_fts MATCH ?1")
1129 .unwrap();
1130 let hits: Vec<String> = stmt
1131 .query_map(rusqlite::params!["Rust"], |r| {
1132 let s: String = r.get(0)?;
1133 Ok(s)
1134 })
1135 .unwrap()
1136 .collect::<Result<Vec<_>, _>>()
1137 .unwrap();
1138 assert_eq!(hits.len(), 1);
1139 assert_eq!(hits[0], decision.event_id);
1140 }
1141
1142 #[test]
1143 fn upsert_task_from_open_event_inserts_row() {
1144 let d = TempDir::new().unwrap();
1145 let conn = open(d.path().join("s.sqlite")).unwrap();
1146
1147 let mut e = crate::event::Event::new(
1148 "tj-7f3a",
1149 crate::event::EventType::Open,
1150 crate::event::Author::User,
1151 crate::event::Source::Cli,
1152 "Add OAuth".into(),
1153 );
1154 e.meta = serde_json::json!({ "title": "Add OAuth login" });
1155
1156 upsert_task_from_event(&conn, &e, "abcd1234abcd1234").unwrap();
1157
1158 let (id, title, status): (String, String, String) = conn
1159 .query_row(
1160 "SELECT task_id, title, status FROM tasks WHERE task_id = ?1",
1161 ["tj-7f3a"],
1162 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1163 )
1164 .unwrap();
1165
1166 assert_eq!(id, "tj-7f3a");
1167 assert_eq!(title, "Add OAuth login");
1168 assert_eq!(status, "open");
1169 }
1170}