1use anyhow::Context;
2use rusqlite::Connection;
3use std::collections::HashSet;
4use std::path::Path;
5
/// One schema migration step: a version number paired with the SQL batch that implements it.
struct Migration {
    /// Version written to `schema_migrations` once the step has been applied.
    version: i64,
    /// SQL batch executed to apply this step.
    sql: &'static str,
}
13
/// Migration v1: creates the `tasks`, `events_index`, `decisions`, `evidence` and
/// `task_pack_cache` tables, two lookup indexes, and the `search_fts` FTS5 virtual table.
/// Every statement uses `IF NOT EXISTS`.
const MIGRATION_001: &str = r#"
CREATE TABLE IF NOT EXISTS tasks (
    task_id TEXT PRIMARY KEY,
    title TEXT NOT NULL,
    status TEXT NOT NULL,
    project_hash TEXT NOT NULL,
    opened_at TEXT NOT NULL,
    closed_at TEXT,
    last_event_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_tasks_project ON tasks(project_hash, last_event_at DESC);

CREATE TABLE IF NOT EXISTS events_index (
    event_id TEXT PRIMARY KEY,
    task_id TEXT NOT NULL,
    type TEXT NOT NULL,
    timestamp TEXT NOT NULL,
    confidence REAL,
    status TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_events_task_time ON events_index(task_id, timestamp DESC);

CREATE TABLE IF NOT EXISTS decisions (
    decision_id TEXT PRIMARY KEY,
    task_id TEXT NOT NULL,
    text TEXT NOT NULL,
    status TEXT NOT NULL,
    superseded_by TEXT
);

CREATE TABLE IF NOT EXISTS evidence (
    evidence_id TEXT PRIMARY KEY,
    task_id TEXT NOT NULL,
    text TEXT NOT NULL,
    strength TEXT NOT NULL,
    refers_to_decision_id TEXT
);

CREATE TABLE IF NOT EXISTS task_pack_cache (
    task_id TEXT NOT NULL,
    mode TEXT NOT NULL,
    text TEXT NOT NULL,
    generated_at TEXT NOT NULL,
    source_event_count INTEGER NOT NULL,
    PRIMARY KEY (task_id, mode)
);

CREATE VIRTUAL TABLE IF NOT EXISTS search_fts USING fts5(
    task_id UNINDEXED,
    event_id UNINDEXED,
    text,
    type
);
"#;
68
/// Migration v2: adds `index_state`, one row per `project_hash` holding the id of the
/// last indexed event and when that row was updated.
const MIGRATION_002: &str = r#"
CREATE TABLE IF NOT EXISTS index_state (
    project_hash TEXT PRIMARY KEY,
    last_indexed_event_id TEXT NOT NULL,
    updated_at TEXT NOT NULL
);
"#;
80
/// Migration v3: adds the nullable `goal`, `outcome`, `outcome_tag` and `external`
/// columns to `tasks`, adds `artifacts` to `events_index`, and empties `task_pack_cache`.
/// The `ALTER TABLE` statements have no `IF NOT EXISTS` guard, so this batch is not
/// safe to run twice against the same database.
const MIGRATION_003: &str = r#"
ALTER TABLE tasks ADD COLUMN goal TEXT;
ALTER TABLE tasks ADD COLUMN outcome TEXT;
ALTER TABLE tasks ADD COLUMN outcome_tag TEXT;
ALTER TABLE tasks ADD COLUMN external TEXT;
ALTER TABLE events_index ADD COLUMN artifacts TEXT;
DELETE FROM task_pack_cache;
"#;
93
/// Migration v4: empties `task_pack_cache` and changes no schema.
// NOTE(review): presumably forces cached packs to be regenerated — confirm the motivation.
const MIGRATION_004: &str = r#"
DELETE FROM task_pack_cache;
"#;
102
/// Every known migration, listed in the order `apply_migrations` walks them.
/// Versions are consecutive starting at 1 (the tests in this file rely on that).
const MIGRATIONS: &[Migration] = &[
    Migration {
        version: 1,
        sql: MIGRATION_001,
    },
    Migration {
        version: 2,
        sql: MIGRATION_002,
    },
    Migration {
        version: 3,
        sql: MIGRATION_003,
    },
    Migration {
        version: 4,
        sql: MIGRATION_004,
    },
];
123
124fn apply_migrations(conn: &Connection) -> anyhow::Result<()> {
125 conn.execute_batch(
126 "CREATE TABLE IF NOT EXISTS schema_migrations (
127 version INTEGER PRIMARY KEY,
128 applied_at TEXT NOT NULL
129 )",
130 )
131 .context("create schema_migrations table")?;
132
133 let applied: HashSet<i64> = {
134 let mut stmt = conn
135 .prepare("SELECT version FROM schema_migrations")
136 .context("select applied versions")?;
137 let rows = stmt
138 .query_map([], |r| r.get::<_, i64>(0))
139 .context("iterate schema_migrations")?;
140 rows.collect::<rusqlite::Result<HashSet<_>>>()
141 .context("collect applied versions")?
142 };
143
144 for migration in MIGRATIONS {
145 if applied.contains(&migration.version) {
146 continue;
147 }
148 conn.execute_batch(migration.sql)
149 .with_context(|| format!("apply schema migration v{:03}", migration.version))?;
150 conn.execute(
151 "INSERT INTO schema_migrations(version, applied_at) VALUES (?1, ?2)",
152 rusqlite::params![
153 migration.version,
154 chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
155 ],
156 )
157 .with_context(|| {
158 format!(
159 "record schema migration v{:03} as applied",
160 migration.version
161 )
162 })?;
163 }
164 Ok(())
165}
166
167use crate::event::{Event, EventType};
168
169pub fn upsert_task_from_event(
170 conn: &Connection,
171 event: &Event,
172 project_hash: &str,
173) -> anyhow::Result<()> {
174 match event.event_type {
175 EventType::Open => {
176 let title = event
177 .meta
178 .get("title")
179 .and_then(|v| v.as_str())
180 .unwrap_or(&event.text)
181 .to_string();
182 conn.execute(
183 "INSERT INTO tasks(task_id, title, status, project_hash, opened_at, last_event_at)
184 VALUES (?1, ?2, 'open', ?3, ?4, ?4)
185 ON CONFLICT(task_id) DO UPDATE SET last_event_at = ?4",
186 rusqlite::params![event.task_id, title, project_hash, event.timestamp],
187 )?;
188 }
189 EventType::Close => {
190 conn.execute(
191 "UPDATE tasks SET status='closed', closed_at=?2, last_event_at=?2 WHERE task_id=?1",
192 rusqlite::params![event.task_id, event.timestamp],
193 )?;
194 }
195 EventType::Reopen => {
196 conn.execute(
197 "UPDATE tasks SET status='open', closed_at=NULL, last_event_at=?2 WHERE task_id=?1",
198 rusqlite::params![event.task_id, event.timestamp],
199 )?;
200 }
201 _ => {
202 conn.execute(
203 "UPDATE tasks SET last_event_at=?2 WHERE task_id=?1",
204 rusqlite::params![event.task_id, event.timestamp],
205 )?;
206 }
207 }
208 Ok(())
209}
210
211use std::io::BufRead;
212
213pub fn list_all_projects(state_dir: impl AsRef<Path>) -> anyhow::Result<Vec<String>> {
214 let dir = state_dir.as_ref();
215 if !dir.exists() {
216 return Ok(vec![]);
217 }
218 let mut out = Vec::new();
219 for entry in std::fs::read_dir(dir)? {
220 let entry = entry?;
221 let path = entry.path();
222 if path.extension().and_then(|e| e.to_str()) == Some("sqlite") {
223 if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) {
224 out.push(stem.to_string());
225 }
226 }
227 }
228 Ok(out)
229}
230
231pub fn rebuild_state(
232 conn: &Connection,
233 jsonl_path: impl AsRef<Path>,
234 project_hash: &str,
235) -> anyhow::Result<usize> {
236 let f = std::fs::File::open(&jsonl_path)
237 .with_context(|| format!("open {:?}", jsonl_path.as_ref()))?;
238 let reader = std::io::BufReader::new(f);
239
240 let tx = conn.unchecked_transaction()?;
241 let mut count = 0;
242 let mut last_event_id: Option<String> = None;
243 for (i, line) in reader.lines().enumerate() {
244 let line = line.with_context(|| format!("read line {i}"))?;
245 if line.trim().is_empty() {
246 continue;
247 }
248 let event: Event = match serde_json::from_str(&line) {
252 Ok(e) => e,
253 Err(err) => {
254 tracing::warn!(
255 line_number = i + 1,
256 error = %err,
257 "skipping malformed JSONL line in rebuild_state"
258 );
259 continue;
260 }
261 };
262 upsert_task_from_event(&tx, &event, project_hash)?;
263 index_event(&tx, &event)?;
264 last_event_id = Some(event.event_id.clone());
265 count += 1;
266 }
267 if let Some(eid) = last_event_id.as_deref() {
268 record_last_indexed(&tx, project_hash, eid)?;
269 }
270 tx.commit()?;
271 Ok(count)
272}
273
274pub fn task_exists(conn: &Connection, task_id: &str) -> anyhow::Result<bool> {
279 let count: i64 = conn.query_row(
280 "SELECT COUNT(*) FROM tasks WHERE task_id = ?1",
281 rusqlite::params![task_id],
282 |r| r.get(0),
283 )?;
284 Ok(count > 0)
285}
286
287pub fn task_status(conn: &Connection, task_id: &str) -> anyhow::Result<Option<String>> {
291 let mut stmt = conn.prepare("SELECT status FROM tasks WHERE task_id = ?1")?;
292 let mut rows = stmt.query(rusqlite::params![task_id])?;
293 Ok(rows.next()?.map(|r| r.get::<_, String>(0)).transpose()?)
294}
295
296pub fn set_task_goal(conn: &Connection, task_id: &str, goal: &str) -> anyhow::Result<()> {
300 conn.execute(
301 "UPDATE tasks SET goal = ?1 WHERE task_id = ?2",
302 rusqlite::params![goal, task_id],
303 )
304 .with_context(|| format!("set goal for {task_id}"))?;
305 conn.execute(
308 "DELETE FROM task_pack_cache WHERE task_id = ?1",
309 rusqlite::params![task_id],
310 )?;
311 Ok(())
312}
313
314pub fn set_task_outcome(
318 conn: &Connection,
319 task_id: &str,
320 outcome: &str,
321 outcome_tag: Option<&str>,
322) -> anyhow::Result<()> {
323 conn.execute(
324 "UPDATE tasks SET outcome = ?1, outcome_tag = ?2 WHERE task_id = ?3",
325 rusqlite::params![outcome, outcome_tag, task_id],
326 )
327 .with_context(|| format!("set outcome for {task_id}"))?;
328 conn.execute(
329 "DELETE FROM task_pack_cache WHERE task_id = ?1",
330 rusqlite::params![task_id],
331 )?;
332 Ok(())
333}
334
335pub fn add_task_external(conn: &Connection, task_id: &str, reference: &str) -> anyhow::Result<()> {
340 let current: Option<String> = conn
341 .query_row(
342 "SELECT external FROM tasks WHERE task_id = ?1",
343 rusqlite::params![task_id],
344 |r| r.get::<_, Option<String>>(0),
345 )
346 .with_context(|| format!("read external for {task_id}"))?;
347 let next = match current {
348 Some(s) if !s.is_empty() => format!("{s},{reference}"),
349 _ => reference.to_string(),
350 };
351 conn.execute(
352 "UPDATE tasks SET external = ?1 WHERE task_id = ?2",
353 rusqlite::params![next, task_id],
354 )?;
355 conn.execute(
356 "DELETE FROM task_pack_cache WHERE task_id = ?1",
357 rusqlite::params![task_id],
358 )?;
359 Ok(())
360}
361
/// Optional descriptive columns of a `tasks` row, as returned by `task_metadata`.
/// Each field is `None` when the corresponding column is NULL.
#[derive(Debug, Clone, Default)]
pub struct TaskMetadata {
    /// Value of the `goal` column.
    pub goal: Option<String>,
    /// Value of the `outcome` column.
    pub outcome: Option<String>,
    /// Value of the `outcome_tag` column.
    pub outcome_tag: Option<String>,
    /// Value of the `external` column; `add_task_external` maintains it as a
    /// comma-separated list of references.
    pub external: Option<String>,
}
371
372pub fn task_metadata(conn: &Connection, task_id: &str) -> anyhow::Result<Option<TaskMetadata>> {
373 let mut stmt =
374 conn.prepare("SELECT goal, outcome, outcome_tag, external FROM tasks WHERE task_id = ?1")?;
375 let mut rows = stmt.query(rusqlite::params![task_id])?;
376 Ok(match rows.next()? {
377 Some(r) => Some(TaskMetadata {
378 goal: r.get::<_, Option<String>>(0)?,
379 outcome: r.get::<_, Option<String>>(1)?,
380 outcome_tag: r.get::<_, Option<String>>(2)?,
381 external: r.get::<_, Option<String>>(3)?,
382 }),
383 None => None,
384 })
385}
386
/// An open task that has seen no events for a while, as returned by `stale_tasks`.
#[derive(Debug, Clone)]
pub struct StaleTask {
    /// Value of the `task_id` column.
    pub task_id: String,
    /// Value of the `title` column.
    pub title: String,
    /// Raw `last_event_at` column value, exactly as stored.
    pub last_event_at: String,
    /// Whole days between the moment of the query and `last_event_at`;
    /// 0 when the stored timestamp is not valid RFC 3339.
    pub days_idle: i64,
}
396
397pub fn stale_tasks(conn: &Connection, days: i64) -> anyhow::Result<Vec<StaleTask>> {
400 let cutoff = chrono::Utc::now() - chrono::Duration::days(days);
401 let cutoff_str = cutoff.to_rfc3339();
402 let mut stmt = conn.prepare(
403 "SELECT task_id, title, last_event_at FROM tasks
404 WHERE status = 'open' AND last_event_at < ?1
405 ORDER BY last_event_at ASC",
406 )?;
407 let rows = stmt.query_map(rusqlite::params![cutoff_str], |r| {
408 Ok((
409 r.get::<_, String>(0)?,
410 r.get::<_, String>(1)?,
411 r.get::<_, String>(2)?,
412 ))
413 })?;
414 let now = chrono::Utc::now();
415 let mut out = Vec::new();
416 for row in rows {
417 let (task_id, title, last_at) = row?;
418 let dt = chrono::DateTime::parse_from_rfc3339(&last_at)
419 .map(|d| d.with_timezone(&chrono::Utc))
420 .unwrap_or(now);
421 let days_idle = (now - dt).num_days();
422 out.push(StaleTask {
423 task_id,
424 title,
425 last_event_at: last_at,
426 days_idle,
427 });
428 }
429 Ok(out)
430}
431
/// A task that shares artifacts with a query, as returned by `find_related_tasks`.
#[derive(Debug, Clone)]
pub struct RelatedTask {
    /// Value of the task's `task_id` column.
    pub task_id: String,
    /// Value of the task's `status` column.
    pub status: String,
    /// Sum of the weights of every queried artifact found on the task
    /// (1.0 per linked issue, 0.8 per commit hash, 0.3 per file).
    pub score: f64,
}
442
443pub fn find_related_tasks(
454 conn: &Connection,
455 arts: &crate::artifacts::Artifacts,
456) -> anyhow::Result<Vec<RelatedTask>> {
457 use std::collections::HashMap;
458 if arts.is_empty() {
459 return Ok(Vec::new());
460 }
461 let mut scores: HashMap<String, f64> = HashMap::new();
462 let mut last_seen: HashMap<String, String> = HashMap::new();
463
464 let needles: Vec<(String, f64)> = arts
465 .linked_issues
466 .iter()
467 .map(|s| (s.clone(), 1.0))
468 .chain(arts.commit_hashes.iter().map(|s| (s.clone(), 0.8)))
469 .chain(arts.files.iter().map(|s| (s.clone(), 0.3)))
470 .collect();
471
472 for (needle, weight) in needles {
473 let pattern = format!("%\"{}\"%", needle.replace('%', "\\%"));
474 let mut stmt = conn.prepare(
475 "SELECT DISTINCT task_id, MAX(timestamp) as ts FROM events_index
476 WHERE artifacts LIKE ?1
477 GROUP BY task_id
478 ORDER BY ts DESC",
479 )?;
480 let rows = stmt.query_map(rusqlite::params![pattern], |r| {
481 Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
482 })?;
483 for row in rows {
484 let (id, ts) = row?;
485 *scores.entry(id.clone()).or_insert(0.0) += weight;
486 last_seen.insert(id, ts);
487 }
488 }
489
490 let mut out: Vec<RelatedTask> = Vec::with_capacity(scores.len());
491 for (id, score) in scores {
492 let status: Option<String> = conn
493 .query_row(
494 "SELECT status FROM tasks WHERE task_id = ?1",
495 rusqlite::params![&id],
496 |r| r.get(0),
497 )
498 .ok();
499 if let Some(status) = status {
500 out.push(RelatedTask {
501 task_id: id,
502 status,
503 score,
504 });
505 }
506 }
507 out.sort_by(|a, b| {
508 b.score
509 .partial_cmp(&a.score)
510 .unwrap_or(std::cmp::Ordering::Equal)
511 .then_with(|| {
512 let ts_a = last_seen.get(&a.task_id).cloned().unwrap_or_default();
513 let ts_b = last_seen.get(&b.task_id).cloned().unwrap_or_default();
514 ts_b.cmp(&ts_a)
515 })
516 });
517 Ok(out)
518}
519
520pub fn find_tasks_by_linked_issues(
527 conn: &Connection,
528 issues: &[String],
529) -> anyhow::Result<Vec<(String, String)>> {
530 if issues.is_empty() {
531 return Ok(Vec::new());
532 }
533 let mut candidate_ids: Vec<String> = Vec::new();
539 for issue in issues {
540 let pattern = format!("%\"{}\"%", issue.replace('%', "\\%"));
541 let mut stmt = conn.prepare(
542 "SELECT DISTINCT task_id FROM events_index
543 WHERE artifacts LIKE ?1
544 ORDER BY timestamp DESC",
545 )?;
546 let rows = stmt.query_map(rusqlite::params![pattern], |r| r.get::<_, String>(0))?;
547 for r in rows {
548 let id = r?;
549 if !candidate_ids.contains(&id) {
550 candidate_ids.push(id);
551 }
552 }
553 }
554 let mut out = Vec::with_capacity(candidate_ids.len());
556 for id in candidate_ids {
557 let status: Option<String> = conn
558 .query_row(
559 "SELECT status FROM tasks WHERE task_id = ?1",
560 rusqlite::params![&id],
561 |r| r.get(0),
562 )
563 .ok();
564 if let Some(s) = status {
565 out.push((id, s));
566 }
567 }
568 Ok(out)
569}
570
571pub fn reclassify_task_artifacts(conn: &Connection, task_id: &str) -> anyhow::Result<usize> {
577 let mut stmt = conn.prepare(
578 "SELECT ei.event_id, COALESCE(sf.text, '') FROM events_index ei
579 LEFT JOIN search_fts sf ON sf.event_id = ei.event_id
580 WHERE ei.task_id = ?1",
581 )?;
582 let rows: Vec<(String, String)> = stmt
583 .query_map(rusqlite::params![task_id], |r| {
584 Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
585 })?
586 .collect::<Result<_, _>>()?;
587 let count = rows.len();
588 for (event_id, text) in rows {
589 let arts = crate::artifacts::extract(&text);
590 let json = if arts.is_empty() {
591 None
592 } else {
593 Some(serde_json::to_string(&arts)?)
594 };
595 conn.execute(
596 "UPDATE events_index SET artifacts = ?1 WHERE event_id = ?2",
597 rusqlite::params![json, event_id],
598 )?;
599 }
600 conn.execute(
601 "DELETE FROM task_pack_cache WHERE task_id = ?1",
602 rusqlite::params![task_id],
603 )?;
604 Ok(count)
605}
606
607pub fn task_artifacts(
613 conn: &Connection,
614 task_id: &str,
615) -> anyhow::Result<crate::artifacts::Artifacts> {
616 let mut stmt = conn.prepare(
617 "SELECT artifacts FROM events_index
618 WHERE task_id = ?1 AND artifacts IS NOT NULL
619 ORDER BY timestamp ASC",
620 )?;
621 let rows = stmt.query_map(rusqlite::params![task_id], |r| r.get::<_, String>(0))?;
622 let mut acc = crate::artifacts::Artifacts::default();
623 for row in rows {
624 let json = row?;
625 if let Ok(parsed) = serde_json::from_str::<crate::artifacts::Artifacts>(&json) {
626 acc.merge(parsed);
627 }
628 }
629 Ok(acc)
630}
631
632fn last_indexed_event_id(conn: &Connection, project_hash: &str) -> anyhow::Result<Option<String>> {
636 let mut stmt =
637 conn.prepare("SELECT last_indexed_event_id FROM index_state WHERE project_hash = ?1")?;
638 let mut rows = stmt.query(rusqlite::params![project_hash])?;
639 if let Some(row) = rows.next()? {
640 Ok(Some(row.get::<_, String>(0)?))
641 } else {
642 Ok(None)
643 }
644}
645
646fn record_last_indexed(
647 conn: &Connection,
648 project_hash: &str,
649 event_id: &str,
650) -> anyhow::Result<()> {
651 conn.execute(
652 "INSERT INTO index_state(project_hash, last_indexed_event_id, updated_at)
653 VALUES (?1, ?2, ?3)
654 ON CONFLICT(project_hash) DO UPDATE SET
655 last_indexed_event_id = excluded.last_indexed_event_id,
656 updated_at = excluded.updated_at",
657 rusqlite::params![
658 project_hash,
659 event_id,
660 chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
661 ],
662 )?;
663 Ok(())
664}
665
666pub fn ingest_new_events(
676 conn: &Connection,
677 jsonl_path: impl AsRef<Path>,
678 project_hash: &str,
679) -> anyhow::Result<usize> {
680 let marker = match last_indexed_event_id(conn, project_hash)? {
681 Some(id) => id,
682 None => return rebuild_state(conn, jsonl_path, project_hash),
683 };
684
685 let f = std::fs::File::open(&jsonl_path)
686 .with_context(|| format!("open {:?}", jsonl_path.as_ref()))?;
687 let reader = std::io::BufReader::new(f);
688
689 let tx = conn.unchecked_transaction()?;
693 let mut found_marker = false;
694 let mut count = 0;
695 let mut last_event_id: Option<String> = None;
696 for (i, line) in reader.lines().enumerate() {
697 let line = line.with_context(|| format!("read line {i}"))?;
698 if line.trim().is_empty() {
699 continue;
700 }
701 let event: Event = match serde_json::from_str(&line) {
702 Ok(e) => e,
703 Err(err) => {
704 tracing::warn!(
705 line_number = i + 1,
706 error = %err,
707 "skipping malformed JSONL line in ingest_new_events"
708 );
709 continue;
710 }
711 };
712 if !found_marker {
713 if event.event_id == marker {
714 found_marker = true;
715 }
716 continue;
717 }
718 upsert_task_from_event(&tx, &event, project_hash)?;
719 index_event(&tx, &event)?;
720 last_event_id = Some(event.event_id.clone());
721 count += 1;
722 }
723
724 if !found_marker {
725 drop(tx);
727 tracing::warn!(
728 project_hash = project_hash,
729 marker = marker.as_str(),
730 "last_indexed_event_id not found in JSONL — falling back to full rebuild"
731 );
732 return rebuild_state(conn, jsonl_path, project_hash);
733 }
734
735 if let Some(eid) = last_event_id.as_deref() {
736 record_last_indexed(&tx, project_hash, eid)?;
737 }
738 tx.commit()?;
739 Ok(count)
740}
741
742pub fn index_event(conn: &Connection, event: &Event) -> anyhow::Result<()> {
743 let type_str = serde_json::to_value(event.event_type)?
744 .as_str()
745 .unwrap()
746 .to_string();
747 let status_str = serde_json::to_value(event.status)?
748 .as_str()
749 .unwrap()
750 .to_string();
751 let artifacts = crate::artifacts::extract(&event.text);
756 let artifacts_json = if artifacts.is_empty() {
757 None
758 } else {
759 Some(serde_json::to_string(&artifacts)?)
760 };
761 conn.execute(
762 "INSERT OR REPLACE INTO events_index(event_id, task_id, type, timestamp, confidence, status, artifacts)
763 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
764 rusqlite::params![
765 event.event_id, event.task_id, type_str,
766 event.timestamp, event.confidence, status_str, artifacts_json
767 ],
768 )?;
769 conn.execute(
771 "DELETE FROM search_fts WHERE event_id=?1",
772 rusqlite::params![event.event_id],
773 )?;
774 conn.execute(
775 "INSERT INTO search_fts(task_id, event_id, text, type) VALUES (?1, ?2, ?3, ?4)",
776 rusqlite::params![event.task_id, event.event_id, event.text, type_str],
777 )?;
778
779 if event.event_type == EventType::Decision {
780 conn.execute(
781 "INSERT OR REPLACE INTO decisions(decision_id, task_id, text, status)
782 VALUES (?1, ?2, ?3, 'active')",
783 rusqlite::params![event.event_id, event.task_id, event.text],
784 )?;
785 }
786
787 if event.event_type == EventType::Supersede {
788 if let Some(target) = &event.supersedes {
789 conn.execute(
790 "UPDATE decisions SET status='superseded', superseded_by=?1 WHERE decision_id=?2",
791 rusqlite::params![event.event_id, target],
792 )?;
793 }
794 }
795
796 if event.event_type == EventType::Evidence {
797 let strength_str = event
798 .evidence_strength
799 .map(|s| {
800 serde_json::to_value(s)
801 .unwrap()
802 .as_str()
803 .unwrap()
804 .to_string()
805 })
806 .unwrap_or_else(|| "medium".into());
807 conn.execute(
808 "INSERT OR REPLACE INTO evidence(evidence_id, task_id, text, strength)
809 VALUES (?1, ?2, ?3, ?4)",
810 rusqlite::params![event.event_id, event.task_id, event.text, strength_str],
811 )?;
812 }
813
814 conn.execute(
816 "DELETE FROM task_pack_cache WHERE task_id=?1",
817 rusqlite::params![event.task_id],
818 )?;
819
820 Ok(())
821}
822
823pub fn open(path: impl AsRef<Path>) -> anyhow::Result<Connection> {
824 if let Some(parent) = path.as_ref().parent() {
825 std::fs::create_dir_all(parent).with_context(|| format!("create dir {parent:?}"))?;
826 }
827 let conn =
828 Connection::open(&path).with_context(|| format!("open SQLite at {:?}", path.as_ref()))?;
829 conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;")?;
830 apply_migrations(&conn).context("apply schema migrations")?;
831 Ok(conn)
832}
833
/// Summary of one task, as returned by `list_tasks_by_project`.
#[derive(Debug, Clone)]
pub struct TaskRow {
    /// Value of the `task_id` column.
    pub task_id: String,
    /// Value of the `title` column.
    pub title: String,
    /// Value of the `status` column.
    pub status: String,
    /// Raw `last_event_at` column value, exactly as stored.
    pub last_event_at: String,
    /// Number of `events_index` rows for this task (0 when there are none).
    pub event_count: usize,
}
845
846pub fn list_tasks_by_project(
850 conn: &Connection,
851 project_hash: &str,
852) -> anyhow::Result<Vec<TaskRow>> {
853 let mut stmt = conn.prepare(
854 "SELECT t.task_id, t.title, t.status, t.last_event_at,
855 COALESCE(c.cnt, 0) AS event_count
856 FROM tasks t
857 LEFT JOIN (
858 SELECT task_id, COUNT(*) AS cnt FROM events_index GROUP BY task_id
859 ) c ON c.task_id = t.task_id
860 WHERE t.project_hash = ?1
861 ORDER BY (t.status = 'open') DESC, t.last_event_at DESC",
862 )?;
863 let rows = stmt
864 .query_map(rusqlite::params![project_hash], |r| {
865 Ok(TaskRow {
866 task_id: r.get::<_, String>(0)?,
867 title: r.get::<_, String>(1)?,
868 status: r.get::<_, String>(2)?,
869 last_event_at: r.get::<_, String>(3)?,
870 event_count: r.get::<_, i64>(4)? as usize,
871 })
872 })?
873 .collect::<Result<Vec<_>, _>>()?;
874 Ok(rows)
875}
876
877#[cfg(test)]
878mod tests {
879 use super::*;
880 use tempfile::TempDir;
881
882 #[test]
883 fn task_exists_returns_true_for_known_id_false_otherwise() {
884 let d = TempDir::new().unwrap();
885 let conn = open(d.path().join("s.sqlite")).unwrap();
886
887 assert!(!task_exists(&conn, "tj-nope").unwrap());
888
889 let e = make_open_event("tj-yes", "Hello");
890 upsert_task_from_event(&conn, &e, "feedfacefeedface").unwrap();
891 index_event(&conn, &e).unwrap();
892
893 assert!(task_exists(&conn, "tj-yes").unwrap());
894 assert!(!task_exists(&conn, "tj-nope").unwrap());
895 }
896
897 #[test]
898 fn fresh_db_runs_all_migrations() {
899 let d = TempDir::new().unwrap();
900 let p = d.path().join("state.sqlite");
901 let conn = open(&p).unwrap();
902
903 let applied: Vec<i64> = conn
904 .prepare("SELECT version FROM schema_migrations ORDER BY version")
905 .unwrap()
906 .query_map([], |r| r.get::<_, i64>(0))
907 .unwrap()
908 .collect::<Result<_, _>>()
909 .unwrap();
910 assert_eq!(
911 applied,
912 (1..=MIGRATIONS.len() as i64).collect::<Vec<_>>(),
913 "every declared migration must be recorded"
914 );
915 }
916
917 #[test]
918 fn apply_migrations_is_idempotent_across_reopens() {
919 let d = TempDir::new().unwrap();
920 let p = d.path().join("state.sqlite");
921 let _ = open(&p).unwrap();
922 let _ = open(&p).unwrap();
923
924 let count: i64 = open(&p)
925 .unwrap()
926 .query_row("SELECT COUNT(*) FROM schema_migrations", [], |r| r.get(0))
927 .unwrap();
928 assert_eq!(
929 count,
930 MIGRATIONS.len() as i64,
931 "schema_migrations must contain exactly one row per declared migration after repeated opens"
932 );
933 }
934
935 #[test]
936 fn open_creates_all_tables() {
937 let d = TempDir::new().unwrap();
938 let p = d.path().join("state.sqlite");
939 let conn = open(&p).unwrap();
940
941 let names: Vec<String> = conn
942 .prepare("SELECT name FROM sqlite_master WHERE type='table' OR type='virtual table' ORDER BY name")
943 .unwrap()
944 .query_map([], |r| r.get::<_, String>(0))
945 .unwrap()
946 .collect::<Result<_, _>>()
947 .unwrap();
948
949 for required in [
950 "decisions",
951 "events_index",
952 "evidence",
953 "task_pack_cache",
954 "tasks",
955 "search_fts",
956 ] {
957 assert!(
958 names.iter().any(|n| n == required),
959 "missing table {required}, have {names:?}"
960 );
961 }
962 }
963
964 #[test]
965 fn open_is_idempotent() {
966 let d = TempDir::new().unwrap();
967 let p = d.path().join("state.sqlite");
968 let _ = open(&p).unwrap();
969 let _ = open(&p).unwrap();
970 }
971
972 #[test]
973 fn index_event_projects_evidence() {
974 let d = TempDir::new().unwrap();
975 let conn = open(d.path().join("s.sqlite")).unwrap();
976 let mut open_e = crate::event::Event::new(
977 "tj-e",
978 crate::event::EventType::Open,
979 crate::event::Author::User,
980 crate::event::Source::Cli,
981 "x".into(),
982 );
983 open_e.meta = serde_json::json!({"title": "T"});
984 upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
985 index_event(&conn, &open_e).unwrap();
986
987 let mut ev = crate::event::Event::new(
988 "tj-e",
989 crate::event::EventType::Evidence,
990 crate::event::Author::Agent,
991 crate::event::Source::Chat,
992 "Hook startup measured at 12ms".into(),
993 );
994 ev.evidence_strength = Some(crate::event::EvidenceStrength::Strong);
995 upsert_task_from_event(&conn, &ev, "feedface").unwrap();
996 index_event(&conn, &ev).unwrap();
997
998 let (text, strength): (String, String) = conn
999 .query_row(
1000 "SELECT text, strength FROM evidence WHERE task_id=?1",
1001 rusqlite::params!["tj-e"],
1002 |r| Ok((r.get(0)?, r.get(1)?)),
1003 )
1004 .unwrap();
1005 assert!(text.contains("12ms"));
1006 assert_eq!(strength, "strong");
1007 }
1008
1009 #[test]
1010 fn supersede_event_marks_decision_superseded() {
1011 let d = TempDir::new().unwrap();
1012 let conn = open(d.path().join("s.sqlite")).unwrap();
1013 let mut open_e = crate::event::Event::new(
1014 "tj-s",
1015 crate::event::EventType::Open,
1016 crate::event::Author::User,
1017 crate::event::Source::Cli,
1018 "x".into(),
1019 );
1020 open_e.meta = serde_json::json!({"title": "T"});
1021 upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
1022 index_event(&conn, &open_e).unwrap();
1023
1024 let dec = crate::event::Event::new(
1025 "tj-s",
1026 crate::event::EventType::Decision,
1027 crate::event::Author::Agent,
1028 crate::event::Source::Chat,
1029 "Use TS".into(),
1030 );
1031 upsert_task_from_event(&conn, &dec, "feedface").unwrap();
1032 index_event(&conn, &dec).unwrap();
1033
1034 let mut sup = crate::event::Event::new(
1035 "tj-s",
1036 crate::event::EventType::Supersede,
1037 crate::event::Author::Agent,
1038 crate::event::Source::Chat,
1039 "Replaced by Rust decision".into(),
1040 );
1041 sup.supersedes = Some(dec.event_id.clone());
1042 upsert_task_from_event(&conn, &sup, "feedface").unwrap();
1043 index_event(&conn, &sup).unwrap();
1044
1045 let (status, by): (String, Option<String>) = conn
1046 .query_row(
1047 "SELECT status, superseded_by FROM decisions WHERE decision_id=?1",
1048 rusqlite::params![dec.event_id],
1049 |r| Ok((r.get(0)?, r.get(1)?)),
1050 )
1051 .unwrap();
1052 assert_eq!(status, "superseded");
1053 assert_eq!(by.as_deref(), Some(sup.event_id.as_str()));
1054 }
1055
1056 #[test]
1057 fn index_event_projects_decision_to_decisions_table() {
1058 let d = TempDir::new().unwrap();
1059 let conn = open(d.path().join("s.sqlite")).unwrap();
1060
1061 let mut open_e = crate::event::Event::new(
1062 "tj-d",
1063 crate::event::EventType::Open,
1064 crate::event::Author::User,
1065 crate::event::Source::Cli,
1066 "x".into(),
1067 );
1068 open_e.meta = serde_json::json!({"title": "T"});
1069 upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
1070 index_event(&conn, &open_e).unwrap();
1071
1072 let dec = crate::event::Event::new(
1073 "tj-d",
1074 crate::event::EventType::Decision,
1075 crate::event::Author::Agent,
1076 crate::event::Source::Chat,
1077 "Adopt Rust".into(),
1078 );
1079 upsert_task_from_event(&conn, &dec, "feedface").unwrap();
1080 index_event(&conn, &dec).unwrap();
1081
1082 let (id, text, status): (String, String, String) = conn
1083 .query_row(
1084 "SELECT decision_id, text, status FROM decisions WHERE task_id=?1",
1085 rusqlite::params!["tj-d"],
1086 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1087 )
1088 .unwrap();
1089 assert_eq!(id, dec.event_id);
1090 assert_eq!(text, "Adopt Rust");
1091 assert_eq!(status, "active");
1092 }
1093
1094 #[test]
1095 fn index_event_is_idempotent_no_search_fts_duplicates() {
1096 let d = TempDir::new().unwrap();
1097 let conn = open(d.path().join("s.sqlite")).unwrap();
1098 let mut open_e = crate::event::Event::new(
1099 "tj-id",
1100 crate::event::EventType::Open,
1101 crate::event::Author::User,
1102 crate::event::Source::Cli,
1103 "x".into(),
1104 );
1105 open_e.meta = serde_json::json!({"title": "Idempotent"});
1106 upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
1107
1108 index_event(&conn, &open_e).unwrap();
1110 index_event(&conn, &open_e).unwrap();
1111 index_event(&conn, &open_e).unwrap();
1112
1113 let n: i64 = conn
1114 .query_row(
1115 "SELECT COUNT(*) FROM search_fts WHERE event_id=?1",
1116 rusqlite::params![open_e.event_id],
1117 |r| r.get(0),
1118 )
1119 .unwrap();
1120 assert_eq!(n, 1, "search_fts must hold exactly one row per event_id");
1121 }
1122
1123 #[test]
1124 fn list_all_projects_returns_hashes_from_state_dir() {
1125 use std::fs::File;
1126 let d = TempDir::new().unwrap();
1127 let state_dir = d.path().join("state");
1128 std::fs::create_dir_all(&state_dir).unwrap();
1129 File::create(state_dir.join("aaaa1111aaaa1111.sqlite")).unwrap();
1130 File::create(state_dir.join("bbbb2222bbbb2222.sqlite")).unwrap();
1131 File::create(state_dir.join("not-a-project.txt")).unwrap();
1132
1133 let mut hashes = list_all_projects(&state_dir).unwrap();
1134 hashes.sort();
1135 assert_eq!(hashes, vec!["aaaa1111aaaa1111", "bbbb2222bbbb2222"]);
1136 }
1137
1138 fn write_event_line(f: &mut std::fs::File, e: &crate::event::Event) {
1139 use std::io::Write;
1140 writeln!(f, "{}", serde_json::to_string(e).unwrap()).unwrap();
1141 }
1142
1143 fn make_open_event(task_id: &str, title: &str) -> crate::event::Event {
1144 let mut e = crate::event::Event::new(
1145 task_id,
1146 crate::event::EventType::Open,
1147 crate::event::Author::User,
1148 crate::event::Source::Cli,
1149 "x".into(),
1150 );
1151 e.meta = serde_json::json!({"title": title});
1152 e
1153 }
1154
1155 #[test]
1156 fn ingest_new_events_picks_up_only_new_lines() {
1157 let d = TempDir::new().unwrap();
1158 let jsonl = d.path().join("events.jsonl");
1159 let db = d.path().join("s.sqlite");
1160 let project = "deadbeefdeadbeef";
1161
1162 let e1 = make_open_event("tj-i1", "first");
1163 let e2 = make_open_event("tj-i2", "second");
1164 let e3 = make_open_event("tj-i3", "third");
1165
1166 let mut f = std::fs::File::create(&jsonl).unwrap();
1167 write_event_line(&mut f, &e1);
1168 write_event_line(&mut f, &e2);
1169 write_event_line(&mut f, &e3);
1170 drop(f);
1171
1172 let conn = open(&db).unwrap();
1174 let n_first = ingest_new_events(&conn, &jsonl, project).unwrap();
1175 assert_eq!(n_first, 3);
1176
1177 let e4 = make_open_event("tj-i4", "fourth");
1179 let e5 = make_open_event("tj-i5", "fifth");
1180 let mut f = std::fs::OpenOptions::new()
1181 .append(true)
1182 .open(&jsonl)
1183 .unwrap();
1184 write_event_line(&mut f, &e4);
1185 write_event_line(&mut f, &e5);
1186 drop(f);
1187
1188 let n_second = ingest_new_events(&conn, &jsonl, project).unwrap();
1190 assert_eq!(n_second, 2, "incremental ingest must read only the tail");
1191
1192 let total: i64 = conn
1193 .query_row("SELECT COUNT(*) FROM events_index", [], |r| r.get(0))
1194 .unwrap();
1195 assert_eq!(total, 5);
1196
1197 let marker: String = conn
1198 .query_row(
1199 "SELECT last_indexed_event_id FROM index_state WHERE project_hash=?1",
1200 rusqlite::params![project],
1201 |r| r.get(0),
1202 )
1203 .unwrap();
1204 assert_eq!(marker, e5.event_id);
1205 }
1206
/// Verifies that `ingest_new_events` still succeeds, and reports every event in
/// the file, after the log has been replaced by entirely different events.
///
/// NOTE(review): presumably the stored marker is the id of the event ingested in
/// the first pass, which the rewritten log no longer contains — confirm against
/// `ingest_new_events`.
#[test]
fn ingest_new_events_falls_back_to_full_rebuild_when_marker_vanishes() {
    let tmp = TempDir::new().unwrap();
    let log_path = tmp.path().join("events.jsonl");
    let db_path = tmp.path().join("s.sqlite");
    let project = "feedfacefeedface";

    // First pass: a log holding a single event.
    let original = make_open_event("tj-r1", "first");
    {
        let mut log = std::fs::File::create(&log_path).unwrap();
        write_event_line(&mut log, &original);
    }

    let conn = open(&db_path).unwrap();
    ingest_new_events(&conn, &log_path, project).unwrap();

    // `File::create` truncates, so the event ingested above is gone from the log
    // once the two replacement events have been written.
    let replacements = [
        make_open_event("tj-r2", "after-corruption"),
        make_open_event("tj-r3", "after-corruption-2"),
    ];
    {
        let mut log = std::fs::File::create(&log_path).unwrap();
        for event in &replacements {
            write_event_line(&mut log, event);
        }
    }

    let reingested = ingest_new_events(&conn, &log_path, project).unwrap();
    assert_eq!(reingested, 2, "missing marker must trigger full rebuild");
}
1234
/// Verifies that `rebuild_state` and `ingest_new_events`, each run against its
/// own fresh database and an identical five-event log, report the same event
/// count and leave the same number of rows in `tasks` and `events_index`.
#[test]
fn rebuild_state_and_ingest_new_events_produce_same_state() {
    let tmp = TempDir::new().unwrap();
    let log_a = tmp.path().join("a.jsonl");
    let log_b = tmp.path().join("b.jsonl");
    let sqlite_a = tmp.path().join("a.sqlite");
    let sqlite_b = tmp.path().join("b.sqlite");
    let project = "abcd1234abcd1234";

    // One shared set of events, so both logs are line-for-line identical.
    let mut events = Vec::with_capacity(5);
    for i in 0..5 {
        events.push(make_open_event(&format!("tj-eq{i}"), &format!("title {i}")));
    }
    for log_path in [&log_a, &log_b] {
        let mut log = std::fs::File::create(log_path).unwrap();
        for event in &events {
            write_event_line(&mut log, event);
        }
    }

    // Side A takes the full-rebuild path, side B the incremental-ingest path.
    let conn_a = open(&sqlite_a).unwrap();
    let rebuilt = rebuild_state(&conn_a, &log_a, project).unwrap();

    let conn_b = open(&sqlite_b).unwrap();
    let ingested = ingest_new_events(&conn_b, &log_b, project).unwrap();

    assert_eq!(rebuilt, ingested);
    assert_eq!(rebuilt, 5);

    // Compare per-table row counts between the two databases.
    for table in ["tasks", "events_index"] {
        let sql = format!("SELECT COUNT(*) FROM {table}");
        let [rows_a, rows_b] = [&conn_a, &conn_b].map(|conn| {
            conn.query_row(&sql, [], |row| row.get::<_, i64>(0))
                .unwrap()
        });
        assert_eq!(rows_a, rows_b, "row count mismatch in {table}");
    }
}
1269
/// Verifies that `rebuild_state` succeeds on a log that interleaves serialized
/// events with two lines that are not events, and that only the two real events
/// are counted and end up in `events_index`.
#[test]
fn rebuild_state_skips_malformed_jsonl_lines() {
    use std::io::Write;

    let tmp = TempDir::new().unwrap();
    let log_path = tmp.path().join("events.jsonl");
    let sqlite_path = tmp.path().join("s.sqlite");

    let mut log = std::fs::File::create(&log_path).unwrap();

    // Line 1: a serialized open event carrying a title.
    let mut opened = crate::event::Event::new(
        "tj-skip",
        crate::event::EventType::Open,
        crate::event::Author::User,
        crate::event::Source::Cli,
        "x".into(),
    );
    opened.meta = serde_json::json!({"title": "Skip test"});
    let opened_json = serde_json::to_string(&opened).unwrap();
    writeln!(log, "{opened_json}").unwrap();

    // Line 2: plain text, not JSON at all.
    writeln!(log, "this is not a json event line").unwrap();

    // Line 3: a JSON object with only a `foo` key; the assertions below expect
    // this line to be skipped as well.
    writeln!(log, "{{\"foo\": 1}}").unwrap();

    // Line 4: a serialized decision event for the same task.
    let decided = crate::event::Event::new(
        "tj-skip",
        crate::event::EventType::Decision,
        crate::event::Author::Agent,
        crate::event::Source::Chat,
        "Adopt Rust".into(),
    );
    let decided_json = serde_json::to_string(&decided).unwrap();
    writeln!(log, "{decided_json}").unwrap();
    drop(log);

    let conn = open(&sqlite_path).unwrap();
    let indexed_count = rebuild_state(&conn, &log_path, "deadbeefdeadbeef")
        .expect("rebuild_state must succeed despite malformed lines");
    assert_eq!(
        indexed_count, 2,
        "expected 2 valid events indexed (2 malformed skipped)"
    );

    // The reported count must agree with what is actually stored.
    let stored_rows: i64 = conn
        .query_row("SELECT COUNT(*) FROM events_index", [], |row| row.get(0))
        .unwrap();
    assert_eq!(stored_rows, 2);
}
1318
/// Verifies that `rebuild_state` over a two-event log for a single task reports
/// two events and leaves one row in `tasks` and two rows in `events_index`.
#[test]
fn rebuild_state_reads_jsonl_and_populates_db() {
    use std::io::Write;

    let tmp = TempDir::new().unwrap();
    let log_path = tmp.path().join("events.jsonl");
    let sqlite_path = tmp.path().join("s.sqlite");

    // Two events that share a task id: an open event followed by a decision.
    let mut opened = crate::event::Event::new(
        "tj-9",
        crate::event::EventType::Open,
        crate::event::Author::User,
        crate::event::Source::Cli,
        "x".into(),
    );
    opened.meta = serde_json::json!({"title": "Nine"});
    let decided = crate::event::Event::new(
        "tj-9",
        crate::event::EventType::Decision,
        crate::event::Author::Agent,
        crate::event::Source::Chat,
        "Adopt Rust".into(),
    );

    {
        // One JSON document per line; the handle closes when this scope ends.
        let mut log = std::fs::File::create(&log_path).unwrap();
        for event in [&opened, &decided] {
            let line = serde_json::to_string(event).unwrap();
            writeln!(log, "{line}").unwrap();
        }
    }

    let conn = open(&sqlite_path).unwrap();
    let event_count = rebuild_state(&conn, &log_path, "deadbeefdeadbeef").unwrap();
    assert_eq!(event_count, 2);

    // Both events belong to the same task, so exactly one task row is expected.
    let task_rows: i64 = conn
        .query_row("SELECT COUNT(*) FROM tasks", [], |row| row.get(0))
        .unwrap();
    assert_eq!(task_rows, 1);

    let event_rows: i64 = conn
        .query_row("SELECT COUNT(*) FROM events_index", [], |row| row.get(0))
        .unwrap();
    assert_eq!(event_rows, 2);
}
1359
/// Verifies that `index_event` leaves one `events_index` row per indexed event
/// and that a full-text `MATCH` on `search_fts` finds exactly the event whose
/// text contains the search term.
#[test]
fn index_event_writes_index_and_fts() {
    let tmp = TempDir::new().unwrap();
    let db_path = tmp.path().join("s.sqlite");
    let conn = open(db_path).unwrap();
    let project = "deadbeefdeadbeef";

    // Open event: its text is "Title", so it must not match the search below.
    let mut opened = crate::event::Event::new(
        "tj-1",
        crate::event::EventType::Open,
        crate::event::Author::User,
        crate::event::Source::Cli,
        "Title".into(),
    );
    opened.meta = serde_json::json!({"title": "Title"});
    upsert_task_from_event(&conn, &opened, project).unwrap();
    index_event(&conn, &opened).unwrap();

    // Decision event on the same task: the only one mentioning "Rust".
    let mut decision = crate::event::Event::new(
        "tj-1",
        crate::event::EventType::Decision,
        crate::event::Author::Agent,
        crate::event::Source::Chat,
        "Adopt Rust".into(),
    );
    decision.confidence = Some(0.92);
    upsert_task_from_event(&conn, &decision, project).unwrap();
    index_event(&conn, &decision).unwrap();

    let rows_for_task: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM events_index WHERE task_id=?1",
            rusqlite::params!["tj-1"],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(rows_for_task, 2);

    // Full-text lookup: collect the ids of every event matching "Rust".
    let mut fts_query = conn
        .prepare("SELECT event_id FROM search_fts WHERE search_fts MATCH ?1")
        .unwrap();
    let matched_rows = fts_query
        .query_map(rusqlite::params!["Rust"], |row| row.get::<_, String>(0))
        .unwrap();
    let mut hits: Vec<String> = Vec::new();
    for matched in matched_rows {
        hits.push(matched.unwrap());
    }
    assert_eq!(hits.len(), 1);
    assert_eq!(hits[0], decision.event_id);
}
1409
/// Verifies that `upsert_task_from_event`, given an open event, creates a
/// `tasks` row whose id is the event's task id, whose title comes from the
/// event's `meta.title`, and whose status is `open`.
#[test]
fn upsert_task_from_open_event_inserts_row() {
    let tmp = TempDir::new().unwrap();
    let db_path = tmp.path().join("s.sqlite");
    let conn = open(db_path).unwrap();

    // The event text and `meta.title` differ on purpose, so the title
    // assertion below shows which of the two was stored.
    let mut opened = crate::event::Event::new(
        "tj-7f3a",
        crate::event::EventType::Open,
        crate::event::Author::User,
        crate::event::Source::Cli,
        "Add OAuth".into(),
    );
    opened.meta = serde_json::json!({ "title": "Add OAuth login" });

    upsert_task_from_event(&conn, &opened, "abcd1234abcd1234").unwrap();

    let stored: (String, String, String) = conn
        .query_row(
            "SELECT task_id, title, status FROM tasks WHERE task_id = ?1",
            ["tj-7f3a"],
            |row| {
                let task_id = row.get(0)?;
                let title = row.get(1)?;
                let status = row.get(2)?;
                Ok((task_id, title, status))
            },
        )
        .unwrap();
    let (task_id, title, status) = stored;

    assert_eq!(task_id, "tj-7f3a");
    assert_eq!(title, "Add OAuth login");
    assert_eq!(status, "open");
}
1438}