1use anyhow::Context;
2use rusqlite::Connection;
3use std::collections::HashSet;
4use std::path::Path;
5
/// One schema migration step.
struct Migration {
    /// Version number; written to `schema_migrations.version` once the step has been applied.
    version: i64,
    /// SQL batch that is executed to apply this step.
    sql: &'static str,
}
13
/// Schema migration v1: creates the `tasks`, `events_index`, `decisions`,
/// `evidence` and `task_pack_cache` tables, two supporting indexes, and the
/// FTS5 virtual table `search_fts`.
///
/// Every statement uses `IF NOT EXISTS`, so the batch does not fail when the
/// objects are already present.
const MIGRATION_001: &str = r#"
CREATE TABLE IF NOT EXISTS tasks (
    task_id TEXT PRIMARY KEY,
    title TEXT NOT NULL,
    status TEXT NOT NULL,
    project_hash TEXT NOT NULL,
    opened_at TEXT NOT NULL,
    closed_at TEXT,
    last_event_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_tasks_project ON tasks(project_hash, last_event_at DESC);

CREATE TABLE IF NOT EXISTS events_index (
    event_id TEXT PRIMARY KEY,
    task_id TEXT NOT NULL,
    type TEXT NOT NULL,
    timestamp TEXT NOT NULL,
    confidence REAL,
    status TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_events_task_time ON events_index(task_id, timestamp DESC);

CREATE TABLE IF NOT EXISTS decisions (
    decision_id TEXT PRIMARY KEY,
    task_id TEXT NOT NULL,
    text TEXT NOT NULL,
    status TEXT NOT NULL,
    superseded_by TEXT
);

CREATE TABLE IF NOT EXISTS evidence (
    evidence_id TEXT PRIMARY KEY,
    task_id TEXT NOT NULL,
    text TEXT NOT NULL,
    strength TEXT NOT NULL,
    refers_to_decision_id TEXT
);

CREATE TABLE IF NOT EXISTS task_pack_cache (
    task_id TEXT NOT NULL,
    mode TEXT NOT NULL,
    text TEXT NOT NULL,
    generated_at TEXT NOT NULL,
    source_event_count INTEGER NOT NULL,
    PRIMARY KEY (task_id, mode)
);

CREATE VIRTUAL TABLE IF NOT EXISTS search_fts USING fts5(
    task_id UNINDEXED,
    event_id UNINDEXED,
    text,
    type
);
"#;
68
/// Schema migration v2: adds `index_state`, which stores one row per
/// `project_hash` holding the id of the last indexed event.
const MIGRATION_002: &str = r#"
CREATE TABLE IF NOT EXISTS index_state (
    project_hash TEXT PRIMARY KEY,
    last_indexed_event_id TEXT NOT NULL,
    updated_at TEXT NOT NULL
);
"#;
80
/// All known schema migrations, listed in ascending version order.
///
/// `apply_migrations` walks this slice front to back and skips versions that
/// are already recorded in `schema_migrations`.
const MIGRATIONS: &[Migration] = &[
    Migration {
        version: 1,
        sql: MIGRATION_001,
    },
    Migration {
        version: 2,
        sql: MIGRATION_002,
    },
];
93
94fn apply_migrations(conn: &Connection) -> anyhow::Result<()> {
95 conn.execute_batch(
96 "CREATE TABLE IF NOT EXISTS schema_migrations (
97 version INTEGER PRIMARY KEY,
98 applied_at TEXT NOT NULL
99 )",
100 )
101 .context("create schema_migrations table")?;
102
103 let applied: HashSet<i64> = {
104 let mut stmt = conn
105 .prepare("SELECT version FROM schema_migrations")
106 .context("select applied versions")?;
107 let rows = stmt
108 .query_map([], |r| r.get::<_, i64>(0))
109 .context("iterate schema_migrations")?;
110 rows.collect::<rusqlite::Result<HashSet<_>>>()
111 .context("collect applied versions")?
112 };
113
114 for migration in MIGRATIONS {
115 if applied.contains(&migration.version) {
116 continue;
117 }
118 conn.execute_batch(migration.sql)
119 .with_context(|| format!("apply schema migration v{:03}", migration.version))?;
120 conn.execute(
121 "INSERT INTO schema_migrations(version, applied_at) VALUES (?1, ?2)",
122 rusqlite::params![
123 migration.version,
124 chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
125 ],
126 )
127 .with_context(|| {
128 format!(
129 "record schema migration v{:03} as applied",
130 migration.version
131 )
132 })?;
133 }
134 Ok(())
135}
136
137use crate::event::{Event, EventType};
138
139pub fn upsert_task_from_event(
140 conn: &Connection,
141 event: &Event,
142 project_hash: &str,
143) -> anyhow::Result<()> {
144 match event.event_type {
145 EventType::Open => {
146 let title = event
147 .meta
148 .get("title")
149 .and_then(|v| v.as_str())
150 .unwrap_or(&event.text)
151 .to_string();
152 conn.execute(
153 "INSERT INTO tasks(task_id, title, status, project_hash, opened_at, last_event_at)
154 VALUES (?1, ?2, 'open', ?3, ?4, ?4)
155 ON CONFLICT(task_id) DO UPDATE SET last_event_at = ?4",
156 rusqlite::params![event.task_id, title, project_hash, event.timestamp],
157 )?;
158 }
159 EventType::Close => {
160 conn.execute(
161 "UPDATE tasks SET status='closed', closed_at=?2, last_event_at=?2 WHERE task_id=?1",
162 rusqlite::params![event.task_id, event.timestamp],
163 )?;
164 }
165 EventType::Reopen => {
166 conn.execute(
167 "UPDATE tasks SET status='open', closed_at=NULL, last_event_at=?2 WHERE task_id=?1",
168 rusqlite::params![event.task_id, event.timestamp],
169 )?;
170 }
171 _ => {
172 conn.execute(
173 "UPDATE tasks SET last_event_at=?2 WHERE task_id=?1",
174 rusqlite::params![event.task_id, event.timestamp],
175 )?;
176 }
177 }
178 Ok(())
179}
180
181use std::io::BufRead;
182
183pub fn list_all_projects(state_dir: impl AsRef<Path>) -> anyhow::Result<Vec<String>> {
184 let dir = state_dir.as_ref();
185 if !dir.exists() {
186 return Ok(vec![]);
187 }
188 let mut out = Vec::new();
189 for entry in std::fs::read_dir(dir)? {
190 let entry = entry?;
191 let path = entry.path();
192 if path.extension().and_then(|e| e.to_str()) == Some("sqlite") {
193 if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) {
194 out.push(stem.to_string());
195 }
196 }
197 }
198 Ok(out)
199}
200
201pub fn rebuild_state(
202 conn: &Connection,
203 jsonl_path: impl AsRef<Path>,
204 project_hash: &str,
205) -> anyhow::Result<usize> {
206 let f = std::fs::File::open(&jsonl_path)
207 .with_context(|| format!("open {:?}", jsonl_path.as_ref()))?;
208 let reader = std::io::BufReader::new(f);
209
210 let tx = conn.unchecked_transaction()?;
211 let mut count = 0;
212 let mut last_event_id: Option<String> = None;
213 for (i, line) in reader.lines().enumerate() {
214 let line = line.with_context(|| format!("read line {i}"))?;
215 if line.trim().is_empty() {
216 continue;
217 }
218 let event: Event = match serde_json::from_str(&line) {
222 Ok(e) => e,
223 Err(err) => {
224 tracing::warn!(
225 line_number = i + 1,
226 error = %err,
227 "skipping malformed JSONL line in rebuild_state"
228 );
229 continue;
230 }
231 };
232 upsert_task_from_event(&tx, &event, project_hash)?;
233 index_event(&tx, &event)?;
234 last_event_id = Some(event.event_id.clone());
235 count += 1;
236 }
237 if let Some(eid) = last_event_id.as_deref() {
238 record_last_indexed(&tx, project_hash, eid)?;
239 }
240 tx.commit()?;
241 Ok(count)
242}
243
244pub fn task_exists(conn: &Connection, task_id: &str) -> anyhow::Result<bool> {
249 let count: i64 = conn.query_row(
250 "SELECT COUNT(*) FROM tasks WHERE task_id = ?1",
251 rusqlite::params![task_id],
252 |r| r.get(0),
253 )?;
254 Ok(count > 0)
255}
256
257fn last_indexed_event_id(conn: &Connection, project_hash: &str) -> anyhow::Result<Option<String>> {
261 let mut stmt =
262 conn.prepare("SELECT last_indexed_event_id FROM index_state WHERE project_hash = ?1")?;
263 let mut rows = stmt.query(rusqlite::params![project_hash])?;
264 if let Some(row) = rows.next()? {
265 Ok(Some(row.get::<_, String>(0)?))
266 } else {
267 Ok(None)
268 }
269}
270
271fn record_last_indexed(
272 conn: &Connection,
273 project_hash: &str,
274 event_id: &str,
275) -> anyhow::Result<()> {
276 conn.execute(
277 "INSERT INTO index_state(project_hash, last_indexed_event_id, updated_at)
278 VALUES (?1, ?2, ?3)
279 ON CONFLICT(project_hash) DO UPDATE SET
280 last_indexed_event_id = excluded.last_indexed_event_id,
281 updated_at = excluded.updated_at",
282 rusqlite::params![
283 project_hash,
284 event_id,
285 chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
286 ],
287 )?;
288 Ok(())
289}
290
291pub fn ingest_new_events(
301 conn: &Connection,
302 jsonl_path: impl AsRef<Path>,
303 project_hash: &str,
304) -> anyhow::Result<usize> {
305 let marker = match last_indexed_event_id(conn, project_hash)? {
306 Some(id) => id,
307 None => return rebuild_state(conn, jsonl_path, project_hash),
308 };
309
310 let f = std::fs::File::open(&jsonl_path)
311 .with_context(|| format!("open {:?}", jsonl_path.as_ref()))?;
312 let reader = std::io::BufReader::new(f);
313
314 let tx = conn.unchecked_transaction()?;
318 let mut found_marker = false;
319 let mut count = 0;
320 let mut last_event_id: Option<String> = None;
321 for (i, line) in reader.lines().enumerate() {
322 let line = line.with_context(|| format!("read line {i}"))?;
323 if line.trim().is_empty() {
324 continue;
325 }
326 let event: Event = match serde_json::from_str(&line) {
327 Ok(e) => e,
328 Err(err) => {
329 tracing::warn!(
330 line_number = i + 1,
331 error = %err,
332 "skipping malformed JSONL line in ingest_new_events"
333 );
334 continue;
335 }
336 };
337 if !found_marker {
338 if event.event_id == marker {
339 found_marker = true;
340 }
341 continue;
342 }
343 upsert_task_from_event(&tx, &event, project_hash)?;
344 index_event(&tx, &event)?;
345 last_event_id = Some(event.event_id.clone());
346 count += 1;
347 }
348
349 if !found_marker {
350 drop(tx);
352 tracing::warn!(
353 project_hash = project_hash,
354 marker = marker.as_str(),
355 "last_indexed_event_id not found in JSONL — falling back to full rebuild"
356 );
357 return rebuild_state(conn, jsonl_path, project_hash);
358 }
359
360 if let Some(eid) = last_event_id.as_deref() {
361 record_last_indexed(&tx, project_hash, eid)?;
362 }
363 tx.commit()?;
364 Ok(count)
365}
366
367pub fn index_event(conn: &Connection, event: &Event) -> anyhow::Result<()> {
368 let type_str = serde_json::to_value(event.event_type)?
369 .as_str()
370 .unwrap()
371 .to_string();
372 let status_str = serde_json::to_value(event.status)?
373 .as_str()
374 .unwrap()
375 .to_string();
376 conn.execute(
377 "INSERT OR REPLACE INTO events_index(event_id, task_id, type, timestamp, confidence, status)
378 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
379 rusqlite::params![
380 event.event_id, event.task_id, type_str,
381 event.timestamp, event.confidence, status_str
382 ],
383 )?;
384 conn.execute(
386 "DELETE FROM search_fts WHERE event_id=?1",
387 rusqlite::params![event.event_id],
388 )?;
389 conn.execute(
390 "INSERT INTO search_fts(task_id, event_id, text, type) VALUES (?1, ?2, ?3, ?4)",
391 rusqlite::params![event.task_id, event.event_id, event.text, type_str],
392 )?;
393
394 if event.event_type == EventType::Decision {
395 conn.execute(
396 "INSERT OR REPLACE INTO decisions(decision_id, task_id, text, status)
397 VALUES (?1, ?2, ?3, 'active')",
398 rusqlite::params![event.event_id, event.task_id, event.text],
399 )?;
400 }
401
402 if event.event_type == EventType::Supersede {
403 if let Some(target) = &event.supersedes {
404 conn.execute(
405 "UPDATE decisions SET status='superseded', superseded_by=?1 WHERE decision_id=?2",
406 rusqlite::params![event.event_id, target],
407 )?;
408 }
409 }
410
411 if event.event_type == EventType::Evidence {
412 let strength_str = event
413 .evidence_strength
414 .map(|s| {
415 serde_json::to_value(s)
416 .unwrap()
417 .as_str()
418 .unwrap()
419 .to_string()
420 })
421 .unwrap_or_else(|| "medium".into());
422 conn.execute(
423 "INSERT OR REPLACE INTO evidence(evidence_id, task_id, text, strength)
424 VALUES (?1, ?2, ?3, ?4)",
425 rusqlite::params![event.event_id, event.task_id, event.text, strength_str],
426 )?;
427 }
428
429 conn.execute(
431 "DELETE FROM task_pack_cache WHERE task_id=?1",
432 rusqlite::params![event.task_id],
433 )?;
434
435 Ok(())
436}
437
438pub fn open(path: impl AsRef<Path>) -> anyhow::Result<Connection> {
439 if let Some(parent) = path.as_ref().parent() {
440 std::fs::create_dir_all(parent).with_context(|| format!("create dir {parent:?}"))?;
441 }
442 let conn =
443 Connection::open(&path).with_context(|| format!("open SQLite at {:?}", path.as_ref()))?;
444 conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;")?;
445 apply_migrations(&conn).context("apply schema migrations")?;
446 Ok(conn)
447}
448
// Tests for schema setup, event projection, and JSONL ingestion.
// Each test works on a fresh database inside a temporary directory.
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    // `task_exists` is false before the task is inserted and true afterwards,
    // while an unrelated id stays false.
    #[test]
    fn task_exists_returns_true_for_known_id_false_otherwise() {
        let d = TempDir::new().unwrap();
        let conn = open(d.path().join("s.sqlite")).unwrap();

        assert!(!task_exists(&conn, "tj-nope").unwrap());

        let e = make_open_event("tj-yes", "Hello");
        upsert_task_from_event(&conn, &e, "feedfacefeedface").unwrap();
        index_event(&conn, &e).unwrap();

        assert!(task_exists(&conn, "tj-yes").unwrap());
        assert!(!task_exists(&conn, "tj-nope").unwrap());
    }

    // Opening a new database records versions 1..=N in `schema_migrations`.
    #[test]
    fn fresh_db_runs_all_migrations() {
        let d = TempDir::new().unwrap();
        let p = d.path().join("state.sqlite");
        let conn = open(&p).unwrap();

        let applied: Vec<i64> = conn
            .prepare("SELECT version FROM schema_migrations ORDER BY version")
            .unwrap()
            .query_map([], |r| r.get::<_, i64>(0))
            .unwrap()
            .collect::<Result<_, _>>()
            .unwrap();
        assert_eq!(
            applied,
            (1..=MIGRATIONS.len() as i64).collect::<Vec<_>>(),
            "every declared migration must be recorded"
        );
    }

    // Re-opening the same database must not record a migration twice.
    #[test]
    fn apply_migrations_is_idempotent_across_reopens() {
        let d = TempDir::new().unwrap();
        let p = d.path().join("state.sqlite");
        let _ = open(&p).unwrap();
        let _ = open(&p).unwrap();

        let count: i64 = open(&p)
            .unwrap()
            .query_row("SELECT COUNT(*) FROM schema_migrations", [], |r| r.get(0))
            .unwrap();
        assert_eq!(
            count,
            MIGRATIONS.len() as i64,
            "schema_migrations must contain exactly one row per declared migration after repeated opens"
        );
    }

    // The tables created by migration v1 are present after `open`.
    #[test]
    fn open_creates_all_tables() {
        let d = TempDir::new().unwrap();
        let p = d.path().join("state.sqlite");
        let conn = open(&p).unwrap();

        let names: Vec<String> = conn
            .prepare("SELECT name FROM sqlite_master WHERE type='table' OR type='virtual table' ORDER BY name")
            .unwrap()
            .query_map([], |r| r.get::<_, String>(0))
            .unwrap()
            .collect::<Result<_, _>>()
            .unwrap();

        for required in [
            "decisions",
            "events_index",
            "evidence",
            "task_pack_cache",
            "tasks",
            "search_fts",
        ] {
            assert!(
                names.iter().any(|n| n == required),
                "missing table {required}, have {names:?}"
            );
        }
    }

    // Opening an existing database a second time succeeds.
    #[test]
    fn open_is_idempotent() {
        let d = TempDir::new().unwrap();
        let p = d.path().join("state.sqlite");
        let _ = open(&p).unwrap();
        let _ = open(&p).unwrap();
    }

    // An Evidence event lands in `evidence` with its text and strength.
    #[test]
    fn index_event_projects_evidence() {
        let d = TempDir::new().unwrap();
        let conn = open(d.path().join("s.sqlite")).unwrap();
        let mut open_e = crate::event::Event::new(
            "tj-e",
            crate::event::EventType::Open,
            crate::event::Author::User,
            crate::event::Source::Cli,
            "x".into(),
        );
        open_e.meta = serde_json::json!({"title": "T"});
        upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
        index_event(&conn, &open_e).unwrap();

        let mut ev = crate::event::Event::new(
            "tj-e",
            crate::event::EventType::Evidence,
            crate::event::Author::Agent,
            crate::event::Source::Chat,
            "Hook startup measured at 12ms".into(),
        );
        ev.evidence_strength = Some(crate::event::EvidenceStrength::Strong);
        upsert_task_from_event(&conn, &ev, "feedface").unwrap();
        index_event(&conn, &ev).unwrap();

        let (text, strength): (String, String) = conn
            .query_row(
                "SELECT text, strength FROM evidence WHERE task_id=?1",
                rusqlite::params!["tj-e"],
                |r| Ok((r.get(0)?, r.get(1)?)),
            )
            .unwrap();
        assert!(text.contains("12ms"));
        assert_eq!(strength, "strong");
    }

    // A Supersede event flips the targeted decision to `superseded` and
    // records the superseding event id.
    #[test]
    fn supersede_event_marks_decision_superseded() {
        let d = TempDir::new().unwrap();
        let conn = open(d.path().join("s.sqlite")).unwrap();
        let mut open_e = crate::event::Event::new(
            "tj-s",
            crate::event::EventType::Open,
            crate::event::Author::User,
            crate::event::Source::Cli,
            "x".into(),
        );
        open_e.meta = serde_json::json!({"title": "T"});
        upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
        index_event(&conn, &open_e).unwrap();

        let dec = crate::event::Event::new(
            "tj-s",
            crate::event::EventType::Decision,
            crate::event::Author::Agent,
            crate::event::Source::Chat,
            "Use TS".into(),
        );
        upsert_task_from_event(&conn, &dec, "feedface").unwrap();
        index_event(&conn, &dec).unwrap();

        let mut sup = crate::event::Event::new(
            "tj-s",
            crate::event::EventType::Supersede,
            crate::event::Author::Agent,
            crate::event::Source::Chat,
            "Replaced by Rust decision".into(),
        );
        sup.supersedes = Some(dec.event_id.clone());
        upsert_task_from_event(&conn, &sup, "feedface").unwrap();
        index_event(&conn, &sup).unwrap();

        let (status, by): (String, Option<String>) = conn
            .query_row(
                "SELECT status, superseded_by FROM decisions WHERE decision_id=?1",
                rusqlite::params![dec.event_id],
                |r| Ok((r.get(0)?, r.get(1)?)),
            )
            .unwrap();
        assert_eq!(status, "superseded");
        assert_eq!(by.as_deref(), Some(sup.event_id.as_str()));
    }

    // A Decision event is stored in `decisions` as `active`, keyed by its
    // event id.
    #[test]
    fn index_event_projects_decision_to_decisions_table() {
        let d = TempDir::new().unwrap();
        let conn = open(d.path().join("s.sqlite")).unwrap();

        let mut open_e = crate::event::Event::new(
            "tj-d",
            crate::event::EventType::Open,
            crate::event::Author::User,
            crate::event::Source::Cli,
            "x".into(),
        );
        open_e.meta = serde_json::json!({"title": "T"});
        upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
        index_event(&conn, &open_e).unwrap();

        let dec = crate::event::Event::new(
            "tj-d",
            crate::event::EventType::Decision,
            crate::event::Author::Agent,
            crate::event::Source::Chat,
            "Adopt Rust".into(),
        );
        upsert_task_from_event(&conn, &dec, "feedface").unwrap();
        index_event(&conn, &dec).unwrap();

        let (id, text, status): (String, String, String) = conn
            .query_row(
                "SELECT decision_id, text, status FROM decisions WHERE task_id=?1",
                rusqlite::params!["tj-d"],
                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
            )
            .unwrap();
        assert_eq!(id, dec.event_id);
        assert_eq!(text, "Adopt Rust");
        assert_eq!(status, "active");
    }

    // Indexing the same event three times leaves a single `search_fts` row.
    #[test]
    fn index_event_is_idempotent_no_search_fts_duplicates() {
        let d = TempDir::new().unwrap();
        let conn = open(d.path().join("s.sqlite")).unwrap();
        let mut open_e = crate::event::Event::new(
            "tj-id",
            crate::event::EventType::Open,
            crate::event::Author::User,
            crate::event::Source::Cli,
            "x".into(),
        );
        open_e.meta = serde_json::json!({"title": "Idempotent"});
        upsert_task_from_event(&conn, &open_e, "feedface").unwrap();

        index_event(&conn, &open_e).unwrap();
        index_event(&conn, &open_e).unwrap();
        index_event(&conn, &open_e).unwrap();

        let n: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM search_fts WHERE event_id=?1",
                rusqlite::params![open_e.event_id],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(n, 1, "search_fts must hold exactly one row per event_id");
    }

    // Only `*.sqlite` files are reported, by file stem; other files are ignored.
    #[test]
    fn list_all_projects_returns_hashes_from_state_dir() {
        use std::fs::File;
        let d = TempDir::new().unwrap();
        let state_dir = d.path().join("state");
        std::fs::create_dir_all(&state_dir).unwrap();
        File::create(state_dir.join("aaaa1111aaaa1111.sqlite")).unwrap();
        File::create(state_dir.join("bbbb2222bbbb2222.sqlite")).unwrap();
        File::create(state_dir.join("not-a-project.txt")).unwrap();

        let mut hashes = list_all_projects(&state_dir).unwrap();
        hashes.sort();
        assert_eq!(hashes, vec!["aaaa1111aaaa1111", "bbbb2222bbbb2222"]);
    }

    // Test helper: appends `e` to `f` as one JSON line.
    fn write_event_line(f: &mut std::fs::File, e: &crate::event::Event) {
        use std::io::Write;
        writeln!(f, "{}", serde_json::to_string(e).unwrap()).unwrap();
    }

    // Test helper: builds an Open event for `task_id` whose meta carries `title`.
    fn make_open_event(task_id: &str, title: &str) -> crate::event::Event {
        let mut e = crate::event::Event::new(
            task_id,
            crate::event::EventType::Open,
            crate::event::Author::User,
            crate::event::Source::Cli,
            "x".into(),
        );
        e.meta = serde_json::json!({"title": title});
        e
    }

    // The first ingest indexes all three events; after two more lines are
    // appended, the second ingest indexes only those two and advances the
    // marker to the last one.
    #[test]
    fn ingest_new_events_picks_up_only_new_lines() {
        let d = TempDir::new().unwrap();
        let jsonl = d.path().join("events.jsonl");
        let db = d.path().join("s.sqlite");
        let project = "deadbeefdeadbeef";

        let e1 = make_open_event("tj-i1", "first");
        let e2 = make_open_event("tj-i2", "second");
        let e3 = make_open_event("tj-i3", "third");

        let mut f = std::fs::File::create(&jsonl).unwrap();
        write_event_line(&mut f, &e1);
        write_event_line(&mut f, &e2);
        write_event_line(&mut f, &e3);
        drop(f);

        let conn = open(&db).unwrap();
        let n_first = ingest_new_events(&conn, &jsonl, project).unwrap();
        assert_eq!(n_first, 3);

        let e4 = make_open_event("tj-i4", "fourth");
        let e5 = make_open_event("tj-i5", "fifth");
        let mut f = std::fs::OpenOptions::new()
            .append(true)
            .open(&jsonl)
            .unwrap();
        write_event_line(&mut f, &e4);
        write_event_line(&mut f, &e5);
        drop(f);

        let n_second = ingest_new_events(&conn, &jsonl, project).unwrap();
        assert_eq!(n_second, 2, "incremental ingest must read only the tail");

        let total: i64 = conn
            .query_row("SELECT COUNT(*) FROM events_index", [], |r| r.get(0))
            .unwrap();
        assert_eq!(total, 5);

        let marker: String = conn
            .query_row(
                "SELECT last_indexed_event_id FROM index_state WHERE project_hash=?1",
                rusqlite::params![project],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(marker, e5.event_id);
    }

    // When the JSONL file is replaced so the stored marker no longer appears
    // in it, ingest indexes every event in the new file.
    #[test]
    fn ingest_new_events_falls_back_to_full_rebuild_when_marker_vanishes() {
        let d = TempDir::new().unwrap();
        let jsonl = d.path().join("events.jsonl");
        let db = d.path().join("s.sqlite");
        let project = "feedfacefeedface";

        let e1 = make_open_event("tj-r1", "first");
        let mut f = std::fs::File::create(&jsonl).unwrap();
        write_event_line(&mut f, &e1);
        drop(f);

        let conn = open(&db).unwrap();
        ingest_new_events(&conn, &jsonl, project).unwrap();

        let e2 = make_open_event("tj-r2", "after-corruption");
        let e3 = make_open_event("tj-r3", "after-corruption-2");
        // `File::create` truncates, so e1 (the marker) is gone from the file.
        let mut f = std::fs::File::create(&jsonl).unwrap();
        write_event_line(&mut f, &e2);
        write_event_line(&mut f, &e3);
        drop(f);

        let n = ingest_new_events(&conn, &jsonl, project).unwrap();
        assert_eq!(n, 2, "missing marker must trigger full rebuild");
    }

    // On a fresh database, `rebuild_state` and `ingest_new_events` index the
    // same number of events and leave the same row counts.
    #[test]
    fn rebuild_state_and_ingest_new_events_produce_same_state() {
        let d = TempDir::new().unwrap();
        let jsonl_a = d.path().join("a.jsonl");
        let jsonl_b = d.path().join("b.jsonl");
        let db_a = d.path().join("a.sqlite");
        let db_b = d.path().join("b.sqlite");

        let events: Vec<_> = (0..5)
            .map(|i| make_open_event(&format!("tj-eq{i}"), &format!("title {i}")))
            .collect();
        for path in [&jsonl_a, &jsonl_b] {
            let mut f = std::fs::File::create(path).unwrap();
            for e in &events {
                write_event_line(&mut f, e);
            }
        }

        let conn_a = open(&db_a).unwrap();
        let n_a = rebuild_state(&conn_a, &jsonl_a, "abcd1234abcd1234").unwrap();

        let conn_b = open(&db_b).unwrap();
        let n_b = ingest_new_events(&conn_b, &jsonl_b, "abcd1234abcd1234").unwrap();

        assert_eq!(n_a, n_b);
        assert_eq!(n_a, 5);

        for table in ["tasks", "events_index"] {
            let q = format!("SELECT COUNT(*) FROM {table}");
            let cnt_a: i64 = conn_a.query_row(&q, [], |r| r.get(0)).unwrap();
            let cnt_b: i64 = conn_b.query_row(&q, [], |r| r.get(0)).unwrap();
            assert_eq!(cnt_a, cnt_b, "row count mismatch in {table}");
        }
    }

    // Lines that cannot be parsed as an event are skipped without failing
    // the rebuild.
    #[test]
    fn rebuild_state_skips_malformed_jsonl_lines() {
        use std::io::Write;
        let d = TempDir::new().unwrap();
        let events_path = d.path().join("events.jsonl");
        let db_path = d.path().join("s.sqlite");

        let mut f = std::fs::File::create(&events_path).unwrap();

        let mut e1 = crate::event::Event::new(
            "tj-skip",
            crate::event::EventType::Open,
            crate::event::Author::User,
            crate::event::Source::Cli,
            "x".into(),
        );
        e1.meta = serde_json::json!({"title": "Skip test"});
        writeln!(f, "{}", serde_json::to_string(&e1).unwrap()).unwrap();

        // Not JSON at all.
        writeln!(f, "this is not a json event line").unwrap();

        // Valid JSON that the test expects to be rejected as an `Event`.
        writeln!(f, "{{\"foo\": 1}}").unwrap();

        let e3 = crate::event::Event::new(
            "tj-skip",
            crate::event::EventType::Decision,
            crate::event::Author::Agent,
            crate::event::Source::Chat,
            "Adopt Rust".into(),
        );
        writeln!(f, "{}", serde_json::to_string(&e3).unwrap()).unwrap();
        drop(f);

        let conn = open(&db_path).unwrap();
        let n = rebuild_state(&conn, &events_path, "deadbeefdeadbeef")
            .expect("rebuild_state must succeed despite malformed lines");
        assert_eq!(
            n, 2,
            "expected 2 valid events indexed (2 malformed skipped)"
        );

        let indexed: i64 = conn
            .query_row("SELECT COUNT(*) FROM events_index", [], |r| r.get(0))
            .unwrap();
        assert_eq!(indexed, 2);
    }

    // Two events for the same task yield one `tasks` row and two
    // `events_index` rows.
    #[test]
    fn rebuild_state_reads_jsonl_and_populates_db() {
        use std::io::Write;
        let d = TempDir::new().unwrap();
        let events_path = d.path().join("events.jsonl");
        let db_path = d.path().join("s.sqlite");

        let mut f = std::fs::File::create(&events_path).unwrap();
        let mut e1 = crate::event::Event::new(
            "tj-9",
            crate::event::EventType::Open,
            crate::event::Author::User,
            crate::event::Source::Cli,
            "x".into(),
        );
        e1.meta = serde_json::json!({"title": "Nine"});
        let e2 = crate::event::Event::new(
            "tj-9",
            crate::event::EventType::Decision,
            crate::event::Author::Agent,
            crate::event::Source::Chat,
            "Adopt Rust".into(),
        );
        writeln!(f, "{}", serde_json::to_string(&e1).unwrap()).unwrap();
        writeln!(f, "{}", serde_json::to_string(&e2).unwrap()).unwrap();
        drop(f);

        let conn = open(&db_path).unwrap();
        let n = rebuild_state(&conn, &events_path, "deadbeefdeadbeef").unwrap();
        assert_eq!(n, 2);

        let n: i64 = conn
            .query_row("SELECT COUNT(*) FROM tasks", [], |r| r.get(0))
            .unwrap();
        assert_eq!(n, 1);
        let n: i64 = conn
            .query_row("SELECT COUNT(*) FROM events_index", [], |r| r.get(0))
            .unwrap();
        assert_eq!(n, 2);
    }

    // Both events are written to `events_index`, and a full-text search for
    // "Rust" finds only the decision event.
    #[test]
    fn index_event_writes_index_and_fts() {
        let d = TempDir::new().unwrap();
        let conn = open(d.path().join("s.sqlite")).unwrap();
        let mut open_e = crate::event::Event::new(
            "tj-1",
            crate::event::EventType::Open,
            crate::event::Author::User,
            crate::event::Source::Cli,
            "Title".into(),
        );
        open_e.meta = serde_json::json!({"title": "Title"});
        upsert_task_from_event(&conn, &open_e, "deadbeefdeadbeef").unwrap();
        index_event(&conn, &open_e).unwrap();

        let mut decision = crate::event::Event::new(
            "tj-1",
            crate::event::EventType::Decision,
            crate::event::Author::Agent,
            crate::event::Source::Chat,
            "Adopt Rust".into(),
        );
        decision.confidence = Some(0.92);
        upsert_task_from_event(&conn, &decision, "deadbeefdeadbeef").unwrap();
        index_event(&conn, &decision).unwrap();

        let count: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM events_index WHERE task_id=?1",
                rusqlite::params!["tj-1"],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(count, 2);

        let mut stmt = conn
            .prepare("SELECT event_id FROM search_fts WHERE search_fts MATCH ?1")
            .unwrap();
        let hits: Vec<String> = stmt
            .query_map(rusqlite::params!["Rust"], |r| {
                let s: String = r.get(0)?;
                Ok(s)
            })
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0], decision.event_id);
    }

    // An Open event inserts a task row whose title comes from `meta["title"]`
    // rather than from the event text.
    #[test]
    fn upsert_task_from_open_event_inserts_row() {
        let d = TempDir::new().unwrap();
        let conn = open(d.path().join("s.sqlite")).unwrap();

        let mut e = crate::event::Event::new(
            "tj-7f3a",
            crate::event::EventType::Open,
            crate::event::Author::User,
            crate::event::Source::Cli,
            "Add OAuth".into(),
        );
        e.meta = serde_json::json!({ "title": "Add OAuth login" });

        upsert_task_from_event(&conn, &e, "abcd1234abcd1234").unwrap();

        let (id, title, status): (String, String, String) = conn
            .query_row(
                "SELECT task_id, title, status FROM tasks WHERE task_id = ?1",
                ["tj-7f3a"],
                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
            )
            .unwrap();

        assert_eq!(id, "tj-7f3a");
        assert_eq!(title, "Add OAuth login");
        assert_eq!(status, "open");
    }
}