1use anyhow::Context;
2use rusqlite::Connection;
3use std::collections::HashSet;
4use std::path::Path;
5
/// A single versioned schema step.
///
/// `apply_migrations` runs `sql` for every entry whose `version` is not yet
/// present in the `schema_migrations` table.
struct Migration {
    /// Version number stored in `schema_migrations.version` once applied.
    version: i64,
    /// SQL batch that performs the schema change.
    sql: &'static str,
}
13
/// Schema v1: creates the base tables (`tasks`, `events_index`, `decisions`,
/// `evidence`, `task_pack_cache`), their indexes, and the `search_fts` FTS5
/// virtual table. Every statement uses `IF NOT EXISTS`.
const MIGRATION_001: &str = r#"
CREATE TABLE IF NOT EXISTS tasks (
    task_id TEXT PRIMARY KEY,
    title TEXT NOT NULL,
    status TEXT NOT NULL,
    project_hash TEXT NOT NULL,
    opened_at TEXT NOT NULL,
    closed_at TEXT,
    last_event_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_tasks_project ON tasks(project_hash, last_event_at DESC);

CREATE TABLE IF NOT EXISTS events_index (
    event_id TEXT PRIMARY KEY,
    task_id TEXT NOT NULL,
    type TEXT NOT NULL,
    timestamp TEXT NOT NULL,
    confidence REAL,
    status TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_events_task_time ON events_index(task_id, timestamp DESC);

CREATE TABLE IF NOT EXISTS decisions (
    decision_id TEXT PRIMARY KEY,
    task_id TEXT NOT NULL,
    text TEXT NOT NULL,
    status TEXT NOT NULL,
    superseded_by TEXT
);

CREATE TABLE IF NOT EXISTS evidence (
    evidence_id TEXT PRIMARY KEY,
    task_id TEXT NOT NULL,
    text TEXT NOT NULL,
    strength TEXT NOT NULL,
    refers_to_decision_id TEXT
);

CREATE TABLE IF NOT EXISTS task_pack_cache (
    task_id TEXT NOT NULL,
    mode TEXT NOT NULL,
    text TEXT NOT NULL,
    generated_at TEXT NOT NULL,
    source_event_count INTEGER NOT NULL,
    PRIMARY KEY (task_id, mode)
);

CREATE VIRTUAL TABLE IF NOT EXISTS search_fts USING fts5(
    task_id UNINDEXED,
    event_id UNINDEXED,
    text,
    type
);
"#;
68
/// Schema v2: adds `index_state`, one row per project holding the id of the
/// last event that was indexed (see `record_last_indexed`).
const MIGRATION_002: &str = r#"
CREATE TABLE IF NOT EXISTS index_state (
    project_hash TEXT PRIMARY KEY,
    last_indexed_event_id TEXT NOT NULL,
    updated_at TEXT NOT NULL
);
"#;
80
/// Schema v3: adds the `goal`, `outcome`, `outcome_tag` and `external` columns
/// to `tasks`, adds `artifacts` to `events_index`, and empties
/// `task_pack_cache`.
///
/// The `ALTER TABLE ... ADD COLUMN` statements are not repeatable, so this
/// batch must only ever run once per database.
const MIGRATION_003: &str = r#"
ALTER TABLE tasks ADD COLUMN goal TEXT;
ALTER TABLE tasks ADD COLUMN outcome TEXT;
ALTER TABLE tasks ADD COLUMN outcome_tag TEXT;
ALTER TABLE tasks ADD COLUMN external TEXT;
ALTER TABLE events_index ADD COLUMN artifacts TEXT;
DELETE FROM task_pack_cache;
"#;
93
/// Schema v4: no structural change; only empties `task_pack_cache`.
// NOTE(review): presumably forces cached packs to be regenerated after a
// format change — the reason is not visible in this file.
const MIGRATION_004: &str = r#"
DELETE FROM task_pack_cache;
"#;
102
/// Schema v5: adds `dream_state`, one row per project with a `last_dream_at`
/// timestamp and an `updated_at` timestamp.
const MIGRATION_005: &str = r#"
CREATE TABLE IF NOT EXISTS dream_state (
    project_hash TEXT PRIMARY KEY,
    last_dream_at TEXT NOT NULL,
    updated_at TEXT NOT NULL
);
"#;
112
/// Schema v6: adds the nullable `parent_id` column to `tasks` plus an index on
/// it, enabling the parent/child lookups in `children_of` and `parent_of`.
const MIGRATION_006: &str = r#"
ALTER TABLE tasks ADD COLUMN parent_id TEXT;
CREATE INDEX IF NOT EXISTS idx_tasks_parent ON tasks(parent_id);
"#;
120
/// Schema v7: adds the nullable `alternatives` column to `decisions` (filled
/// with JSON by `index_event`) and empties `task_pack_cache`.
const MIGRATION_007: &str = r#"
ALTER TABLE decisions ADD COLUMN alternatives TEXT;
DELETE FROM task_pack_cache;
"#;
130
131const MIGRATIONS: &[Migration] = &[
134 Migration {
135 version: 1,
136 sql: MIGRATION_001,
137 },
138 Migration {
139 version: 2,
140 sql: MIGRATION_002,
141 },
142 Migration {
143 version: 3,
144 sql: MIGRATION_003,
145 },
146 Migration {
147 version: 4,
148 sql: MIGRATION_004,
149 },
150 Migration {
151 version: 5,
152 sql: MIGRATION_005,
153 },
154 Migration {
155 version: 6,
156 sql: MIGRATION_006,
157 },
158 Migration {
159 version: 7,
160 sql: MIGRATION_007,
161 },
162];
163
164fn apply_migrations(conn: &Connection) -> anyhow::Result<()> {
165 conn.execute_batch(
166 "CREATE TABLE IF NOT EXISTS schema_migrations (
167 version INTEGER PRIMARY KEY,
168 applied_at TEXT NOT NULL
169 )",
170 )
171 .context("create schema_migrations table")?;
172
173 let applied: HashSet<i64> = {
174 let mut stmt = conn
175 .prepare("SELECT version FROM schema_migrations")
176 .context("select applied versions")?;
177 let rows = stmt
178 .query_map([], |r| r.get::<_, i64>(0))
179 .context("iterate schema_migrations")?;
180 rows.collect::<rusqlite::Result<HashSet<_>>>()
181 .context("collect applied versions")?
182 };
183
184 for migration in MIGRATIONS {
185 if applied.contains(&migration.version) {
186 continue;
187 }
188 conn.execute_batch(migration.sql)
189 .with_context(|| format!("apply schema migration v{:03}", migration.version))?;
190 conn.execute(
191 "INSERT INTO schema_migrations(version, applied_at) VALUES (?1, ?2)",
192 rusqlite::params![
193 migration.version,
194 chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
195 ],
196 )
197 .with_context(|| {
198 format!(
199 "record schema migration v{:03} as applied",
200 migration.version
201 )
202 })?;
203 }
204 Ok(())
205}
206
207use crate::event::{Event, EventType};
208
209pub fn upsert_task_from_event(
210 conn: &Connection,
211 event: &Event,
212 project_hash: &str,
213) -> anyhow::Result<()> {
214 match event.event_type {
215 EventType::Open => {
216 let title = event
217 .meta
218 .get("title")
219 .and_then(|v| v.as_str())
220 .unwrap_or(&event.text)
221 .to_string();
222 let parent_id = event
223 .meta
224 .get("parent_id")
225 .and_then(|v| v.as_str())
226 .map(str::to_string);
227 conn.execute(
230 "INSERT INTO tasks(task_id, title, status, project_hash, opened_at, last_event_at, parent_id)
231 VALUES (?1, ?2, 'open', ?3, ?4, ?4, ?5)
232 ON CONFLICT(task_id) DO UPDATE SET last_event_at = ?4",
233 rusqlite::params![event.task_id, title, project_hash, event.timestamp, parent_id],
234 )?;
235 }
236 EventType::Close => {
237 conn.execute(
238 "UPDATE tasks SET status='closed', closed_at=?2, last_event_at=?2 WHERE task_id=?1",
239 rusqlite::params![event.task_id, event.timestamp],
240 )?;
241 }
242 EventType::Reopen => {
243 conn.execute(
244 "UPDATE tasks SET status='open', closed_at=NULL, last_event_at=?2 WHERE task_id=?1",
245 rusqlite::params![event.task_id, event.timestamp],
246 )?;
247 }
248 _ => {
249 conn.execute(
250 "UPDATE tasks SET last_event_at=?2 WHERE task_id=?1",
251 rusqlite::params![event.task_id, event.timestamp],
252 )?;
253 }
254 }
255 Ok(())
256}
257
258use std::io::BufRead;
259
260pub fn list_all_projects(state_dir: impl AsRef<Path>) -> anyhow::Result<Vec<String>> {
261 let dir = state_dir.as_ref();
262 if !dir.exists() {
263 return Ok(vec![]);
264 }
265 let mut out = Vec::new();
266 for entry in std::fs::read_dir(dir)? {
267 let entry = entry?;
268 let path = entry.path();
269 if path.extension().and_then(|e| e.to_str()) == Some("sqlite") {
270 if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) {
271 out.push(stem.to_string());
272 }
273 }
274 }
275 Ok(out)
276}
277
278pub fn rebuild_state(
279 conn: &Connection,
280 jsonl_path: impl AsRef<Path>,
281 project_hash: &str,
282) -> anyhow::Result<usize> {
283 let f = std::fs::File::open(&jsonl_path)
284 .with_context(|| format!("open {:?}", jsonl_path.as_ref()))?;
285 let reader = std::io::BufReader::new(f);
286
287 let tx = conn.unchecked_transaction()?;
288 let mut count = 0;
289 let mut last_event_id: Option<String> = None;
290 for (i, line) in reader.lines().enumerate() {
291 let line = line.with_context(|| format!("read line {i}"))?;
292 if line.trim().is_empty() {
293 continue;
294 }
295 let event: Event = match serde_json::from_str(&line) {
299 Ok(e) => e,
300 Err(err) => {
301 tracing::warn!(
302 line_number = i + 1,
303 error = %err,
304 "skipping malformed JSONL line in rebuild_state"
305 );
306 continue;
307 }
308 };
309 upsert_task_from_event(&tx, &event, project_hash)?;
310 index_event(&tx, &event)?;
311 last_event_id = Some(event.event_id.clone());
312 count += 1;
313 }
314 if let Some(eid) = last_event_id.as_deref() {
315 record_last_indexed(&tx, project_hash, eid)?;
316 }
317 tx.commit()?;
318 Ok(count)
319}
320
321pub fn task_exists(conn: &Connection, task_id: &str) -> anyhow::Result<bool> {
326 let count: i64 = conn.query_row(
327 "SELECT COUNT(*) FROM tasks WHERE task_id = ?1",
328 rusqlite::params![task_id],
329 |r| r.get(0),
330 )?;
331 Ok(count > 0)
332}
333
334pub fn task_status(conn: &Connection, task_id: &str) -> anyhow::Result<Option<String>> {
338 let mut stmt = conn.prepare("SELECT status FROM tasks WHERE task_id = ?1")?;
339 let mut rows = stmt.query(rusqlite::params![task_id])?;
340 Ok(rows.next()?.map(|r| r.get::<_, String>(0)).transpose()?)
341}
342
343pub fn set_task_goal(conn: &Connection, task_id: &str, goal: &str) -> anyhow::Result<()> {
347 conn.execute(
348 "UPDATE tasks SET goal = ?1 WHERE task_id = ?2",
349 rusqlite::params![goal, task_id],
350 )
351 .with_context(|| format!("set goal for {task_id}"))?;
352 conn.execute(
355 "DELETE FROM task_pack_cache WHERE task_id = ?1",
356 rusqlite::params![task_id],
357 )?;
358 Ok(())
359}
360
361pub fn set_task_outcome(
365 conn: &Connection,
366 task_id: &str,
367 outcome: &str,
368 outcome_tag: Option<&str>,
369) -> anyhow::Result<()> {
370 conn.execute(
371 "UPDATE tasks SET outcome = ?1, outcome_tag = ?2 WHERE task_id = ?3",
372 rusqlite::params![outcome, outcome_tag, task_id],
373 )
374 .with_context(|| format!("set outcome for {task_id}"))?;
375 conn.execute(
376 "DELETE FROM task_pack_cache WHERE task_id = ?1",
377 rusqlite::params![task_id],
378 )?;
379 Ok(())
380}
381
382pub fn add_task_external(conn: &Connection, task_id: &str, reference: &str) -> anyhow::Result<()> {
387 let current: Option<String> = conn
388 .query_row(
389 "SELECT external FROM tasks WHERE task_id = ?1",
390 rusqlite::params![task_id],
391 |r| r.get::<_, Option<String>>(0),
392 )
393 .with_context(|| format!("read external for {task_id}"))?;
394 let next = match current {
395 Some(s) if !s.is_empty() => format!("{s},{reference}"),
396 _ => reference.to_string(),
397 };
398 conn.execute(
399 "UPDATE tasks SET external = ?1 WHERE task_id = ?2",
400 rusqlite::params![next, task_id],
401 )?;
402 conn.execute(
403 "DELETE FROM task_pack_cache WHERE task_id = ?1",
404 rusqlite::params![task_id],
405 )?;
406 Ok(())
407}
408
/// Optional descriptive columns of a task, as returned by `task_metadata`.
///
/// Every field mirrors a nullable column of the `tasks` table.
#[derive(Debug, Clone, Default)]
pub struct TaskMetadata {
    /// Value of `tasks.goal`.
    pub goal: Option<String>,
    /// Value of `tasks.outcome`.
    pub outcome: Option<String>,
    /// Value of `tasks.outcome_tag`.
    pub outcome_tag: Option<String>,
    /// Value of `tasks.external`; `add_task_external` stores it comma-separated.
    pub external: Option<String>,
}
418
419pub fn task_metadata(conn: &Connection, task_id: &str) -> anyhow::Result<Option<TaskMetadata>> {
420 let mut stmt =
421 conn.prepare("SELECT goal, outcome, outcome_tag, external FROM tasks WHERE task_id = ?1")?;
422 let mut rows = stmt.query(rusqlite::params![task_id])?;
423 Ok(match rows.next()? {
424 Some(r) => Some(TaskMetadata {
425 goal: r.get::<_, Option<String>>(0)?,
426 outcome: r.get::<_, Option<String>>(1)?,
427 outcome_tag: r.get::<_, Option<String>>(2)?,
428 external: r.get::<_, Option<String>>(3)?,
429 }),
430 None => None,
431 })
432}
433
/// An open task with no recent events, as reported by `stale_tasks`.
#[derive(Debug, Clone)]
pub struct StaleTask {
    /// Identifier of the task.
    pub task_id: String,
    /// Title of the task.
    pub title: String,
    /// Raw `last_event_at` value as stored in the database.
    pub last_event_at: String,
    /// Whole days between `last_event_at` and the time of the query.
    pub days_idle: i64,
}
443
444pub fn stale_tasks(conn: &Connection, days: i64) -> anyhow::Result<Vec<StaleTask>> {
447 let cutoff = chrono::Utc::now() - chrono::Duration::days(days);
448 let cutoff_str = cutoff.to_rfc3339();
449 let mut stmt = conn.prepare(
450 "SELECT task_id, title, last_event_at FROM tasks
451 WHERE status = 'open' AND last_event_at < ?1
452 ORDER BY last_event_at ASC",
453 )?;
454 let rows = stmt.query_map(rusqlite::params![cutoff_str], |r| {
455 Ok((
456 r.get::<_, String>(0)?,
457 r.get::<_, String>(1)?,
458 r.get::<_, String>(2)?,
459 ))
460 })?;
461 let now = chrono::Utc::now();
462 let mut out = Vec::new();
463 for row in rows {
464 let (task_id, title, last_at) = row?;
465 let dt = chrono::DateTime::parse_from_rfc3339(&last_at)
466 .map(|d| d.with_timezone(&chrono::Utc))
467 .unwrap_or(now);
468 let days_idle = (now - dt).num_days();
469 out.push(StaleTask {
470 task_id,
471 title,
472 last_event_at: last_at,
473 days_idle,
474 });
475 }
476 Ok(out)
477}
478
/// A task that shares artifacts with a query, as returned by
/// `find_related_tasks`.
#[derive(Debug, Clone)]
pub struct RelatedTask {
    /// Identifier of the related task.
    pub task_id: String,
    /// Current `status` of the task.
    pub status: String,
    /// Sum of the weights of all artifacts that matched this task.
    pub score: f64,
}
489
490pub fn find_related_tasks(
501 conn: &Connection,
502 arts: &crate::artifacts::Artifacts,
503) -> anyhow::Result<Vec<RelatedTask>> {
504 use std::collections::HashMap;
505 if arts.is_empty() {
506 return Ok(Vec::new());
507 }
508 let mut scores: HashMap<String, f64> = HashMap::new();
509 let mut last_seen: HashMap<String, String> = HashMap::new();
510
511 let needles: Vec<(String, f64)> = arts
512 .linked_issues
513 .iter()
514 .map(|s| (s.clone(), 1.0))
515 .chain(arts.commit_hashes.iter().map(|s| (s.clone(), 0.8)))
516 .chain(arts.files.iter().map(|s| (s.clone(), 0.3)))
517 .collect();
518
519 for (needle, weight) in needles {
520 let pattern = format!("%\"{}\"%", needle.replace('%', "\\%"));
521 let mut stmt = conn.prepare(
522 "SELECT DISTINCT task_id, MAX(timestamp) as ts FROM events_index
523 WHERE artifacts LIKE ?1
524 GROUP BY task_id
525 ORDER BY ts DESC",
526 )?;
527 let rows = stmt.query_map(rusqlite::params![pattern], |r| {
528 Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
529 })?;
530 for row in rows {
531 let (id, ts) = row?;
532 *scores.entry(id.clone()).or_insert(0.0) += weight;
533 last_seen.insert(id, ts);
534 }
535 }
536
537 let mut out: Vec<RelatedTask> = Vec::with_capacity(scores.len());
538 for (id, score) in scores {
539 let status: Option<String> = conn
540 .query_row(
541 "SELECT status FROM tasks WHERE task_id = ?1",
542 rusqlite::params![&id],
543 |r| r.get(0),
544 )
545 .ok();
546 if let Some(status) = status {
547 out.push(RelatedTask {
548 task_id: id,
549 status,
550 score,
551 });
552 }
553 }
554 out.sort_by(|a, b| {
555 b.score
556 .partial_cmp(&a.score)
557 .unwrap_or(std::cmp::Ordering::Equal)
558 .then_with(|| {
559 let ts_a = last_seen.get(&a.task_id).cloned().unwrap_or_default();
560 let ts_b = last_seen.get(&b.task_id).cloned().unwrap_or_default();
561 ts_b.cmp(&ts_a)
562 })
563 });
564 Ok(out)
565}
566
567pub fn find_tasks_by_linked_issues(
574 conn: &Connection,
575 issues: &[String],
576) -> anyhow::Result<Vec<(String, String)>> {
577 if issues.is_empty() {
578 return Ok(Vec::new());
579 }
580 let mut candidate_ids: Vec<String> = Vec::new();
586 for issue in issues {
587 let pattern = format!("%\"{}\"%", issue.replace('%', "\\%"));
588 let mut stmt = conn.prepare(
589 "SELECT DISTINCT task_id FROM events_index
590 WHERE artifacts LIKE ?1
591 ORDER BY timestamp DESC",
592 )?;
593 let rows = stmt.query_map(rusqlite::params![pattern], |r| r.get::<_, String>(0))?;
594 for r in rows {
595 let id = r?;
596 if !candidate_ids.contains(&id) {
597 candidate_ids.push(id);
598 }
599 }
600 }
601 let mut out = Vec::with_capacity(candidate_ids.len());
603 for id in candidate_ids {
604 let status: Option<String> = conn
605 .query_row(
606 "SELECT status FROM tasks WHERE task_id = ?1",
607 rusqlite::params![&id],
608 |r| r.get(0),
609 )
610 .ok();
611 if let Some(s) = status {
612 out.push((id, s));
613 }
614 }
615 Ok(out)
616}
617
618pub fn reclassify_task_artifacts(conn: &Connection, task_id: &str) -> anyhow::Result<usize> {
624 let mut stmt = conn.prepare(
625 "SELECT ei.event_id, COALESCE(sf.text, '') FROM events_index ei
626 LEFT JOIN search_fts sf ON sf.event_id = ei.event_id
627 WHERE ei.task_id = ?1",
628 )?;
629 let rows: Vec<(String, String)> = stmt
630 .query_map(rusqlite::params![task_id], |r| {
631 Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
632 })?
633 .collect::<Result<_, _>>()?;
634 let count = rows.len();
635 for (event_id, text) in rows {
636 let arts = crate::artifacts::extract(&text);
637 let json = if arts.is_empty() {
638 None
639 } else {
640 Some(serde_json::to_string(&arts)?)
641 };
642 conn.execute(
643 "UPDATE events_index SET artifacts = ?1 WHERE event_id = ?2",
644 rusqlite::params![json, event_id],
645 )?;
646 }
647 invalidate_pack_cascade(conn, task_id)?;
648 Ok(count)
649}
650
651pub fn task_artifacts(
657 conn: &Connection,
658 task_id: &str,
659) -> anyhow::Result<crate::artifacts::Artifacts> {
660 let mut stmt = conn.prepare(
661 "SELECT artifacts FROM events_index
662 WHERE task_id = ?1 AND artifacts IS NOT NULL
663 ORDER BY timestamp ASC",
664 )?;
665 let rows = stmt.query_map(rusqlite::params![task_id], |r| r.get::<_, String>(0))?;
666 let mut acc = crate::artifacts::Artifacts::default();
667 for row in rows {
668 let json = row?;
669 if let Ok(parsed) = serde_json::from_str::<crate::artifacts::Artifacts>(&json) {
670 acc.merge(parsed);
671 }
672 }
673 Ok(acc)
674}
675
676fn last_indexed_event_id(conn: &Connection, project_hash: &str) -> anyhow::Result<Option<String>> {
680 let mut stmt =
681 conn.prepare("SELECT last_indexed_event_id FROM index_state WHERE project_hash = ?1")?;
682 let mut rows = stmt.query(rusqlite::params![project_hash])?;
683 if let Some(row) = rows.next()? {
684 Ok(Some(row.get::<_, String>(0)?))
685 } else {
686 Ok(None)
687 }
688}
689
690fn record_last_indexed(
691 conn: &Connection,
692 project_hash: &str,
693 event_id: &str,
694) -> anyhow::Result<()> {
695 conn.execute(
696 "INSERT INTO index_state(project_hash, last_indexed_event_id, updated_at)
697 VALUES (?1, ?2, ?3)
698 ON CONFLICT(project_hash) DO UPDATE SET
699 last_indexed_event_id = excluded.last_indexed_event_id,
700 updated_at = excluded.updated_at",
701 rusqlite::params![
702 project_hash,
703 event_id,
704 chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
705 ],
706 )?;
707 Ok(())
708}
709
710pub fn ingest_new_events(
720 conn: &Connection,
721 jsonl_path: impl AsRef<Path>,
722 project_hash: &str,
723) -> anyhow::Result<usize> {
724 let marker = match last_indexed_event_id(conn, project_hash)? {
725 Some(id) => id,
726 None => return rebuild_state(conn, jsonl_path, project_hash),
727 };
728
729 let f = std::fs::File::open(&jsonl_path)
730 .with_context(|| format!("open {:?}", jsonl_path.as_ref()))?;
731 let reader = std::io::BufReader::new(f);
732
733 let tx = conn.unchecked_transaction()?;
737 let mut found_marker = false;
738 let mut count = 0;
739 let mut last_event_id: Option<String> = None;
740 for (i, line) in reader.lines().enumerate() {
741 let line = line.with_context(|| format!("read line {i}"))?;
742 if line.trim().is_empty() {
743 continue;
744 }
745 let event: Event = match serde_json::from_str(&line) {
746 Ok(e) => e,
747 Err(err) => {
748 tracing::warn!(
749 line_number = i + 1,
750 error = %err,
751 "skipping malformed JSONL line in ingest_new_events"
752 );
753 continue;
754 }
755 };
756 if !found_marker {
757 if event.event_id == marker {
758 found_marker = true;
759 }
760 continue;
761 }
762 upsert_task_from_event(&tx, &event, project_hash)?;
763 index_event(&tx, &event)?;
764 last_event_id = Some(event.event_id.clone());
765 count += 1;
766 }
767
768 if !found_marker {
769 drop(tx);
771 tracing::warn!(
772 project_hash = project_hash,
773 marker = marker.as_str(),
774 "last_indexed_event_id not found in JSONL — falling back to full rebuild"
775 );
776 return rebuild_state(conn, jsonl_path, project_hash);
777 }
778
779 if let Some(eid) = last_event_id.as_deref() {
780 record_last_indexed(&tx, project_hash, eid)?;
781 }
782 tx.commit()?;
783 Ok(count)
784}
785
786pub fn index_event(conn: &Connection, event: &Event) -> anyhow::Result<()> {
787 let type_str = serde_json::to_value(event.event_type)?
788 .as_str()
789 .unwrap()
790 .to_string();
791 let status_str = serde_json::to_value(event.status)?
792 .as_str()
793 .unwrap()
794 .to_string();
795 let artifacts = crate::artifacts::extract(&event.text);
800 let artifacts_json = if artifacts.is_empty() {
801 None
802 } else {
803 Some(serde_json::to_string(&artifacts)?)
804 };
805 conn.execute(
806 "INSERT OR REPLACE INTO events_index(event_id, task_id, type, timestamp, confidence, status, artifacts)
807 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
808 rusqlite::params![
809 event.event_id, event.task_id, type_str,
810 event.timestamp, event.confidence, status_str, artifacts_json
811 ],
812 )?;
813 conn.execute(
815 "DELETE FROM search_fts WHERE event_id=?1",
816 rusqlite::params![event.event_id],
817 )?;
818 conn.execute(
819 "INSERT INTO search_fts(task_id, event_id, text, type) VALUES (?1, ?2, ?3, ?4)",
820 rusqlite::params![event.task_id, event.event_id, event.text, type_str],
821 )?;
822
823 if event.event_type == EventType::Decision {
824 let alternatives_json = match event.meta.get("alternatives") {
828 Some(v) if !v.is_null() => Some(serde_json::to_string(v)?),
829 _ => None,
830 };
831 conn.execute(
832 "INSERT OR REPLACE INTO decisions(decision_id, task_id, text, status, alternatives)
833 VALUES (?1, ?2, ?3, 'active', ?4)",
834 rusqlite::params![event.event_id, event.task_id, event.text, alternatives_json],
835 )?;
836 }
837
838 if event.event_type == EventType::Supersede {
839 if let Some(target) = &event.supersedes {
840 conn.execute(
841 "UPDATE decisions SET status='superseded', superseded_by=?1 WHERE decision_id=?2",
842 rusqlite::params![event.event_id, target],
843 )?;
844 }
845 }
846
847 if event.event_type == EventType::Evidence {
848 let strength_str = event
849 .evidence_strength
850 .map(|s| {
851 serde_json::to_value(s)
852 .unwrap()
853 .as_str()
854 .unwrap()
855 .to_string()
856 })
857 .unwrap_or_else(|| "medium".into());
858 conn.execute(
859 "INSERT OR REPLACE INTO evidence(evidence_id, task_id, text, strength)
860 VALUES (?1, ?2, ?3, ?4)",
861 rusqlite::params![event.event_id, event.task_id, event.text, strength_str],
862 )?;
863 }
864
865 invalidate_pack_cascade(conn, &event.task_id)?;
868
869 Ok(())
870}
871
872pub fn open(path: impl AsRef<Path>) -> anyhow::Result<Connection> {
873 if let Some(parent) = path.as_ref().parent() {
874 std::fs::create_dir_all(parent).with_context(|| format!("create dir {parent:?}"))?;
875 }
876 let conn =
877 Connection::open(&path).with_context(|| format!("open SQLite at {:?}", path.as_ref()))?;
878 conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;")?;
879 apply_migrations(&conn).context("apply schema migrations")?;
880 Ok(conn)
881}
882
/// Summary of one task as returned by the task listing queries
/// (`list_tasks_by_project`, `top_level_tasks`, `children_of`).
#[derive(Debug, Clone)]
pub struct TaskRow {
    /// Identifier of the task.
    pub task_id: String,
    /// Title of the task.
    pub title: String,
    /// Current `status` of the task.
    pub status: String,
    /// Raw `last_event_at` value as stored in the database.
    pub last_event_at: String,
    /// Number of rows in `events_index` for this task (0 when there are none).
    pub event_count: usize,
}
894
895pub fn list_tasks_by_project(
899 conn: &Connection,
900 project_hash: &str,
901) -> anyhow::Result<Vec<TaskRow>> {
902 let mut stmt = conn.prepare(
903 "SELECT t.task_id, t.title, t.status, t.last_event_at,
904 COALESCE(c.cnt, 0) AS event_count
905 FROM tasks t
906 LEFT JOIN (
907 SELECT task_id, COUNT(*) AS cnt FROM events_index GROUP BY task_id
908 ) c ON c.task_id = t.task_id
909 WHERE t.project_hash = ?1
910 ORDER BY (t.status = 'open') DESC, t.last_event_at DESC",
911 )?;
912 let rows = stmt
913 .query_map(rusqlite::params![project_hash], |r| {
914 Ok(TaskRow {
915 task_id: r.get::<_, String>(0)?,
916 title: r.get::<_, String>(1)?,
917 status: r.get::<_, String>(2)?,
918 last_event_at: r.get::<_, String>(3)?,
919 event_count: r.get::<_, i64>(4)? as usize,
920 })
921 })?
922 .collect::<Result<Vec<_>, _>>()?;
923 Ok(rows)
924}
925
926pub fn top_level_tasks(conn: &Connection, project_hash: &str) -> anyhow::Result<Vec<TaskRow>> {
930 let mut stmt = conn.prepare(
931 "SELECT t.task_id, t.title, t.status, t.last_event_at,
932 COALESCE(c.cnt, 0) AS event_count
933 FROM tasks t
934 LEFT JOIN (
935 SELECT task_id, COUNT(*) AS cnt FROM events_index GROUP BY task_id
936 ) c ON c.task_id = t.task_id
937 WHERE t.project_hash = ?1 AND t.parent_id IS NULL
938 ORDER BY (t.status = 'open') DESC, t.last_event_at DESC",
939 )?;
940 let rows = stmt
941 .query_map(rusqlite::params![project_hash], |r| {
942 Ok(TaskRow {
943 task_id: r.get::<_, String>(0)?,
944 title: r.get::<_, String>(1)?,
945 status: r.get::<_, String>(2)?,
946 last_event_at: r.get::<_, String>(3)?,
947 event_count: r.get::<_, i64>(4)? as usize,
948 })
949 })?
950 .collect::<Result<Vec<_>, _>>()?;
951 Ok(rows)
952}
953
954pub fn children_of(conn: &Connection, task_id: &str) -> anyhow::Result<Vec<TaskRow>> {
956 let mut stmt = conn.prepare(
957 "SELECT t.task_id, t.title, t.status, t.last_event_at,
958 COALESCE(c.cnt, 0) AS event_count
959 FROM tasks t
960 LEFT JOIN (
961 SELECT task_id, COUNT(*) AS cnt FROM events_index GROUP BY task_id
962 ) c ON c.task_id = t.task_id
963 WHERE t.parent_id = ?1
964 ORDER BY (t.status = 'open') DESC, t.last_event_at DESC",
965 )?;
966 let rows = stmt
967 .query_map(rusqlite::params![task_id], |r| {
968 Ok(TaskRow {
969 task_id: r.get::<_, String>(0)?,
970 title: r.get::<_, String>(1)?,
971 status: r.get::<_, String>(2)?,
972 last_event_at: r.get::<_, String>(3)?,
973 event_count: r.get::<_, i64>(4)? as usize,
974 })
975 })?
976 .collect::<Result<Vec<_>, _>>()?;
977 Ok(rows)
978}
979
980pub fn parent_of(conn: &Connection, task_id: &str) -> anyhow::Result<Option<String>> {
982 let mut stmt = conn.prepare("SELECT parent_id FROM tasks WHERE task_id = ?1")?;
983 let mut rows = stmt.query(rusqlite::params![task_id])?;
984 Ok(match rows.next()? {
985 Some(r) => r.get::<_, Option<String>>(0)?,
986 None => None,
987 })
988}
989
990pub fn would_create_cycle(
995 conn: &Connection,
996 task_id: &str,
997 new_parent: &str,
998) -> anyhow::Result<bool> {
999 if task_id == new_parent {
1000 return Ok(true);
1001 }
1002 let mut cursor = Some(new_parent.to_string());
1003 for _ in 0..64 {
1004 let Some(cur) = cursor else {
1005 return Ok(false);
1006 };
1007 if cur == task_id {
1008 return Ok(true);
1009 }
1010 cursor = parent_of(conn, &cur)?;
1011 }
1012 Ok(true)
1014}
1015
1016pub fn count_open_children(conn: &Connection, task_id: &str) -> anyhow::Result<usize> {
1018 let n: i64 = conn.query_row(
1019 "SELECT COUNT(*) FROM tasks WHERE parent_id = ?1 AND status = 'open'",
1020 rusqlite::params![task_id],
1021 |r| r.get(0),
1022 )?;
1023 Ok(n as usize)
1024}
1025
1026pub fn invalidate_pack_cascade(conn: &Connection, task_id: &str) -> anyhow::Result<()> {
1028 conn.execute(
1029 "DELETE FROM task_pack_cache WHERE task_id = ?1",
1030 rusqlite::params![task_id],
1031 )?;
1032 if let Some(parent) = parent_of(conn, task_id)? {
1033 conn.execute(
1034 "DELETE FROM task_pack_cache WHERE task_id = ?1",
1035 rusqlite::params![parent],
1036 )?;
1037 }
1038 Ok(())
1039}
1040
1041#[cfg(test)]
1042mod tests {
1043 use super::*;
1044 use tempfile::TempDir;
1045
1046 #[test]
1047 fn task_exists_returns_true_for_known_id_false_otherwise() {
1048 let d = TempDir::new().unwrap();
1049 let conn = open(d.path().join("s.sqlite")).unwrap();
1050
1051 assert!(!task_exists(&conn, "tj-nope").unwrap());
1052
1053 let e = make_open_event("tj-yes", "Hello");
1054 upsert_task_from_event(&conn, &e, "feedfacefeedface").unwrap();
1055 index_event(&conn, &e).unwrap();
1056
1057 assert!(task_exists(&conn, "tj-yes").unwrap());
1058 assert!(!task_exists(&conn, "tj-nope").unwrap());
1059 }
1060
1061 #[test]
1062 fn fresh_db_runs_all_migrations() {
1063 let d = TempDir::new().unwrap();
1064 let p = d.path().join("state.sqlite");
1065 let conn = open(&p).unwrap();
1066
1067 let applied: Vec<i64> = conn
1068 .prepare("SELECT version FROM schema_migrations ORDER BY version")
1069 .unwrap()
1070 .query_map([], |r| r.get::<_, i64>(0))
1071 .unwrap()
1072 .collect::<Result<_, _>>()
1073 .unwrap();
1074 assert_eq!(
1075 applied,
1076 (1..=MIGRATIONS.len() as i64).collect::<Vec<_>>(),
1077 "every declared migration must be recorded"
1078 );
1079 }
1080
1081 #[test]
1082 fn apply_migrations_is_idempotent_across_reopens() {
1083 let d = TempDir::new().unwrap();
1084 let p = d.path().join("state.sqlite");
1085 let _ = open(&p).unwrap();
1086 let _ = open(&p).unwrap();
1087
1088 let count: i64 = open(&p)
1089 .unwrap()
1090 .query_row("SELECT COUNT(*) FROM schema_migrations", [], |r| r.get(0))
1091 .unwrap();
1092 assert_eq!(
1093 count,
1094 MIGRATIONS.len() as i64,
1095 "schema_migrations must contain exactly one row per declared migration after repeated opens"
1096 );
1097 }
1098
1099 #[test]
1100 fn open_creates_all_tables() {
1101 let d = TempDir::new().unwrap();
1102 let p = d.path().join("state.sqlite");
1103 let conn = open(&p).unwrap();
1104
1105 let names: Vec<String> = conn
1106 .prepare("SELECT name FROM sqlite_master WHERE type='table' OR type='virtual table' ORDER BY name")
1107 .unwrap()
1108 .query_map([], |r| r.get::<_, String>(0))
1109 .unwrap()
1110 .collect::<Result<_, _>>()
1111 .unwrap();
1112
1113 for required in [
1114 "decisions",
1115 "events_index",
1116 "evidence",
1117 "task_pack_cache",
1118 "tasks",
1119 "search_fts",
1120 ] {
1121 assert!(
1122 names.iter().any(|n| n == required),
1123 "missing table {required}, have {names:?}"
1124 );
1125 }
1126 }
1127
1128 #[test]
1129 fn open_is_idempotent() {
1130 let d = TempDir::new().unwrap();
1131 let p = d.path().join("state.sqlite");
1132 let _ = open(&p).unwrap();
1133 let _ = open(&p).unwrap();
1134 }
1135
1136 #[test]
1137 fn index_event_projects_evidence() {
1138 let d = TempDir::new().unwrap();
1139 let conn = open(d.path().join("s.sqlite")).unwrap();
1140 let mut open_e = crate::event::Event::new(
1141 "tj-e",
1142 crate::event::EventType::Open,
1143 crate::event::Author::User,
1144 crate::event::Source::Cli,
1145 "x".into(),
1146 );
1147 open_e.meta = serde_json::json!({"title": "T"});
1148 upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
1149 index_event(&conn, &open_e).unwrap();
1150
1151 let mut ev = crate::event::Event::new(
1152 "tj-e",
1153 crate::event::EventType::Evidence,
1154 crate::event::Author::Agent,
1155 crate::event::Source::Chat,
1156 "Hook startup measured at 12ms".into(),
1157 );
1158 ev.evidence_strength = Some(crate::event::EvidenceStrength::Strong);
1159 upsert_task_from_event(&conn, &ev, "feedface").unwrap();
1160 index_event(&conn, &ev).unwrap();
1161
1162 let (text, strength): (String, String) = conn
1163 .query_row(
1164 "SELECT text, strength FROM evidence WHERE task_id=?1",
1165 rusqlite::params!["tj-e"],
1166 |r| Ok((r.get(0)?, r.get(1)?)),
1167 )
1168 .unwrap();
1169 assert!(text.contains("12ms"));
1170 assert_eq!(strength, "strong");
1171 }
1172
1173 #[test]
1174 fn supersede_event_marks_decision_superseded() {
1175 let d = TempDir::new().unwrap();
1176 let conn = open(d.path().join("s.sqlite")).unwrap();
1177 let mut open_e = crate::event::Event::new(
1178 "tj-s",
1179 crate::event::EventType::Open,
1180 crate::event::Author::User,
1181 crate::event::Source::Cli,
1182 "x".into(),
1183 );
1184 open_e.meta = serde_json::json!({"title": "T"});
1185 upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
1186 index_event(&conn, &open_e).unwrap();
1187
1188 let dec = crate::event::Event::new(
1189 "tj-s",
1190 crate::event::EventType::Decision,
1191 crate::event::Author::Agent,
1192 crate::event::Source::Chat,
1193 "Use TS".into(),
1194 );
1195 upsert_task_from_event(&conn, &dec, "feedface").unwrap();
1196 index_event(&conn, &dec).unwrap();
1197
1198 let mut sup = crate::event::Event::new(
1199 "tj-s",
1200 crate::event::EventType::Supersede,
1201 crate::event::Author::Agent,
1202 crate::event::Source::Chat,
1203 "Replaced by Rust decision".into(),
1204 );
1205 sup.supersedes = Some(dec.event_id.clone());
1206 upsert_task_from_event(&conn, &sup, "feedface").unwrap();
1207 index_event(&conn, &sup).unwrap();
1208
1209 let (status, by): (String, Option<String>) = conn
1210 .query_row(
1211 "SELECT status, superseded_by FROM decisions WHERE decision_id=?1",
1212 rusqlite::params![dec.event_id],
1213 |r| Ok((r.get(0)?, r.get(1)?)),
1214 )
1215 .unwrap();
1216 assert_eq!(status, "superseded");
1217 assert_eq!(by.as_deref(), Some(sup.event_id.as_str()));
1218 }
1219
/// Indexing a decision event must produce a row in `decisions` carrying the
/// event's id, its text, and the status `active`.
#[test]
fn index_event_projects_decision_to_decisions_table() {
    use crate::event::{Author, Event, EventType, Source};

    let tmp = TempDir::new().unwrap();
    let conn = open(tmp.path().join("s.sqlite")).unwrap();

    // Register the task through its opening event before any decision is indexed.
    let opener = make_open_event("tj-d", "T");
    upsert_task_from_event(&conn, &opener, "feedface").unwrap();
    index_event(&conn, &opener).unwrap();

    let decision = Event::new(
        "tj-d",
        EventType::Decision,
        Author::Agent,
        Source::Chat,
        "Adopt Rust".into(),
    );
    upsert_task_from_event(&conn, &decision, "feedface").unwrap();
    index_event(&conn, &decision).unwrap();

    let (row_id, row_text, row_status): (String, String, String) = conn
        .query_row(
            "SELECT decision_id, text, status FROM decisions WHERE task_id=?1",
            rusqlite::params!["tj-d"],
            |row| {
                let id = row.get(0)?;
                let text = row.get(1)?;
                let status = row.get(2)?;
                Ok((id, text, status))
            },
        )
        .unwrap();

    assert_eq!(row_id, decision.event_id);
    assert_eq!(row_text, "Adopt Rust");
    assert_eq!(row_status, "active");
}
1257
/// A decision whose `meta` carries an `alternatives` array must have that array
/// stored as JSON text in the `alternatives` column of `decisions`.
#[test]
fn index_event_projects_decision_alternatives_into_column() {
    use crate::event::{Author, Event, EventType, Source};

    let tmp = TempDir::new().unwrap();
    let conn = open(tmp.path().join("s.sqlite")).unwrap();

    let mut decision = Event::new(
        "tj-alt",
        EventType::Decision,
        Author::Agent,
        Source::Chat,
        "Use SQLite".into(),
    );
    decision.meta = serde_json::json!({
        "alternatives": [
            {"option": "SQLite", "chosen": true, "rationale": "embedded, zero-ops"},
            {"option": "Postgres", "chosen": false, "rationale": "too heavy for local tool"}
        ]
    });
    upsert_task_from_event(&conn, &decision, "feedface").unwrap();
    index_event(&conn, &decision).unwrap();

    let stored: Option<String> = conn
        .query_row(
            "SELECT alternatives FROM decisions WHERE decision_id=?1",
            rusqlite::params![decision.event_id],
            |row| row.get(0),
        )
        .unwrap();
    let raw_json = stored.expect("alternatives column should be populated");

    // The column holds serialized JSON; parse it back to inspect the entries.
    let parsed: serde_json::Value = serde_json::from_str(&raw_json).unwrap();
    let entries = parsed.as_array().unwrap();
    assert_eq!(entries.len(), 2);
    assert_eq!(entries[0]["option"], "SQLite");
    assert_eq!(entries[0]["chosen"], true);
}
1292
/// A decision created without any `alternatives` metadata must leave the
/// `alternatives` column NULL.
#[test]
fn index_event_decision_without_alternatives_leaves_column_null() {
    use crate::event::{Author, Event, EventType, Source};

    let tmp = TempDir::new().unwrap();
    let conn = open(tmp.path().join("s.sqlite")).unwrap();

    let decision = Event::new(
        "tj-noalt",
        EventType::Decision,
        Author::Agent,
        Source::Chat,
        "Plain decision".into(),
    );
    upsert_task_from_event(&conn, &decision, "feedface").unwrap();
    index_event(&conn, &decision).unwrap();

    let stored: Option<String> = conn
        .query_row(
            "SELECT alternatives FROM decisions WHERE decision_id=?1",
            rusqlite::params![decision.event_id],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(stored, None);
}
1317
/// Indexing the same event repeatedly must leave exactly one `search_fts` row
/// for that event id.
#[test]
fn index_event_is_idempotent_no_search_fts_duplicates() {
    let tmp = TempDir::new().unwrap();
    let conn = open(tmp.path().join("s.sqlite")).unwrap();

    let opener = make_open_event("tj-id", "Idempotent");
    upsert_task_from_event(&conn, &opener, "feedface").unwrap();

    // Index the identical event three times in a row.
    for _ in 0..3 {
        index_event(&conn, &opener).unwrap();
    }

    let fts_rows: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM search_fts WHERE event_id=?1",
            rusqlite::params![opener.event_id],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(
        fts_rows, 1,
        "search_fts must hold exactly one row per event_id"
    );
}
1346
/// `list_all_projects` must return the stems of the `.sqlite` files in the
/// state directory and ignore the unrelated `.txt` file.
#[test]
fn list_all_projects_returns_hashes_from_state_dir() {
    let tmp = TempDir::new().unwrap();
    let state_dir = tmp.path().join("state");
    std::fs::create_dir_all(&state_dir).unwrap();

    // Two project databases plus one file that is not a database.
    for file_name in [
        "aaaa1111aaaa1111.sqlite",
        "bbbb2222bbbb2222.sqlite",
        "not-a-project.txt",
    ] {
        std::fs::File::create(state_dir.join(file_name)).unwrap();
    }

    let mut found = list_all_projects(&state_dir).unwrap();
    // Sort so the comparison does not depend on directory iteration order.
    found.sort();
    assert_eq!(found, vec!["aaaa1111aaaa1111", "bbbb2222bbbb2222"]);
}
1361
1362 fn write_event_line(f: &mut std::fs::File, e: &crate::event::Event) {
1363 use std::io::Write;
1364 writeln!(f, "{}", serde_json::to_string(e).unwrap()).unwrap();
1365 }
1366
1367 fn make_open_event(task_id: &str, title: &str) -> crate::event::Event {
1368 let mut e = crate::event::Event::new(
1369 task_id,
1370 crate::event::EventType::Open,
1371 crate::event::Author::User,
1372 crate::event::Source::Cli,
1373 "x".into(),
1374 );
1375 e.meta = serde_json::json!({"title": title});
1376 e
1377 }
1378
/// After an initial ingest, a second `ingest_new_events` call over an appended
/// log must report only the appended events, and `index_state` must record the
/// id of the last event written.
#[test]
fn ingest_new_events_picks_up_only_new_lines() {
    let tmp = TempDir::new().unwrap();
    let log_path = tmp.path().join("events.jsonl");
    let db_path = tmp.path().join("s.sqlite");
    let project = "deadbeefdeadbeef";

    let initial = [
        make_open_event("tj-i1", "first"),
        make_open_event("tj-i2", "second"),
        make_open_event("tj-i3", "third"),
    ];
    {
        // Scope the handle so the file is closed before ingesting.
        let mut log = std::fs::File::create(&log_path).unwrap();
        for event in &initial {
            write_event_line(&mut log, event);
        }
    }

    let conn = open(&db_path).unwrap();
    let first_pass = ingest_new_events(&conn, &log_path, project).unwrap();
    assert_eq!(first_pass, 3);

    // Append two more events to the same log.
    let appended = [
        make_open_event("tj-i4", "fourth"),
        make_open_event("tj-i5", "fifth"),
    ];
    {
        let mut log = std::fs::OpenOptions::new()
            .append(true)
            .open(&log_path)
            .unwrap();
        for event in &appended {
            write_event_line(&mut log, event);
        }
    }

    let second_pass = ingest_new_events(&conn, &log_path, project).unwrap();
    assert_eq!(second_pass, 2, "incremental ingest must read only the tail");

    let indexed_rows: i64 = conn
        .query_row("SELECT COUNT(*) FROM events_index", [], |row| row.get(0))
        .unwrap();
    assert_eq!(indexed_rows, 5);

    let marker: String = conn
        .query_row(
            "SELECT last_indexed_event_id FROM index_state WHERE project_hash=?1",
            rusqlite::params![project],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(marker, appended[1].event_id);
}
1430
/// If the log is replaced so the previously ingested event is no longer in it,
/// `ingest_new_events` must still report every event in the new log.
#[test]
fn ingest_new_events_falls_back_to_full_rebuild_when_marker_vanishes() {
    let tmp = TempDir::new().unwrap();
    let log_path = tmp.path().join("events.jsonl");
    let db_path = tmp.path().join("s.sqlite");
    let project = "feedfacefeedface";

    let original = make_open_event("tj-r1", "first");
    {
        let mut log = std::fs::File::create(&log_path).unwrap();
        write_event_line(&mut log, &original);
    }

    let conn = open(&db_path).unwrap();
    ingest_new_events(&conn, &log_path, project).unwrap();

    // `File::create` truncates, so the rewritten log holds only the two new
    // events and no longer contains the one ingested above.
    let replacements = [
        make_open_event("tj-r2", "after-corruption"),
        make_open_event("tj-r3", "after-corruption-2"),
    ];
    {
        let mut log = std::fs::File::create(&log_path).unwrap();
        for event in &replacements {
            write_event_line(&mut log, event);
        }
    }

    let ingested = ingest_new_events(&conn, &log_path, project).unwrap();
    assert_eq!(ingested, 2, "missing marker must trigger full rebuild");
}
1458
/// Running `rebuild_state` on one fresh database and `ingest_new_events` on
/// another, over identical logs, must report the same event count and leave
/// the same number of rows in `tasks` and `events_index`.
#[test]
fn rebuild_state_and_ingest_new_events_produce_same_state() {
    let tmp = TempDir::new().unwrap();
    let log_a = tmp.path().join("a.jsonl");
    let log_b = tmp.path().join("b.jsonl");
    let db_a = tmp.path().join("a.sqlite");
    let db_b = tmp.path().join("b.sqlite");
    let project = "abcd1234abcd1234";

    let mut events = Vec::new();
    for i in 0..5 {
        events.push(make_open_event(&format!("tj-eq{i}"), &format!("title {i}")));
    }

    // Write the very same events into both logs.
    for log_path in [&log_a, &log_b] {
        let mut log = std::fs::File::create(log_path).unwrap();
        events
            .iter()
            .for_each(|event| write_event_line(&mut log, event));
    }

    let conn_a = open(&db_a).unwrap();
    let rebuilt = rebuild_state(&conn_a, &log_a, project).unwrap();

    let conn_b = open(&db_b).unwrap();
    let ingested = ingest_new_events(&conn_b, &log_b, project).unwrap();

    assert_eq!(rebuilt, ingested);
    assert_eq!(rebuilt, 5);

    for table in ["tasks", "events_index"] {
        let sql = format!("SELECT COUNT(*) FROM {table}");
        let rows_a: i64 = conn_a.query_row(&sql, [], |row| row.get(0)).unwrap();
        let rows_b: i64 = conn_b.query_row(&sql, [], |row| row.get(0)).unwrap();
        assert_eq!(rows_a, rows_b, "row count mismatch in {table}");
    }
}
1493
/// `rebuild_state` must succeed on a log that mixes valid events with lines
/// that are not events, and must count and index only the valid ones.
#[test]
fn rebuild_state_skips_malformed_jsonl_lines() {
    use crate::event::{Author, Event, EventType, Source};
    use std::io::Write;

    let tmp = TempDir::new().unwrap();
    let events_path = tmp.path().join("events.jsonl");
    let db_path = tmp.path().join("s.sqlite");

    {
        let mut log = std::fs::File::create(&events_path).unwrap();

        // Line 1: a valid open event.
        let opener = make_open_event("tj-skip", "Skip test");
        write_event_line(&mut log, &opener);

        // Line 2: not JSON at all.
        writeln!(log, "this is not a json event line").unwrap();

        // Line 3: valid JSON that is not an event.
        writeln!(log, "{{\"foo\": 1}}").unwrap();

        // Line 4: a valid decision event.
        let decision = Event::new(
            "tj-skip",
            EventType::Decision,
            Author::Agent,
            Source::Chat,
            "Adopt Rust".into(),
        );
        write_event_line(&mut log, &decision);
    }

    let conn = open(&db_path).unwrap();
    let processed = rebuild_state(&conn, &events_path, "deadbeefdeadbeef")
        .expect("rebuild_state must succeed despite malformed lines");
    assert_eq!(
        processed, 2,
        "expected 2 valid events indexed (2 malformed skipped)"
    );

    let indexed_rows: i64 = conn
        .query_row("SELECT COUNT(*) FROM events_index", [], |row| row.get(0))
        .unwrap();
    assert_eq!(indexed_rows, 2);
}
1542
/// `rebuild_state` over a log with an open event and a decision for the same
/// task must report two events and leave one `tasks` row and two
/// `events_index` rows.
#[test]
fn rebuild_state_reads_jsonl_and_populates_db() {
    use crate::event::{Author, Event, EventType, Source};

    let tmp = TempDir::new().unwrap();
    let events_path = tmp.path().join("events.jsonl");
    let db_path = tmp.path().join("s.sqlite");

    {
        let mut log = std::fs::File::create(&events_path).unwrap();
        let opener = make_open_event("tj-9", "Nine");
        let decision = Event::new(
            "tj-9",
            EventType::Decision,
            Author::Agent,
            Source::Chat,
            "Adopt Rust".into(),
        );
        write_event_line(&mut log, &opener);
        write_event_line(&mut log, &decision);
    }

    let conn = open(&db_path).unwrap();
    let processed = rebuild_state(&conn, &events_path, "deadbeefdeadbeef").unwrap();
    assert_eq!(processed, 2);

    // Both events belong to a single task.
    let task_rows: i64 = conn
        .query_row("SELECT COUNT(*) FROM tasks", [], |row| row.get(0))
        .unwrap();
    assert_eq!(task_rows, 1);

    let event_rows: i64 = conn
        .query_row("SELECT COUNT(*) FROM events_index", [], |row| row.get(0))
        .unwrap();
    assert_eq!(event_rows, 2);
}
1583
/// Indexing two events for one task must add two `events_index` rows, and a
/// full-text search for "Rust" must return only the decision event.
#[test]
fn index_event_writes_index_and_fts() {
    use crate::event::{Author, Event, EventType, Source};

    let tmp = TempDir::new().unwrap();
    let conn = open(tmp.path().join("s.sqlite")).unwrap();
    let project = "deadbeefdeadbeef";

    let mut opener = Event::new(
        "tj-1",
        EventType::Open,
        Author::User,
        Source::Cli,
        "Title".into(),
    );
    opener.meta = serde_json::json!({"title": "Title"});
    upsert_task_from_event(&conn, &opener, project).unwrap();
    index_event(&conn, &opener).unwrap();

    let mut decision = Event::new(
        "tj-1",
        EventType::Decision,
        Author::Agent,
        Source::Chat,
        "Adopt Rust".into(),
    );
    decision.confidence = Some(0.92);
    upsert_task_from_event(&conn, &decision, project).unwrap();
    index_event(&conn, &decision).unwrap();

    let indexed_rows: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM events_index WHERE task_id=?1",
            rusqlite::params!["tj-1"],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(indexed_rows, 2);

    // Only the decision text contains "Rust", so it must be the sole hit.
    let mut search = conn
        .prepare("SELECT event_id FROM search_fts WHERE search_fts MATCH ?1")
        .unwrap();
    let hits: Vec<String> = search
        .query_map(rusqlite::params!["Rust"], |row| row.get::<_, String>(0))
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert_eq!(hits.len(), 1);
    assert_eq!(hits[0], decision.event_id);
}
1633
/// Upserting from an open event must insert a `tasks` row whose title comes
/// from `meta.title` and whose status is `open`.
#[test]
fn upsert_task_from_open_event_inserts_row() {
    use crate::event::{Author, Event, EventType, Source};

    let tmp = TempDir::new().unwrap();
    let conn = open(tmp.path().join("s.sqlite")).unwrap();

    // The event text and the meta title differ on purpose: the stored title
    // must match the meta title.
    let mut opener = Event::new(
        "tj-7f3a",
        EventType::Open,
        Author::User,
        Source::Cli,
        "Add OAuth".into(),
    );
    opener.meta = serde_json::json!({ "title": "Add OAuth login" });

    upsert_task_from_event(&conn, &opener, "abcd1234abcd1234").unwrap();

    let (task_id, task_title, task_status): (String, String, String) = conn
        .query_row(
            "SELECT task_id, title, status FROM tasks WHERE task_id = ?1",
            ["tj-7f3a"],
            |row| {
                let id = row.get(0)?;
                let title = row.get(1)?;
                let status = row.get(2)?;
                Ok((id, title, status))
            },
        )
        .unwrap();

    assert_eq!(task_id, "tj-7f3a");
    assert_eq!(task_title, "Add OAuth login");
    assert_eq!(task_status, "open");
}
1662
/// The `tasks.parent_id` column must exist after `open` and be NULL for a task
/// opened without a parent.
#[test]
fn migration_adds_parent_id_column_nullable() {
    let tmp = tempfile::TempDir::new().unwrap();
    let conn = open(tmp.path().join("s.sqlite")).unwrap();

    // A task opened with no `parent_id` in its meta.
    let top_level = make_open_event("tj-a", "Top");
    upsert_task_from_event(&conn, &top_level, "ph").unwrap();

    let stored_parent: Option<String> = conn
        .query_row(
            "SELECT parent_id FROM tasks WHERE task_id = ?1",
            rusqlite::params!["tj-a"],
            |row| row.get(0),
        )
        .unwrap();
    assert!(stored_parent.is_none());
}
1681
/// An open event whose meta names a `parent_id` must store that id in
/// `tasks.parent_id`.
#[test]
fn open_event_meta_parent_id_is_persisted() {
    let tmp = tempfile::TempDir::new().unwrap();
    let conn = open(tmp.path().join("s.sqlite")).unwrap();

    // The parent task is created first.
    let parent_event = make_open_event("tj-parent", "Parent");
    upsert_task_from_event(&conn, &parent_event, "ph").unwrap();

    // Replace the helper's meta so it also carries the parent link.
    let mut child_event = make_open_event("tj-child", "Child");
    child_event.meta = serde_json::json!({"title": "Child", "parent_id": "tj-parent"});
    upsert_task_from_event(&conn, &child_event, "ph").unwrap();

    let stored_parent: Option<String> = conn
        .query_row(
            "SELECT parent_id FROM tasks WHERE task_id = ?1",
            rusqlite::params!["tj-child"],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(stored_parent.as_deref(), Some("tj-parent"));
}
1704
/// `children_of` must list both children of a parent, and `parent_of` must
/// return the parent id for a child and `None` for the parent itself.
#[test]
fn children_of_and_parent_of_work() {
    let tmp = tempfile::TempDir::new().unwrap();
    let conn = open(tmp.path().join("s.sqlite")).unwrap();

    let parent_event = make_open_event("p", "Parent");
    upsert_task_from_event(&conn, &parent_event, "ph").unwrap();

    // Two children, both linked to "p" through their meta.
    for (child_id, child_title) in [("c1", "Child1"), ("c2", "Child2")] {
        let mut child_event = make_open_event(child_id, child_title);
        child_event.meta = serde_json::json!({"title": child_title, "parent_id": "p"});
        upsert_task_from_event(&conn, &child_event, "ph").unwrap();
    }

    let children = children_of(&conn, "p").unwrap();
    let has_child = |wanted: &str| children.iter().any(|t| t.task_id.as_str() == wanted);
    assert!(has_child("c1") && has_child("c2"));
    assert_eq!(children.len(), 2);

    let parent_of_child = parent_of(&conn, "c1").unwrap();
    assert_eq!(parent_of_child.as_deref(), Some("p"));
    let parent_of_root = parent_of(&conn, "p").unwrap();
    assert_eq!(parent_of_root, None);
}
1726
/// With "b" stored as a child of "a", `would_create_cycle` must report a cycle
/// for ("a", "b") and ("a", "a"), and none for the unrelated pair ("x", "a").
#[test]
fn cycle_guard_rejects_self_and_ancestor() {
    let tmp = tempfile::TempDir::new().unwrap();
    let conn = open(tmp.path().join("s.sqlite")).unwrap();

    let root = make_open_event("a", "A");
    upsert_task_from_event(&conn, &root, "ph").unwrap();

    let mut child = make_open_event("b", "B");
    child.meta = serde_json::json!({"title": "B", "parent_id": "a"});
    upsert_task_from_event(&conn, &child, "ph").unwrap();

    // "b" already has "a" as its parent.
    let via_descendant = would_create_cycle(&conn, "a", "b").unwrap();
    assert!(via_descendant);

    // Same task on both sides.
    let via_self = would_create_cycle(&conn, "a", "a").unwrap();
    assert!(via_self);

    // A fresh task with no relation to "a".
    let unrelated = make_open_event("x", "X");
    upsert_task_from_event(&conn, &unrelated, "ph").unwrap();
    let via_unrelated = would_create_cycle(&conn, "x", "a").unwrap();
    assert!(!via_unrelated);
}
1744
/// Calling `invalidate_pack_cascade` on a child must remove the cached packs
/// of both the child and its parent.
#[test]
fn invalidate_cascade_clears_parent_pack() {
    let tmp = tempfile::TempDir::new().unwrap();
    let conn = open(tmp.path().join("s.sqlite")).unwrap();

    let parent_event = make_open_event("p", "P");
    upsert_task_from_event(&conn, &parent_event, "ph").unwrap();

    let mut child_event = make_open_event("c", "C");
    child_event.meta = serde_json::json!({"title": "C", "parent_id": "p"});
    upsert_task_from_event(&conn, &child_event, "ph").unwrap();

    // Give each task one cached pack row.
    let seed_cache_sql =
        "INSERT INTO task_pack_cache(task_id, mode, text, generated_at, source_event_count)
         VALUES (?1, 'compact', 'x', '2026-01-01T00:00:00Z', 1)";
    for task_id in ["p", "c"] {
        conn.execute(seed_cache_sql, rusqlite::params![task_id])
            .unwrap();
    }

    invalidate_pack_cascade(&conn, "c").unwrap();

    let cached_rows: i64 = conn
        .query_row("SELECT COUNT(*) FROM task_pack_cache", [], |row| row.get(0))
        .unwrap();
    assert_eq!(
        cached_rows, 0,
        "both child and parent pack caches cleared"
    );
}
1771
/// `count_open_children` must count only children that are still open: with
/// one closed child and one open child under "p", the result is 1.
#[test]
fn count_open_children_counts_only_open() {
    use crate::event::{Author, Event, EventType, Source};

    let tmp = tempfile::TempDir::new().unwrap();
    let conn = open(tmp.path().join("s.sqlite")).unwrap();

    let parent_event = make_open_event("p", "P");
    upsert_task_from_event(&conn, &parent_event, "ph").unwrap();

    // First child: opened, then closed.
    let mut first_child = make_open_event("c1", "C1");
    first_child.meta = serde_json::json!({"title": "C1", "parent_id": "p"});
    upsert_task_from_event(&conn, &first_child, "ph").unwrap();

    let mut closing = Event::new(
        "c1",
        EventType::Close,
        Author::User,
        Source::Cli,
        "done".into(),
    );
    closing.timestamp = "2026-01-02T00:00:00Z".into();
    upsert_task_from_event(&conn, &closing, "ph").unwrap();

    // Second child: left open.
    let mut second_child = make_open_event("c2", "C2");
    second_child.meta = serde_json::json!({"title": "C2", "parent_id": "p"});
    upsert_task_from_event(&conn, &second_child, "ph").unwrap();

    let open_children = count_open_children(&conn, "p").unwrap();
    assert_eq!(open_children, 1);
}
1796}