1use anyhow::Context;
2use rusqlite::Connection;
3use std::collections::HashSet;
4use std::path::Path;
5
/// One schema migration step: a version number paired with the SQL batch that implements it.
struct Migration {
    /// Version stored in `schema_migrations` once the SQL has been applied.
    version: i64,
    /// SQL batch run through `execute_batch` when `version` is not yet recorded.
    sql: &'static str,
}
13
/// Schema migration v1.
///
/// Creates the `tasks`, `events_index`, `decisions`, `evidence` and `task_pack_cache` tables,
/// the `idx_tasks_project` and `idx_events_task_time` indexes, and the `search_fts` FTS5
/// virtual table. Every statement uses `IF NOT EXISTS`.
const MIGRATION_001: &str = r#"
CREATE TABLE IF NOT EXISTS tasks (
 task_id TEXT PRIMARY KEY,
 title TEXT NOT NULL,
 status TEXT NOT NULL,
 project_hash TEXT NOT NULL,
 opened_at TEXT NOT NULL,
 closed_at TEXT,
 last_event_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_tasks_project ON tasks(project_hash, last_event_at DESC);

CREATE TABLE IF NOT EXISTS events_index (
 event_id TEXT PRIMARY KEY,
 task_id TEXT NOT NULL,
 type TEXT NOT NULL,
 timestamp TEXT NOT NULL,
 confidence REAL,
 status TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_events_task_time ON events_index(task_id, timestamp DESC);

CREATE TABLE IF NOT EXISTS decisions (
 decision_id TEXT PRIMARY KEY,
 task_id TEXT NOT NULL,
 text TEXT NOT NULL,
 status TEXT NOT NULL,
 superseded_by TEXT
);

CREATE TABLE IF NOT EXISTS evidence (
 evidence_id TEXT PRIMARY KEY,
 task_id TEXT NOT NULL,
 text TEXT NOT NULL,
 strength TEXT NOT NULL,
 refers_to_decision_id TEXT
);

CREATE TABLE IF NOT EXISTS task_pack_cache (
 task_id TEXT NOT NULL,
 mode TEXT NOT NULL,
 text TEXT NOT NULL,
 generated_at TEXT NOT NULL,
 source_event_count INTEGER NOT NULL,
 PRIMARY KEY (task_id, mode)
);

CREATE VIRTUAL TABLE IF NOT EXISTS search_fts USING fts5(
 task_id UNINDEXED,
 event_id UNINDEXED,
 text,
 type
);
"#;
68
/// Schema migration v2.
///
/// Creates the `index_state` table, which holds one row per `project_hash` with the id of the
/// last indexed event and the time that row was last updated.
const MIGRATION_002: &str = r#"
CREATE TABLE IF NOT EXISTS index_state (
 project_hash TEXT PRIMARY KEY,
 last_indexed_event_id TEXT NOT NULL,
 updated_at TEXT NOT NULL
);
"#;
80
/// All known schema migrations.
///
/// `apply_migrations` walks this slice in order and skips every entry whose `version` is
/// already present in `schema_migrations`.
const MIGRATIONS: &[Migration] = &[
    Migration {
        version: 1,
        sql: MIGRATION_001,
    },
    Migration {
        version: 2,
        sql: MIGRATION_002,
    },
];
93
94fn apply_migrations(conn: &Connection) -> anyhow::Result<()> {
95 conn.execute_batch(
96 "CREATE TABLE IF NOT EXISTS schema_migrations (
97 version INTEGER PRIMARY KEY,
98 applied_at TEXT NOT NULL
99 )",
100 )
101 .context("create schema_migrations table")?;
102
103 let applied: HashSet<i64> = {
104 let mut stmt = conn
105 .prepare("SELECT version FROM schema_migrations")
106 .context("select applied versions")?;
107 let rows = stmt
108 .query_map([], |r| r.get::<_, i64>(0))
109 .context("iterate schema_migrations")?;
110 rows.collect::<rusqlite::Result<HashSet<_>>>()
111 .context("collect applied versions")?
112 };
113
114 for migration in MIGRATIONS {
115 if applied.contains(&migration.version) {
116 continue;
117 }
118 conn.execute_batch(migration.sql)
119 .with_context(|| format!("apply schema migration v{:03}", migration.version))?;
120 conn.execute(
121 "INSERT INTO schema_migrations(version, applied_at) VALUES (?1, ?2)",
122 rusqlite::params![
123 migration.version,
124 chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
125 ],
126 )
127 .with_context(|| {
128 format!(
129 "record schema migration v{:03} as applied",
130 migration.version
131 )
132 })?;
133 }
134 Ok(())
135}
136
137use crate::event::{Event, EventType};
138
139pub fn upsert_task_from_event(
140 conn: &Connection,
141 event: &Event,
142 project_hash: &str,
143) -> anyhow::Result<()> {
144 match event.event_type {
145 EventType::Open => {
146 let title = event
147 .meta
148 .get("title")
149 .and_then(|v| v.as_str())
150 .unwrap_or(&event.text)
151 .to_string();
152 conn.execute(
153 "INSERT INTO tasks(task_id, title, status, project_hash, opened_at, last_event_at)
154 VALUES (?1, ?2, 'open', ?3, ?4, ?4)
155 ON CONFLICT(task_id) DO UPDATE SET last_event_at = ?4",
156 rusqlite::params![event.task_id, title, project_hash, event.timestamp],
157 )?;
158 }
159 EventType::Close => {
160 conn.execute(
161 "UPDATE tasks SET status='closed', closed_at=?2, last_event_at=?2 WHERE task_id=?1",
162 rusqlite::params![event.task_id, event.timestamp],
163 )?;
164 }
165 EventType::Reopen => {
166 conn.execute(
167 "UPDATE tasks SET status='open', closed_at=NULL, last_event_at=?2 WHERE task_id=?1",
168 rusqlite::params![event.task_id, event.timestamp],
169 )?;
170 }
171 _ => {
172 conn.execute(
173 "UPDATE tasks SET last_event_at=?2 WHERE task_id=?1",
174 rusqlite::params![event.task_id, event.timestamp],
175 )?;
176 }
177 }
178 Ok(())
179}
180
181use std::io::BufRead;
182
183pub fn list_all_projects(state_dir: impl AsRef<Path>) -> anyhow::Result<Vec<String>> {
184 let dir = state_dir.as_ref();
185 if !dir.exists() {
186 return Ok(vec![]);
187 }
188 let mut out = Vec::new();
189 for entry in std::fs::read_dir(dir)? {
190 let entry = entry?;
191 let path = entry.path();
192 if path.extension().and_then(|e| e.to_str()) == Some("sqlite") {
193 if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) {
194 out.push(stem.to_string());
195 }
196 }
197 }
198 Ok(out)
199}
200
201pub fn rebuild_state(
202 conn: &Connection,
203 jsonl_path: impl AsRef<Path>,
204 project_hash: &str,
205) -> anyhow::Result<usize> {
206 let f = std::fs::File::open(&jsonl_path)
207 .with_context(|| format!("open {:?}", jsonl_path.as_ref()))?;
208 let reader = std::io::BufReader::new(f);
209
210 let tx = conn.unchecked_transaction()?;
211 let mut count = 0;
212 let mut last_event_id: Option<String> = None;
213 for (i, line) in reader.lines().enumerate() {
214 let line = line.with_context(|| format!("read line {i}"))?;
215 if line.trim().is_empty() {
216 continue;
217 }
218 let event: Event = match serde_json::from_str(&line) {
222 Ok(e) => e,
223 Err(err) => {
224 tracing::warn!(
225 line_number = i + 1,
226 error = %err,
227 "skipping malformed JSONL line in rebuild_state"
228 );
229 continue;
230 }
231 };
232 upsert_task_from_event(&tx, &event, project_hash)?;
233 index_event(&tx, &event)?;
234 last_event_id = Some(event.event_id.clone());
235 count += 1;
236 }
237 if let Some(eid) = last_event_id.as_deref() {
238 record_last_indexed(&tx, project_hash, eid)?;
239 }
240 tx.commit()?;
241 Ok(count)
242}
243
244pub fn task_exists(conn: &Connection, task_id: &str) -> anyhow::Result<bool> {
249 let count: i64 = conn.query_row(
250 "SELECT COUNT(*) FROM tasks WHERE task_id = ?1",
251 rusqlite::params![task_id],
252 |r| r.get(0),
253 )?;
254 Ok(count > 0)
255}
256
257pub fn task_status(conn: &Connection, task_id: &str) -> anyhow::Result<Option<String>> {
261 let mut stmt = conn.prepare("SELECT status FROM tasks WHERE task_id = ?1")?;
262 let mut rows = stmt.query(rusqlite::params![task_id])?;
263 Ok(rows.next()?.map(|r| r.get::<_, String>(0)).transpose()?)
264}
265
266fn last_indexed_event_id(conn: &Connection, project_hash: &str) -> anyhow::Result<Option<String>> {
270 let mut stmt =
271 conn.prepare("SELECT last_indexed_event_id FROM index_state WHERE project_hash = ?1")?;
272 let mut rows = stmt.query(rusqlite::params![project_hash])?;
273 if let Some(row) = rows.next()? {
274 Ok(Some(row.get::<_, String>(0)?))
275 } else {
276 Ok(None)
277 }
278}
279
280fn record_last_indexed(
281 conn: &Connection,
282 project_hash: &str,
283 event_id: &str,
284) -> anyhow::Result<()> {
285 conn.execute(
286 "INSERT INTO index_state(project_hash, last_indexed_event_id, updated_at)
287 VALUES (?1, ?2, ?3)
288 ON CONFLICT(project_hash) DO UPDATE SET
289 last_indexed_event_id = excluded.last_indexed_event_id,
290 updated_at = excluded.updated_at",
291 rusqlite::params![
292 project_hash,
293 event_id,
294 chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
295 ],
296 )?;
297 Ok(())
298}
299
300pub fn ingest_new_events(
310 conn: &Connection,
311 jsonl_path: impl AsRef<Path>,
312 project_hash: &str,
313) -> anyhow::Result<usize> {
314 let marker = match last_indexed_event_id(conn, project_hash)? {
315 Some(id) => id,
316 None => return rebuild_state(conn, jsonl_path, project_hash),
317 };
318
319 let f = std::fs::File::open(&jsonl_path)
320 .with_context(|| format!("open {:?}", jsonl_path.as_ref()))?;
321 let reader = std::io::BufReader::new(f);
322
323 let tx = conn.unchecked_transaction()?;
327 let mut found_marker = false;
328 let mut count = 0;
329 let mut last_event_id: Option<String> = None;
330 for (i, line) in reader.lines().enumerate() {
331 let line = line.with_context(|| format!("read line {i}"))?;
332 if line.trim().is_empty() {
333 continue;
334 }
335 let event: Event = match serde_json::from_str(&line) {
336 Ok(e) => e,
337 Err(err) => {
338 tracing::warn!(
339 line_number = i + 1,
340 error = %err,
341 "skipping malformed JSONL line in ingest_new_events"
342 );
343 continue;
344 }
345 };
346 if !found_marker {
347 if event.event_id == marker {
348 found_marker = true;
349 }
350 continue;
351 }
352 upsert_task_from_event(&tx, &event, project_hash)?;
353 index_event(&tx, &event)?;
354 last_event_id = Some(event.event_id.clone());
355 count += 1;
356 }
357
358 if !found_marker {
359 drop(tx);
361 tracing::warn!(
362 project_hash = project_hash,
363 marker = marker.as_str(),
364 "last_indexed_event_id not found in JSONL — falling back to full rebuild"
365 );
366 return rebuild_state(conn, jsonl_path, project_hash);
367 }
368
369 if let Some(eid) = last_event_id.as_deref() {
370 record_last_indexed(&tx, project_hash, eid)?;
371 }
372 tx.commit()?;
373 Ok(count)
374}
375
376pub fn index_event(conn: &Connection, event: &Event) -> anyhow::Result<()> {
377 let type_str = serde_json::to_value(event.event_type)?
378 .as_str()
379 .unwrap()
380 .to_string();
381 let status_str = serde_json::to_value(event.status)?
382 .as_str()
383 .unwrap()
384 .to_string();
385 conn.execute(
386 "INSERT OR REPLACE INTO events_index(event_id, task_id, type, timestamp, confidence, status)
387 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
388 rusqlite::params![
389 event.event_id, event.task_id, type_str,
390 event.timestamp, event.confidence, status_str
391 ],
392 )?;
393 conn.execute(
395 "DELETE FROM search_fts WHERE event_id=?1",
396 rusqlite::params![event.event_id],
397 )?;
398 conn.execute(
399 "INSERT INTO search_fts(task_id, event_id, text, type) VALUES (?1, ?2, ?3, ?4)",
400 rusqlite::params![event.task_id, event.event_id, event.text, type_str],
401 )?;
402
403 if event.event_type == EventType::Decision {
404 conn.execute(
405 "INSERT OR REPLACE INTO decisions(decision_id, task_id, text, status)
406 VALUES (?1, ?2, ?3, 'active')",
407 rusqlite::params![event.event_id, event.task_id, event.text],
408 )?;
409 }
410
411 if event.event_type == EventType::Supersede {
412 if let Some(target) = &event.supersedes {
413 conn.execute(
414 "UPDATE decisions SET status='superseded', superseded_by=?1 WHERE decision_id=?2",
415 rusqlite::params![event.event_id, target],
416 )?;
417 }
418 }
419
420 if event.event_type == EventType::Evidence {
421 let strength_str = event
422 .evidence_strength
423 .map(|s| {
424 serde_json::to_value(s)
425 .unwrap()
426 .as_str()
427 .unwrap()
428 .to_string()
429 })
430 .unwrap_or_else(|| "medium".into());
431 conn.execute(
432 "INSERT OR REPLACE INTO evidence(evidence_id, task_id, text, strength)
433 VALUES (?1, ?2, ?3, ?4)",
434 rusqlite::params![event.event_id, event.task_id, event.text, strength_str],
435 )?;
436 }
437
438 conn.execute(
440 "DELETE FROM task_pack_cache WHERE task_id=?1",
441 rusqlite::params![event.task_id],
442 )?;
443
444 Ok(())
445}
446
447pub fn open(path: impl AsRef<Path>) -> anyhow::Result<Connection> {
448 if let Some(parent) = path.as_ref().parent() {
449 std::fs::create_dir_all(parent).with_context(|| format!("create dir {parent:?}"))?;
450 }
451 let conn =
452 Connection::open(&path).with_context(|| format!("open SQLite at {:?}", path.as_ref()))?;
453 conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;")?;
454 apply_migrations(&conn).context("apply schema migrations")?;
455 Ok(conn)
456}
457
/// Summary of one task as returned by [`list_tasks_by_project`].
#[derive(Debug, Clone)]
pub struct TaskRow {
    /// Value of the `tasks.task_id` column.
    pub task_id: String,
    /// Value of the `tasks.title` column.
    pub title: String,
    /// Value of the `tasks.status` column.
    pub status: String,
    /// Value of the `tasks.last_event_at` column.
    pub last_event_at: String,
    /// Number of `events_index` rows for this task (0 when there are none).
    pub event_count: usize,
}
469
470pub fn list_tasks_by_project(
474 conn: &Connection,
475 project_hash: &str,
476) -> anyhow::Result<Vec<TaskRow>> {
477 let mut stmt = conn.prepare(
478 "SELECT t.task_id, t.title, t.status, t.last_event_at,
479 COALESCE(c.cnt, 0) AS event_count
480 FROM tasks t
481 LEFT JOIN (
482 SELECT task_id, COUNT(*) AS cnt FROM events_index GROUP BY task_id
483 ) c ON c.task_id = t.task_id
484 WHERE t.project_hash = ?1
485 ORDER BY (t.status = 'open') DESC, t.last_event_at DESC",
486 )?;
487 let rows = stmt
488 .query_map(rusqlite::params![project_hash], |r| {
489 Ok(TaskRow {
490 task_id: r.get::<_, String>(0)?,
491 title: r.get::<_, String>(1)?,
492 status: r.get::<_, String>(2)?,
493 last_event_at: r.get::<_, String>(3)?,
494 event_count: r.get::<_, i64>(4)? as usize,
495 })
496 })?
497 .collect::<Result<Vec<_>, _>>()?;
498 Ok(rows)
499}
500
501#[cfg(test)]
502mod tests {
503 use super::*;
504 use tempfile::TempDir;
505
506 #[test]
507 fn task_exists_returns_true_for_known_id_false_otherwise() {
508 let d = TempDir::new().unwrap();
509 let conn = open(d.path().join("s.sqlite")).unwrap();
510
511 assert!(!task_exists(&conn, "tj-nope").unwrap());
512
513 let e = make_open_event("tj-yes", "Hello");
514 upsert_task_from_event(&conn, &e, "feedfacefeedface").unwrap();
515 index_event(&conn, &e).unwrap();
516
517 assert!(task_exists(&conn, "tj-yes").unwrap());
518 assert!(!task_exists(&conn, "tj-nope").unwrap());
519 }
520
521 #[test]
522 fn fresh_db_runs_all_migrations() {
523 let d = TempDir::new().unwrap();
524 let p = d.path().join("state.sqlite");
525 let conn = open(&p).unwrap();
526
527 let applied: Vec<i64> = conn
528 .prepare("SELECT version FROM schema_migrations ORDER BY version")
529 .unwrap()
530 .query_map([], |r| r.get::<_, i64>(0))
531 .unwrap()
532 .collect::<Result<_, _>>()
533 .unwrap();
534 assert_eq!(
535 applied,
536 (1..=MIGRATIONS.len() as i64).collect::<Vec<_>>(),
537 "every declared migration must be recorded"
538 );
539 }
540
541 #[test]
542 fn apply_migrations_is_idempotent_across_reopens() {
543 let d = TempDir::new().unwrap();
544 let p = d.path().join("state.sqlite");
545 let _ = open(&p).unwrap();
546 let _ = open(&p).unwrap();
547
548 let count: i64 = open(&p)
549 .unwrap()
550 .query_row("SELECT COUNT(*) FROM schema_migrations", [], |r| r.get(0))
551 .unwrap();
552 assert_eq!(
553 count,
554 MIGRATIONS.len() as i64,
555 "schema_migrations must contain exactly one row per declared migration after repeated opens"
556 );
557 }
558
559 #[test]
560 fn open_creates_all_tables() {
561 let d = TempDir::new().unwrap();
562 let p = d.path().join("state.sqlite");
563 let conn = open(&p).unwrap();
564
565 let names: Vec<String> = conn
566 .prepare("SELECT name FROM sqlite_master WHERE type='table' OR type='virtual table' ORDER BY name")
567 .unwrap()
568 .query_map([], |r| r.get::<_, String>(0))
569 .unwrap()
570 .collect::<Result<_, _>>()
571 .unwrap();
572
573 for required in [
574 "decisions",
575 "events_index",
576 "evidence",
577 "task_pack_cache",
578 "tasks",
579 "search_fts",
580 ] {
581 assert!(
582 names.iter().any(|n| n == required),
583 "missing table {required}, have {names:?}"
584 );
585 }
586 }
587
588 #[test]
589 fn open_is_idempotent() {
590 let d = TempDir::new().unwrap();
591 let p = d.path().join("state.sqlite");
592 let _ = open(&p).unwrap();
593 let _ = open(&p).unwrap();
594 }
595
596 #[test]
597 fn index_event_projects_evidence() {
598 let d = TempDir::new().unwrap();
599 let conn = open(d.path().join("s.sqlite")).unwrap();
600 let mut open_e = crate::event::Event::new(
601 "tj-e",
602 crate::event::EventType::Open,
603 crate::event::Author::User,
604 crate::event::Source::Cli,
605 "x".into(),
606 );
607 open_e.meta = serde_json::json!({"title": "T"});
608 upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
609 index_event(&conn, &open_e).unwrap();
610
611 let mut ev = crate::event::Event::new(
612 "tj-e",
613 crate::event::EventType::Evidence,
614 crate::event::Author::Agent,
615 crate::event::Source::Chat,
616 "Hook startup measured at 12ms".into(),
617 );
618 ev.evidence_strength = Some(crate::event::EvidenceStrength::Strong);
619 upsert_task_from_event(&conn, &ev, "feedface").unwrap();
620 index_event(&conn, &ev).unwrap();
621
622 let (text, strength): (String, String) = conn
623 .query_row(
624 "SELECT text, strength FROM evidence WHERE task_id=?1",
625 rusqlite::params!["tj-e"],
626 |r| Ok((r.get(0)?, r.get(1)?)),
627 )
628 .unwrap();
629 assert!(text.contains("12ms"));
630 assert_eq!(strength, "strong");
631 }
632
633 #[test]
634 fn supersede_event_marks_decision_superseded() {
635 let d = TempDir::new().unwrap();
636 let conn = open(d.path().join("s.sqlite")).unwrap();
637 let mut open_e = crate::event::Event::new(
638 "tj-s",
639 crate::event::EventType::Open,
640 crate::event::Author::User,
641 crate::event::Source::Cli,
642 "x".into(),
643 );
644 open_e.meta = serde_json::json!({"title": "T"});
645 upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
646 index_event(&conn, &open_e).unwrap();
647
648 let dec = crate::event::Event::new(
649 "tj-s",
650 crate::event::EventType::Decision,
651 crate::event::Author::Agent,
652 crate::event::Source::Chat,
653 "Use TS".into(),
654 );
655 upsert_task_from_event(&conn, &dec, "feedface").unwrap();
656 index_event(&conn, &dec).unwrap();
657
658 let mut sup = crate::event::Event::new(
659 "tj-s",
660 crate::event::EventType::Supersede,
661 crate::event::Author::Agent,
662 crate::event::Source::Chat,
663 "Replaced by Rust decision".into(),
664 );
665 sup.supersedes = Some(dec.event_id.clone());
666 upsert_task_from_event(&conn, &sup, "feedface").unwrap();
667 index_event(&conn, &sup).unwrap();
668
669 let (status, by): (String, Option<String>) = conn
670 .query_row(
671 "SELECT status, superseded_by FROM decisions WHERE decision_id=?1",
672 rusqlite::params![dec.event_id],
673 |r| Ok((r.get(0)?, r.get(1)?)),
674 )
675 .unwrap();
676 assert_eq!(status, "superseded");
677 assert_eq!(by.as_deref(), Some(sup.event_id.as_str()));
678 }
679
680 #[test]
681 fn index_event_projects_decision_to_decisions_table() {
682 let d = TempDir::new().unwrap();
683 let conn = open(d.path().join("s.sqlite")).unwrap();
684
685 let mut open_e = crate::event::Event::new(
686 "tj-d",
687 crate::event::EventType::Open,
688 crate::event::Author::User,
689 crate::event::Source::Cli,
690 "x".into(),
691 );
692 open_e.meta = serde_json::json!({"title": "T"});
693 upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
694 index_event(&conn, &open_e).unwrap();
695
696 let dec = crate::event::Event::new(
697 "tj-d",
698 crate::event::EventType::Decision,
699 crate::event::Author::Agent,
700 crate::event::Source::Chat,
701 "Adopt Rust".into(),
702 );
703 upsert_task_from_event(&conn, &dec, "feedface").unwrap();
704 index_event(&conn, &dec).unwrap();
705
706 let (id, text, status): (String, String, String) = conn
707 .query_row(
708 "SELECT decision_id, text, status FROM decisions WHERE task_id=?1",
709 rusqlite::params!["tj-d"],
710 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
711 )
712 .unwrap();
713 assert_eq!(id, dec.event_id);
714 assert_eq!(text, "Adopt Rust");
715 assert_eq!(status, "active");
716 }
717
718 #[test]
719 fn index_event_is_idempotent_no_search_fts_duplicates() {
720 let d = TempDir::new().unwrap();
721 let conn = open(d.path().join("s.sqlite")).unwrap();
722 let mut open_e = crate::event::Event::new(
723 "tj-id",
724 crate::event::EventType::Open,
725 crate::event::Author::User,
726 crate::event::Source::Cli,
727 "x".into(),
728 );
729 open_e.meta = serde_json::json!({"title": "Idempotent"});
730 upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
731
732 index_event(&conn, &open_e).unwrap();
734 index_event(&conn, &open_e).unwrap();
735 index_event(&conn, &open_e).unwrap();
736
737 let n: i64 = conn
738 .query_row(
739 "SELECT COUNT(*) FROM search_fts WHERE event_id=?1",
740 rusqlite::params![open_e.event_id],
741 |r| r.get(0),
742 )
743 .unwrap();
744 assert_eq!(n, 1, "search_fts must hold exactly one row per event_id");
745 }
746
747 #[test]
748 fn list_all_projects_returns_hashes_from_state_dir() {
749 use std::fs::File;
750 let d = TempDir::new().unwrap();
751 let state_dir = d.path().join("state");
752 std::fs::create_dir_all(&state_dir).unwrap();
753 File::create(state_dir.join("aaaa1111aaaa1111.sqlite")).unwrap();
754 File::create(state_dir.join("bbbb2222bbbb2222.sqlite")).unwrap();
755 File::create(state_dir.join("not-a-project.txt")).unwrap();
756
757 let mut hashes = list_all_projects(&state_dir).unwrap();
758 hashes.sort();
759 assert_eq!(hashes, vec!["aaaa1111aaaa1111", "bbbb2222bbbb2222"]);
760 }
761
762 fn write_event_line(f: &mut std::fs::File, e: &crate::event::Event) {
763 use std::io::Write;
764 writeln!(f, "{}", serde_json::to_string(e).unwrap()).unwrap();
765 }
766
767 fn make_open_event(task_id: &str, title: &str) -> crate::event::Event {
768 let mut e = crate::event::Event::new(
769 task_id,
770 crate::event::EventType::Open,
771 crate::event::Author::User,
772 crate::event::Source::Cli,
773 "x".into(),
774 );
775 e.meta = serde_json::json!({"title": title});
776 e
777 }
778
779 #[test]
780 fn ingest_new_events_picks_up_only_new_lines() {
781 let d = TempDir::new().unwrap();
782 let jsonl = d.path().join("events.jsonl");
783 let db = d.path().join("s.sqlite");
784 let project = "deadbeefdeadbeef";
785
786 let e1 = make_open_event("tj-i1", "first");
787 let e2 = make_open_event("tj-i2", "second");
788 let e3 = make_open_event("tj-i3", "third");
789
790 let mut f = std::fs::File::create(&jsonl).unwrap();
791 write_event_line(&mut f, &e1);
792 write_event_line(&mut f, &e2);
793 write_event_line(&mut f, &e3);
794 drop(f);
795
796 let conn = open(&db).unwrap();
798 let n_first = ingest_new_events(&conn, &jsonl, project).unwrap();
799 assert_eq!(n_first, 3);
800
801 let e4 = make_open_event("tj-i4", "fourth");
803 let e5 = make_open_event("tj-i5", "fifth");
804 let mut f = std::fs::OpenOptions::new()
805 .append(true)
806 .open(&jsonl)
807 .unwrap();
808 write_event_line(&mut f, &e4);
809 write_event_line(&mut f, &e5);
810 drop(f);
811
812 let n_second = ingest_new_events(&conn, &jsonl, project).unwrap();
814 assert_eq!(n_second, 2, "incremental ingest must read only the tail");
815
816 let total: i64 = conn
817 .query_row("SELECT COUNT(*) FROM events_index", [], |r| r.get(0))
818 .unwrap();
819 assert_eq!(total, 5);
820
821 let marker: String = conn
822 .query_row(
823 "SELECT last_indexed_event_id FROM index_state WHERE project_hash=?1",
824 rusqlite::params![project],
825 |r| r.get(0),
826 )
827 .unwrap();
828 assert_eq!(marker, e5.event_id);
829 }
830
831 #[test]
832 fn ingest_new_events_falls_back_to_full_rebuild_when_marker_vanishes() {
833 let d = TempDir::new().unwrap();
834 let jsonl = d.path().join("events.jsonl");
835 let db = d.path().join("s.sqlite");
836 let project = "feedfacefeedface";
837
838 let e1 = make_open_event("tj-r1", "first");
839 let mut f = std::fs::File::create(&jsonl).unwrap();
840 write_event_line(&mut f, &e1);
841 drop(f);
842
843 let conn = open(&db).unwrap();
844 ingest_new_events(&conn, &jsonl, project).unwrap();
845
846 let e2 = make_open_event("tj-r2", "after-corruption");
849 let e3 = make_open_event("tj-r3", "after-corruption-2");
850 let mut f = std::fs::File::create(&jsonl).unwrap();
851 write_event_line(&mut f, &e2);
852 write_event_line(&mut f, &e3);
853 drop(f);
854
855 let n = ingest_new_events(&conn, &jsonl, project).unwrap();
856 assert_eq!(n, 2, "missing marker must trigger full rebuild");
857 }
858
859 #[test]
860 fn rebuild_state_and_ingest_new_events_produce_same_state() {
861 let d = TempDir::new().unwrap();
862 let jsonl_a = d.path().join("a.jsonl");
863 let jsonl_b = d.path().join("b.jsonl");
864 let db_a = d.path().join("a.sqlite");
865 let db_b = d.path().join("b.sqlite");
866
867 let events: Vec<_> = (0..5)
868 .map(|i| make_open_event(&format!("tj-eq{i}"), &format!("title {i}")))
869 .collect();
870 for path in [&jsonl_a, &jsonl_b] {
871 let mut f = std::fs::File::create(path).unwrap();
872 for e in &events {
873 write_event_line(&mut f, e);
874 }
875 }
876
877 let conn_a = open(&db_a).unwrap();
878 let n_a = rebuild_state(&conn_a, &jsonl_a, "abcd1234abcd1234").unwrap();
879
880 let conn_b = open(&db_b).unwrap();
881 let n_b = ingest_new_events(&conn_b, &jsonl_b, "abcd1234abcd1234").unwrap();
882
883 assert_eq!(n_a, n_b);
884 assert_eq!(n_a, 5);
885
886 for table in ["tasks", "events_index"] {
887 let q = format!("SELECT COUNT(*) FROM {table}");
888 let cnt_a: i64 = conn_a.query_row(&q, [], |r| r.get(0)).unwrap();
889 let cnt_b: i64 = conn_b.query_row(&q, [], |r| r.get(0)).unwrap();
890 assert_eq!(cnt_a, cnt_b, "row count mismatch in {table}");
891 }
892 }
893
894 #[test]
895 fn rebuild_state_skips_malformed_jsonl_lines() {
896 use std::io::Write;
897 let d = TempDir::new().unwrap();
898 let events_path = d.path().join("events.jsonl");
899 let db_path = d.path().join("s.sqlite");
900
901 let mut f = std::fs::File::create(&events_path).unwrap();
902
903 let mut e1 = crate::event::Event::new(
904 "tj-skip",
905 crate::event::EventType::Open,
906 crate::event::Author::User,
907 crate::event::Source::Cli,
908 "x".into(),
909 );
910 e1.meta = serde_json::json!({"title": "Skip test"});
911 writeln!(f, "{}", serde_json::to_string(&e1).unwrap()).unwrap();
912
913 writeln!(f, "this is not a json event line").unwrap();
915
916 writeln!(f, "{{\"foo\": 1}}").unwrap();
918
919 let e3 = crate::event::Event::new(
920 "tj-skip",
921 crate::event::EventType::Decision,
922 crate::event::Author::Agent,
923 crate::event::Source::Chat,
924 "Adopt Rust".into(),
925 );
926 writeln!(f, "{}", serde_json::to_string(&e3).unwrap()).unwrap();
927 drop(f);
928
929 let conn = open(&db_path).unwrap();
930 let n = rebuild_state(&conn, &events_path, "deadbeefdeadbeef")
931 .expect("rebuild_state must succeed despite malformed lines");
932 assert_eq!(
933 n, 2,
934 "expected 2 valid events indexed (2 malformed skipped)"
935 );
936
937 let indexed: i64 = conn
938 .query_row("SELECT COUNT(*) FROM events_index", [], |r| r.get(0))
939 .unwrap();
940 assert_eq!(indexed, 2);
941 }
942
943 #[test]
944 fn rebuild_state_reads_jsonl_and_populates_db() {
945 use std::io::Write;
946 let d = TempDir::new().unwrap();
947 let events_path = d.path().join("events.jsonl");
948 let db_path = d.path().join("s.sqlite");
949
950 let mut f = std::fs::File::create(&events_path).unwrap();
951 let mut e1 = crate::event::Event::new(
952 "tj-9",
953 crate::event::EventType::Open,
954 crate::event::Author::User,
955 crate::event::Source::Cli,
956 "x".into(),
957 );
958 e1.meta = serde_json::json!({"title": "Nine"});
959 let e2 = crate::event::Event::new(
960 "tj-9",
961 crate::event::EventType::Decision,
962 crate::event::Author::Agent,
963 crate::event::Source::Chat,
964 "Adopt Rust".into(),
965 );
966 writeln!(f, "{}", serde_json::to_string(&e1).unwrap()).unwrap();
967 writeln!(f, "{}", serde_json::to_string(&e2).unwrap()).unwrap();
968 drop(f);
969
970 let conn = open(&db_path).unwrap();
971 let n = rebuild_state(&conn, &events_path, "deadbeefdeadbeef").unwrap();
972 assert_eq!(n, 2);
973
974 let n: i64 = conn
975 .query_row("SELECT COUNT(*) FROM tasks", [], |r| r.get(0))
976 .unwrap();
977 assert_eq!(n, 1);
978 let n: i64 = conn
979 .query_row("SELECT COUNT(*) FROM events_index", [], |r| r.get(0))
980 .unwrap();
981 assert_eq!(n, 2);
982 }
983
984 #[test]
985 fn index_event_writes_index_and_fts() {
986 let d = TempDir::new().unwrap();
987 let conn = open(d.path().join("s.sqlite")).unwrap();
988 let mut open_e = crate::event::Event::new(
989 "tj-1",
990 crate::event::EventType::Open,
991 crate::event::Author::User,
992 crate::event::Source::Cli,
993 "Title".into(),
994 );
995 open_e.meta = serde_json::json!({"title": "Title"});
996 upsert_task_from_event(&conn, &open_e, "deadbeefdeadbeef").unwrap();
997 index_event(&conn, &open_e).unwrap();
998
999 let mut decision = crate::event::Event::new(
1000 "tj-1",
1001 crate::event::EventType::Decision,
1002 crate::event::Author::Agent,
1003 crate::event::Source::Chat,
1004 "Adopt Rust".into(),
1005 );
1006 decision.confidence = Some(0.92);
1007 upsert_task_from_event(&conn, &decision, "deadbeefdeadbeef").unwrap();
1008 index_event(&conn, &decision).unwrap();
1009
1010 let count: i64 = conn
1011 .query_row(
1012 "SELECT COUNT(*) FROM events_index WHERE task_id=?1",
1013 rusqlite::params!["tj-1"],
1014 |r| r.get(0),
1015 )
1016 .unwrap();
1017 assert_eq!(count, 2);
1018
1019 let mut stmt = conn
1020 .prepare("SELECT event_id FROM search_fts WHERE search_fts MATCH ?1")
1021 .unwrap();
1022 let hits: Vec<String> = stmt
1023 .query_map(rusqlite::params!["Rust"], |r| {
1024 let s: String = r.get(0)?;
1025 Ok(s)
1026 })
1027 .unwrap()
1028 .collect::<Result<Vec<_>, _>>()
1029 .unwrap();
1030 assert_eq!(hits.len(), 1);
1031 assert_eq!(hits[0], decision.event_id);
1032 }
1033
1034 #[test]
1035 fn upsert_task_from_open_event_inserts_row() {
1036 let d = TempDir::new().unwrap();
1037 let conn = open(d.path().join("s.sqlite")).unwrap();
1038
1039 let mut e = crate::event::Event::new(
1040 "tj-7f3a",
1041 crate::event::EventType::Open,
1042 crate::event::Author::User,
1043 crate::event::Source::Cli,
1044 "Add OAuth".into(),
1045 );
1046 e.meta = serde_json::json!({ "title": "Add OAuth login" });
1047
1048 upsert_task_from_event(&conn, &e, "abcd1234abcd1234").unwrap();
1049
1050 let (id, title, status): (String, String, String) = conn
1051 .query_row(
1052 "SELECT task_id, title, status FROM tasks WHERE task_id = ?1",
1053 ["tj-7f3a"],
1054 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1055 )
1056 .unwrap();
1057
1058 assert_eq!(id, "tj-7f3a");
1059 assert_eq!(title, "Add OAuth login");
1060 assert_eq!(status, "open");
1061 }
1062}