1use anyhow::Context;
2use rusqlite::Connection;
3use std::collections::HashSet;
4use std::path::Path;
5
6struct Migration {
10 version: i64,
11 sql: &'static str,
12}
13
14const MIGRATION_001: &str = r#"
15CREATE TABLE IF NOT EXISTS tasks (
16 task_id TEXT PRIMARY KEY,
17 title TEXT NOT NULL,
18 status TEXT NOT NULL,
19 project_hash TEXT NOT NULL,
20 opened_at TEXT NOT NULL,
21 closed_at TEXT,
22 last_event_at TEXT NOT NULL
23);
24CREATE INDEX IF NOT EXISTS idx_tasks_project ON tasks(project_hash, last_event_at DESC);
25
26CREATE TABLE IF NOT EXISTS events_index (
27 event_id TEXT PRIMARY KEY,
28 task_id TEXT NOT NULL,
29 type TEXT NOT NULL,
30 timestamp TEXT NOT NULL,
31 confidence REAL,
32 status TEXT NOT NULL
33);
34CREATE INDEX IF NOT EXISTS idx_events_task_time ON events_index(task_id, timestamp DESC);
35
36CREATE TABLE IF NOT EXISTS decisions (
37 decision_id TEXT PRIMARY KEY,
38 task_id TEXT NOT NULL,
39 text TEXT NOT NULL,
40 status TEXT NOT NULL,
41 superseded_by TEXT
42);
43
44CREATE TABLE IF NOT EXISTS evidence (
45 evidence_id TEXT PRIMARY KEY,
46 task_id TEXT NOT NULL,
47 text TEXT NOT NULL,
48 strength TEXT NOT NULL,
49 refers_to_decision_id TEXT
50);
51
52CREATE TABLE IF NOT EXISTS task_pack_cache (
53 task_id TEXT NOT NULL,
54 mode TEXT NOT NULL,
55 text TEXT NOT NULL,
56 generated_at TEXT NOT NULL,
57 source_event_count INTEGER NOT NULL,
58 PRIMARY KEY (task_id, mode)
59);
60
61CREATE VIRTUAL TABLE IF NOT EXISTS search_fts USING fts5(
62 task_id UNINDEXED,
63 event_id UNINDEXED,
64 text,
65 type
66);
67"#;
68
69const MIGRATION_002: &str = r#"
74CREATE TABLE IF NOT EXISTS index_state (
75 project_hash TEXT PRIMARY KEY,
76 last_indexed_event_id TEXT NOT NULL,
77 updated_at TEXT NOT NULL
78);
79"#;
80
81const MIGRATION_003: &str = r#"
86ALTER TABLE tasks ADD COLUMN goal TEXT;
87ALTER TABLE tasks ADD COLUMN outcome TEXT;
88ALTER TABLE tasks ADD COLUMN outcome_tag TEXT;
89ALTER TABLE tasks ADD COLUMN external TEXT;
90ALTER TABLE events_index ADD COLUMN artifacts TEXT;
91DELETE FROM task_pack_cache;
92"#;
93
94const MIGRATION_004: &str = r#"
100DELETE FROM task_pack_cache;
101"#;
102
103const MIGRATION_005: &str = r#"
106CREATE TABLE IF NOT EXISTS dream_state (
107 project_hash TEXT PRIMARY KEY,
108 last_dream_at TEXT NOT NULL,
109 updated_at TEXT NOT NULL
110);
111"#;
112
113const MIGRATION_006: &str = r#"
117ALTER TABLE tasks ADD COLUMN parent_id TEXT;
118CREATE INDEX IF NOT EXISTS idx_tasks_parent ON tasks(parent_id);
119"#;
120
121const MIGRATION_007: &str = r#"
127ALTER TABLE decisions ADD COLUMN alternatives TEXT;
128DELETE FROM task_pack_cache;
129"#;
130
131const MIGRATION_008: &str = r#"
139CREATE TABLE IF NOT EXISTS embeddings (
140 event_id TEXT PRIMARY KEY,
141 task_id TEXT NOT NULL,
142 project_hash TEXT NOT NULL,
143 tier TEXT NOT NULL DEFAULT 'episodic',
144 model TEXT NOT NULL,
145 dim INTEGER NOT NULL,
146 vec BLOB NOT NULL,
147 created_at TEXT NOT NULL
148);
149CREATE INDEX IF NOT EXISTS idx_emb_project_tier ON embeddings(project_hash, tier);
150ALTER TABLE events_index ADD COLUMN memory_tier TEXT NOT NULL DEFAULT 'episodic';
151"#;
152
153const MIGRATIONS: &[Migration] = &[
156 Migration {
157 version: 1,
158 sql: MIGRATION_001,
159 },
160 Migration {
161 version: 2,
162 sql: MIGRATION_002,
163 },
164 Migration {
165 version: 3,
166 sql: MIGRATION_003,
167 },
168 Migration {
169 version: 4,
170 sql: MIGRATION_004,
171 },
172 Migration {
173 version: 5,
174 sql: MIGRATION_005,
175 },
176 Migration {
177 version: 6,
178 sql: MIGRATION_006,
179 },
180 Migration {
181 version: 7,
182 sql: MIGRATION_007,
183 },
184 Migration {
185 version: 8,
186 sql: MIGRATION_008,
187 },
188];
189
190fn apply_migrations(conn: &Connection) -> anyhow::Result<()> {
191 conn.execute_batch(
192 "CREATE TABLE IF NOT EXISTS schema_migrations (
193 version INTEGER PRIMARY KEY,
194 applied_at TEXT NOT NULL
195 )",
196 )
197 .context("create schema_migrations table")?;
198
199 let applied: HashSet<i64> = {
200 let mut stmt = conn
201 .prepare("SELECT version FROM schema_migrations")
202 .context("select applied versions")?;
203 let rows = stmt
204 .query_map([], |r| r.get::<_, i64>(0))
205 .context("iterate schema_migrations")?;
206 rows.collect::<rusqlite::Result<HashSet<_>>>()
207 .context("collect applied versions")?
208 };
209
210 for migration in MIGRATIONS {
211 if applied.contains(&migration.version) {
212 continue;
213 }
214 conn.execute_batch(migration.sql)
215 .with_context(|| format!("apply schema migration v{:03}", migration.version))?;
216 conn.execute(
217 "INSERT INTO schema_migrations(version, applied_at) VALUES (?1, ?2)",
218 rusqlite::params![
219 migration.version,
220 chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
221 ],
222 )
223 .with_context(|| {
224 format!(
225 "record schema migration v{:03} as applied",
226 migration.version
227 )
228 })?;
229 }
230 Ok(())
231}
232
233use crate::event::{Event, EventType};
234
235pub fn upsert_task_from_event(
236 conn: &Connection,
237 event: &Event,
238 project_hash: &str,
239) -> anyhow::Result<()> {
240 match event.event_type {
241 EventType::Open => {
242 let title = event
243 .meta
244 .get("title")
245 .and_then(|v| v.as_str())
246 .unwrap_or(&event.text)
247 .to_string();
248 let parent_id = event
249 .meta
250 .get("parent_id")
251 .and_then(|v| v.as_str())
252 .map(str::to_string);
253 conn.execute(
256 "INSERT INTO tasks(task_id, title, status, project_hash, opened_at, last_event_at, parent_id)
257 VALUES (?1, ?2, 'open', ?3, ?4, ?4, ?5)
258 ON CONFLICT(task_id) DO UPDATE SET last_event_at = ?4",
259 rusqlite::params![event.task_id, title, project_hash, event.timestamp, parent_id],
260 )?;
261 }
262 EventType::Close => {
263 conn.execute(
264 "UPDATE tasks SET status='closed', closed_at=?2, last_event_at=?2 WHERE task_id=?1",
265 rusqlite::params![event.task_id, event.timestamp],
266 )?;
267 if let Some(outcome) = event.meta.get("outcome").and_then(|v| v.as_str()) {
271 let tag = event.meta.get("outcome_tag").and_then(|v| v.as_str());
272 conn.execute(
273 "UPDATE tasks SET outcome=?2, outcome_tag=?3 WHERE task_id=?1",
274 rusqlite::params![event.task_id, outcome, tag],
275 )?;
276 }
277 }
278 EventType::Reopen => {
279 conn.execute(
280 "UPDATE tasks SET status='open', closed_at=NULL, last_event_at=?2 WHERE task_id=?1",
281 rusqlite::params![event.task_id, event.timestamp],
282 )?;
283 }
284 EventType::Rename => {
285 let title = event
288 .meta
289 .get("title")
290 .and_then(|v| v.as_str())
291 .unwrap_or(&event.text);
292 conn.execute(
293 "UPDATE tasks SET title=?2, last_event_at=?3 WHERE task_id=?1",
294 rusqlite::params![event.task_id, title, event.timestamp],
295 )?;
296 }
297 _ => {
298 conn.execute(
299 "UPDATE tasks SET last_event_at=?2 WHERE task_id=?1",
300 rusqlite::params![event.task_id, event.timestamp],
301 )?;
302 }
303 }
304 Ok(())
305}
306
307use std::io::BufRead;
308
309pub fn list_all_projects(state_dir: impl AsRef<Path>) -> anyhow::Result<Vec<String>> {
310 let dir = state_dir.as_ref();
311 if !dir.exists() {
312 return Ok(vec![]);
313 }
314 let mut out = Vec::new();
315 for entry in std::fs::read_dir(dir)? {
316 let entry = entry?;
317 let path = entry.path();
318 if path.extension().and_then(|e| e.to_str()) == Some("sqlite") {
319 if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) {
320 out.push(stem.to_string());
321 }
322 }
323 }
324 Ok(out)
325}
326
327pub fn rebuild_state(
328 conn: &Connection,
329 jsonl_path: impl AsRef<Path>,
330 project_hash: &str,
331) -> anyhow::Result<usize> {
332 let f = std::fs::File::open(&jsonl_path)
333 .with_context(|| format!("open {:?}", jsonl_path.as_ref()))?;
334 let reader = std::io::BufReader::new(f);
335
336 let tx = conn.unchecked_transaction()?;
337 let mut count = 0;
338 let mut last_event_id: Option<String> = None;
339 for (i, line) in reader.lines().enumerate() {
340 let line = line.with_context(|| format!("read line {i}"))?;
341 if line.trim().is_empty() {
342 continue;
343 }
344 let event: Event = match serde_json::from_str(&line) {
348 Ok(e) => e,
349 Err(err) => {
350 tracing::warn!(
351 line_number = i + 1,
352 error = %err,
353 "skipping malformed JSONL line in rebuild_state"
354 );
355 continue;
356 }
357 };
358 upsert_task_from_event(&tx, &event, project_hash)?;
359 index_event(&tx, &event)?;
360 last_event_id = Some(event.event_id.clone());
361 count += 1;
362 }
363 if let Some(eid) = last_event_id.as_deref() {
364 record_last_indexed(&tx, project_hash, eid)?;
365 }
366 tx.commit()?;
367 Ok(count)
368}
369
370pub fn task_exists(conn: &Connection, task_id: &str) -> anyhow::Result<bool> {
375 let count: i64 = conn.query_row(
376 "SELECT COUNT(*) FROM tasks WHERE task_id = ?1",
377 rusqlite::params![task_id],
378 |r| r.get(0),
379 )?;
380 Ok(count > 0)
381}
382
383pub fn task_status(conn: &Connection, task_id: &str) -> anyhow::Result<Option<String>> {
387 let mut stmt = conn.prepare("SELECT status FROM tasks WHERE task_id = ?1")?;
388 let mut rows = stmt.query(rusqlite::params![task_id])?;
389 Ok(rows.next()?.map(|r| r.get::<_, String>(0)).transpose()?)
390}
391
392pub fn set_task_goal(conn: &Connection, task_id: &str, goal: &str) -> anyhow::Result<()> {
396 conn.execute(
397 "UPDATE tasks SET goal = ?1 WHERE task_id = ?2",
398 rusqlite::params![goal, task_id],
399 )
400 .with_context(|| format!("set goal for {task_id}"))?;
401 conn.execute(
404 "DELETE FROM task_pack_cache WHERE task_id = ?1",
405 rusqlite::params![task_id],
406 )?;
407 Ok(())
408}
409
410pub fn set_task_outcome(
414 conn: &Connection,
415 task_id: &str,
416 outcome: &str,
417 outcome_tag: Option<&str>,
418) -> anyhow::Result<()> {
419 conn.execute(
420 "UPDATE tasks SET outcome = ?1, outcome_tag = ?2 WHERE task_id = ?3",
421 rusqlite::params![outcome, outcome_tag, task_id],
422 )
423 .with_context(|| format!("set outcome for {task_id}"))?;
424 conn.execute(
425 "DELETE FROM task_pack_cache WHERE task_id = ?1",
426 rusqlite::params![task_id],
427 )?;
428 Ok(())
429}
430
431pub fn add_task_external(conn: &Connection, task_id: &str, reference: &str) -> anyhow::Result<()> {
436 let current: Option<String> = conn
437 .query_row(
438 "SELECT external FROM tasks WHERE task_id = ?1",
439 rusqlite::params![task_id],
440 |r| r.get::<_, Option<String>>(0),
441 )
442 .with_context(|| format!("read external for {task_id}"))?;
443 let next = match current {
444 Some(s) if !s.is_empty() => format!("{s},{reference}"),
445 _ => reference.to_string(),
446 };
447 conn.execute(
448 "UPDATE tasks SET external = ?1 WHERE task_id = ?2",
449 rusqlite::params![next, task_id],
450 )?;
451 conn.execute(
452 "DELETE FROM task_pack_cache WHERE task_id = ?1",
453 rusqlite::params![task_id],
454 )?;
455 Ok(())
456}
457
458#[derive(Debug, Clone, Default)]
461pub struct TaskMetadata {
462 pub goal: Option<String>,
463 pub outcome: Option<String>,
464 pub outcome_tag: Option<String>,
465 pub external: Option<String>,
466}
467
468pub fn task_metadata(conn: &Connection, task_id: &str) -> anyhow::Result<Option<TaskMetadata>> {
469 let mut stmt =
470 conn.prepare("SELECT goal, outcome, outcome_tag, external FROM tasks WHERE task_id = ?1")?;
471 let mut rows = stmt.query(rusqlite::params![task_id])?;
472 Ok(match rows.next()? {
473 Some(r) => Some(TaskMetadata {
474 goal: r.get::<_, Option<String>>(0)?,
475 outcome: r.get::<_, Option<String>>(1)?,
476 outcome_tag: r.get::<_, Option<String>>(2)?,
477 external: r.get::<_, Option<String>>(3)?,
478 }),
479 None => None,
480 })
481}
482
483#[derive(Debug, Clone)]
486pub struct StaleTask {
487 pub task_id: String,
488 pub title: String,
489 pub last_event_at: String,
490 pub days_idle: i64,
491}
492
493pub fn stale_tasks(conn: &Connection, days: i64) -> anyhow::Result<Vec<StaleTask>> {
496 let cutoff = chrono::Utc::now() - chrono::Duration::days(days);
497 let cutoff_str = cutoff.to_rfc3339();
498 let mut stmt = conn.prepare(
499 "SELECT task_id, title, last_event_at FROM tasks
500 WHERE status = 'open' AND last_event_at < ?1
501 ORDER BY last_event_at ASC",
502 )?;
503 let rows = stmt.query_map(rusqlite::params![cutoff_str], |r| {
504 Ok((
505 r.get::<_, String>(0)?,
506 r.get::<_, String>(1)?,
507 r.get::<_, String>(2)?,
508 ))
509 })?;
510 let now = chrono::Utc::now();
511 let mut out = Vec::new();
512 for row in rows {
513 let (task_id, title, last_at) = row?;
514 let dt = chrono::DateTime::parse_from_rfc3339(&last_at)
515 .map(|d| d.with_timezone(&chrono::Utc))
516 .unwrap_or(now);
517 let days_idle = (now - dt).num_days();
518 out.push(StaleTask {
519 task_id,
520 title,
521 last_event_at: last_at,
522 days_idle,
523 });
524 }
525 Ok(out)
526}
527
528#[derive(Debug, Clone)]
533pub struct RelatedTask {
534 pub task_id: String,
535 pub status: String,
536 pub score: f64,
537}
538
539pub fn find_related_tasks(
550 conn: &Connection,
551 arts: &crate::artifacts::Artifacts,
552) -> anyhow::Result<Vec<RelatedTask>> {
553 use std::collections::HashMap;
554 if arts.is_empty() {
555 return Ok(Vec::new());
556 }
557 let mut scores: HashMap<String, f64> = HashMap::new();
558 let mut last_seen: HashMap<String, String> = HashMap::new();
559
560 let needles: Vec<(String, f64)> = arts
561 .linked_issues
562 .iter()
563 .map(|s| (s.clone(), 1.0))
564 .chain(arts.commit_hashes.iter().map(|s| (s.clone(), 0.8)))
565 .chain(arts.files.iter().map(|s| (s.clone(), 0.3)))
566 .collect();
567
568 for (needle, weight) in needles {
569 let pattern = format!("%\"{}\"%", needle.replace('%', "\\%"));
570 let mut stmt = conn.prepare(
571 "SELECT DISTINCT task_id, MAX(timestamp) as ts FROM events_index
572 WHERE artifacts LIKE ?1
573 GROUP BY task_id
574 ORDER BY ts DESC",
575 )?;
576 let rows = stmt.query_map(rusqlite::params![pattern], |r| {
577 Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
578 })?;
579 for row in rows {
580 let (id, ts) = row?;
581 *scores.entry(id.clone()).or_insert(0.0) += weight;
582 last_seen.insert(id, ts);
583 }
584 }
585
586 let mut out: Vec<RelatedTask> = Vec::with_capacity(scores.len());
587 for (id, score) in scores {
588 let status: Option<String> = conn
589 .query_row(
590 "SELECT status FROM tasks WHERE task_id = ?1",
591 rusqlite::params![&id],
592 |r| r.get(0),
593 )
594 .ok();
595 if let Some(status) = status {
596 out.push(RelatedTask {
597 task_id: id,
598 status,
599 score,
600 });
601 }
602 }
603 out.sort_by(|a, b| {
604 b.score
605 .partial_cmp(&a.score)
606 .unwrap_or(std::cmp::Ordering::Equal)
607 .then_with(|| {
608 let ts_a = last_seen.get(&a.task_id).cloned().unwrap_or_default();
609 let ts_b = last_seen.get(&b.task_id).cloned().unwrap_or_default();
610 ts_b.cmp(&ts_a)
611 })
612 });
613 Ok(out)
614}
615
616pub fn find_tasks_by_linked_issues(
623 conn: &Connection,
624 issues: &[String],
625) -> anyhow::Result<Vec<(String, String)>> {
626 if issues.is_empty() {
627 return Ok(Vec::new());
628 }
629 let mut candidate_ids: Vec<String> = Vec::new();
635 for issue in issues {
636 let pattern = format!("%\"{}\"%", issue.replace('%', "\\%"));
637 let mut stmt = conn.prepare(
638 "SELECT DISTINCT task_id FROM events_index
639 WHERE artifacts LIKE ?1
640 ORDER BY timestamp DESC",
641 )?;
642 let rows = stmt.query_map(rusqlite::params![pattern], |r| r.get::<_, String>(0))?;
643 for r in rows {
644 let id = r?;
645 if !candidate_ids.contains(&id) {
646 candidate_ids.push(id);
647 }
648 }
649 }
650 let mut out = Vec::with_capacity(candidate_ids.len());
652 for id in candidate_ids {
653 let status: Option<String> = conn
654 .query_row(
655 "SELECT status FROM tasks WHERE task_id = ?1",
656 rusqlite::params![&id],
657 |r| r.get(0),
658 )
659 .ok();
660 if let Some(s) = status {
661 out.push((id, s));
662 }
663 }
664 Ok(out)
665}
666
667pub fn reclassify_task_artifacts(conn: &Connection, task_id: &str) -> anyhow::Result<usize> {
673 let mut stmt = conn.prepare(
674 "SELECT ei.event_id, COALESCE(sf.text, '') FROM events_index ei
675 LEFT JOIN search_fts sf ON sf.event_id = ei.event_id
676 WHERE ei.task_id = ?1",
677 )?;
678 let rows: Vec<(String, String)> = stmt
679 .query_map(rusqlite::params![task_id], |r| {
680 Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
681 })?
682 .collect::<Result<_, _>>()?;
683 let count = rows.len();
684 for (event_id, text) in rows {
685 let arts = crate::artifacts::extract(&text);
686 let json = if arts.is_empty() {
687 None
688 } else {
689 Some(serde_json::to_string(&arts)?)
690 };
691 conn.execute(
692 "UPDATE events_index SET artifacts = ?1 WHERE event_id = ?2",
693 rusqlite::params![json, event_id],
694 )?;
695 }
696 invalidate_pack_cascade(conn, task_id)?;
697 Ok(count)
698}
699
700pub fn task_artifacts(
706 conn: &Connection,
707 task_id: &str,
708) -> anyhow::Result<crate::artifacts::Artifacts> {
709 let mut stmt = conn.prepare(
710 "SELECT artifacts FROM events_index
711 WHERE task_id = ?1 AND artifacts IS NOT NULL
712 ORDER BY timestamp ASC",
713 )?;
714 let rows = stmt.query_map(rusqlite::params![task_id], |r| r.get::<_, String>(0))?;
715 let mut acc = crate::artifacts::Artifacts::default();
716 for row in rows {
717 let json = row?;
718 if let Ok(parsed) = serde_json::from_str::<crate::artifacts::Artifacts>(&json) {
719 acc.merge(parsed);
720 }
721 }
722 Ok(acc)
723}
724
725fn last_indexed_event_id(conn: &Connection, project_hash: &str) -> anyhow::Result<Option<String>> {
729 let mut stmt =
730 conn.prepare("SELECT last_indexed_event_id FROM index_state WHERE project_hash = ?1")?;
731 let mut rows = stmt.query(rusqlite::params![project_hash])?;
732 if let Some(row) = rows.next()? {
733 Ok(Some(row.get::<_, String>(0)?))
734 } else {
735 Ok(None)
736 }
737}
738
739fn record_last_indexed(
740 conn: &Connection,
741 project_hash: &str,
742 event_id: &str,
743) -> anyhow::Result<()> {
744 conn.execute(
745 "INSERT INTO index_state(project_hash, last_indexed_event_id, updated_at)
746 VALUES (?1, ?2, ?3)
747 ON CONFLICT(project_hash) DO UPDATE SET
748 last_indexed_event_id = excluded.last_indexed_event_id,
749 updated_at = excluded.updated_at",
750 rusqlite::params![
751 project_hash,
752 event_id,
753 chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
754 ],
755 )?;
756 Ok(())
757}
758
759pub fn ingest_new_events(
769 conn: &Connection,
770 jsonl_path: impl AsRef<Path>,
771 project_hash: &str,
772) -> anyhow::Result<usize> {
773 let marker = match last_indexed_event_id(conn, project_hash)? {
774 Some(id) => id,
775 None => return rebuild_state(conn, jsonl_path, project_hash),
776 };
777
778 let f = std::fs::File::open(&jsonl_path)
779 .with_context(|| format!("open {:?}", jsonl_path.as_ref()))?;
780 let reader = std::io::BufReader::new(f);
781
782 let tx = conn.unchecked_transaction()?;
786 let mut found_marker = false;
787 let mut count = 0;
788 let mut last_event_id: Option<String> = None;
789 for (i, line) in reader.lines().enumerate() {
790 let line = line.with_context(|| format!("read line {i}"))?;
791 if line.trim().is_empty() {
792 continue;
793 }
794 let event: Event = match serde_json::from_str(&line) {
795 Ok(e) => e,
796 Err(err) => {
797 tracing::warn!(
798 line_number = i + 1,
799 error = %err,
800 "skipping malformed JSONL line in ingest_new_events"
801 );
802 continue;
803 }
804 };
805 if !found_marker {
806 if event.event_id == marker {
807 found_marker = true;
808 }
809 continue;
810 }
811 upsert_task_from_event(&tx, &event, project_hash)?;
812 index_event(&tx, &event)?;
813 last_event_id = Some(event.event_id.clone());
814 count += 1;
815 }
816
817 if !found_marker {
818 drop(tx);
820 tracing::warn!(
821 project_hash = project_hash,
822 marker = marker.as_str(),
823 "last_indexed_event_id not found in JSONL — falling back to full rebuild"
824 );
825 return rebuild_state(conn, jsonl_path, project_hash);
826 }
827
828 if let Some(eid) = last_event_id.as_deref() {
829 record_last_indexed(&tx, project_hash, eid)?;
830 }
831 tx.commit()?;
832 Ok(count)
833}
834
835pub fn index_event(conn: &Connection, event: &Event) -> anyhow::Result<()> {
836 let type_str = serde_json::to_value(event.event_type)?
837 .as_str()
838 .unwrap()
839 .to_string();
840 let status_str = serde_json::to_value(event.status)?
841 .as_str()
842 .unwrap()
843 .to_string();
844 let artifacts = crate::artifacts::extract(&event.text);
849 let artifacts_json = if artifacts.is_empty() {
850 None
851 } else {
852 Some(serde_json::to_string(&artifacts)?)
853 };
854 conn.execute(
855 "INSERT OR REPLACE INTO events_index(event_id, task_id, type, timestamp, confidence, status, artifacts)
856 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
857 rusqlite::params![
858 event.event_id, event.task_id, type_str,
859 event.timestamp, event.confidence, status_str, artifacts_json
860 ],
861 )?;
862 conn.execute(
864 "DELETE FROM search_fts WHERE event_id=?1",
865 rusqlite::params![event.event_id],
866 )?;
867 conn.execute(
868 "INSERT INTO search_fts(task_id, event_id, text, type) VALUES (?1, ?2, ?3, ?4)",
869 rusqlite::params![event.task_id, event.event_id, event.text, type_str],
870 )?;
871
872 if event.event_type == EventType::Decision {
873 let alternatives_json = match event.meta.get("alternatives") {
877 Some(v) if !v.is_null() => Some(serde_json::to_string(v)?),
878 _ => None,
879 };
880 conn.execute(
881 "INSERT OR REPLACE INTO decisions(decision_id, task_id, text, status, alternatives)
882 VALUES (?1, ?2, ?3, 'active', ?4)",
883 rusqlite::params![event.event_id, event.task_id, event.text, alternatives_json],
884 )?;
885 }
886
887 if event.event_type == EventType::Supersede {
888 if let Some(target) = &event.supersedes {
889 conn.execute(
890 "UPDATE decisions SET status='superseded', superseded_by=?1 WHERE decision_id=?2",
891 rusqlite::params![event.event_id, target],
892 )?;
893 }
894 }
895
896 if event.event_type == EventType::Evidence {
897 let strength_str = event
898 .evidence_strength
899 .map(|s| {
900 serde_json::to_value(s)
901 .unwrap()
902 .as_str()
903 .unwrap()
904 .to_string()
905 })
906 .unwrap_or_else(|| "medium".into());
907 conn.execute(
908 "INSERT OR REPLACE INTO evidence(evidence_id, task_id, text, strength)
909 VALUES (?1, ?2, ?3, ?4)",
910 rusqlite::params![event.event_id, event.task_id, event.text, strength_str],
911 )?;
912 }
913
914 invalidate_pack_cascade(conn, &event.task_id)?;
917
918 Ok(())
919}
920
921pub fn open(path: impl AsRef<Path>) -> anyhow::Result<Connection> {
922 if let Some(parent) = path.as_ref().parent() {
923 std::fs::create_dir_all(parent).with_context(|| format!("create dir {parent:?}"))?;
924 }
925 let conn =
926 Connection::open(&path).with_context(|| format!("open SQLite at {:?}", path.as_ref()))?;
927 conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;")?;
928 apply_migrations(&conn).context("apply schema migrations")?;
929 Ok(conn)
930}
931
932#[derive(Debug, Clone)]
936pub struct TaskRow {
937 pub task_id: String,
938 pub title: String,
939 pub status: String,
940 pub last_event_at: String,
941 pub event_count: usize,
942}
943
944pub fn list_tasks_by_project(
948 conn: &Connection,
949 project_hash: &str,
950) -> anyhow::Result<Vec<TaskRow>> {
951 let mut stmt = conn.prepare(
952 "SELECT t.task_id, t.title, t.status, t.last_event_at,
953 COALESCE(c.cnt, 0) AS event_count
954 FROM tasks t
955 LEFT JOIN (
956 SELECT task_id, COUNT(*) AS cnt FROM events_index GROUP BY task_id
957 ) c ON c.task_id = t.task_id
958 WHERE t.project_hash = ?1
959 ORDER BY (t.status = 'open') DESC, t.last_event_at DESC",
960 )?;
961 let rows = stmt
962 .query_map(rusqlite::params![project_hash], |r| {
963 Ok(TaskRow {
964 task_id: r.get::<_, String>(0)?,
965 title: r.get::<_, String>(1)?,
966 status: r.get::<_, String>(2)?,
967 last_event_at: r.get::<_, String>(3)?,
968 event_count: r.get::<_, i64>(4)? as usize,
969 })
970 })?
971 .collect::<Result<Vec<_>, _>>()?;
972 Ok(rows)
973}
974
975pub fn top_level_tasks(conn: &Connection, project_hash: &str) -> anyhow::Result<Vec<TaskRow>> {
979 let mut stmt = conn.prepare(
980 "SELECT t.task_id, t.title, t.status, t.last_event_at,
981 COALESCE(c.cnt, 0) AS event_count
982 FROM tasks t
983 LEFT JOIN (
984 SELECT task_id, COUNT(*) AS cnt FROM events_index GROUP BY task_id
985 ) c ON c.task_id = t.task_id
986 WHERE t.project_hash = ?1 AND t.parent_id IS NULL
987 ORDER BY (t.status = 'open') DESC, t.last_event_at DESC",
988 )?;
989 let rows = stmt
990 .query_map(rusqlite::params![project_hash], |r| {
991 Ok(TaskRow {
992 task_id: r.get::<_, String>(0)?,
993 title: r.get::<_, String>(1)?,
994 status: r.get::<_, String>(2)?,
995 last_event_at: r.get::<_, String>(3)?,
996 event_count: r.get::<_, i64>(4)? as usize,
997 })
998 })?
999 .collect::<Result<Vec<_>, _>>()?;
1000 Ok(rows)
1001}
1002
1003pub fn children_of(conn: &Connection, task_id: &str) -> anyhow::Result<Vec<TaskRow>> {
1005 let mut stmt = conn.prepare(
1006 "SELECT t.task_id, t.title, t.status, t.last_event_at,
1007 COALESCE(c.cnt, 0) AS event_count
1008 FROM tasks t
1009 LEFT JOIN (
1010 SELECT task_id, COUNT(*) AS cnt FROM events_index GROUP BY task_id
1011 ) c ON c.task_id = t.task_id
1012 WHERE t.parent_id = ?1
1013 ORDER BY (t.status = 'open') DESC, t.last_event_at DESC",
1014 )?;
1015 let rows = stmt
1016 .query_map(rusqlite::params![task_id], |r| {
1017 Ok(TaskRow {
1018 task_id: r.get::<_, String>(0)?,
1019 title: r.get::<_, String>(1)?,
1020 status: r.get::<_, String>(2)?,
1021 last_event_at: r.get::<_, String>(3)?,
1022 event_count: r.get::<_, i64>(4)? as usize,
1023 })
1024 })?
1025 .collect::<Result<Vec<_>, _>>()?;
1026 Ok(rows)
1027}
1028
1029pub fn parent_of(conn: &Connection, task_id: &str) -> anyhow::Result<Option<String>> {
1031 let mut stmt = conn.prepare("SELECT parent_id FROM tasks WHERE task_id = ?1")?;
1032 let mut rows = stmt.query(rusqlite::params![task_id])?;
1033 Ok(match rows.next()? {
1034 Some(r) => r.get::<_, Option<String>>(0)?,
1035 None => None,
1036 })
1037}
1038
1039pub fn would_create_cycle(
1044 conn: &Connection,
1045 task_id: &str,
1046 new_parent: &str,
1047) -> anyhow::Result<bool> {
1048 if task_id == new_parent {
1049 return Ok(true);
1050 }
1051 let mut cursor = Some(new_parent.to_string());
1052 for _ in 0..64 {
1053 let Some(cur) = cursor else {
1054 return Ok(false);
1055 };
1056 if cur == task_id {
1057 return Ok(true);
1058 }
1059 cursor = parent_of(conn, &cur)?;
1060 }
1061 Ok(true)
1063}
1064
1065pub fn count_open_children(conn: &Connection, task_id: &str) -> anyhow::Result<usize> {
1067 let n: i64 = conn.query_row(
1068 "SELECT COUNT(*) FROM tasks WHERE parent_id = ?1 AND status = 'open'",
1069 rusqlite::params![task_id],
1070 |r| r.get(0),
1071 )?;
1072 Ok(n as usize)
1073}
1074
1075pub fn invalidate_pack_cascade(conn: &Connection, task_id: &str) -> anyhow::Result<()> {
1077 conn.execute(
1078 "DELETE FROM task_pack_cache WHERE task_id = ?1",
1079 rusqlite::params![task_id],
1080 )?;
1081 if let Some(parent) = parent_of(conn, task_id)? {
1082 conn.execute(
1083 "DELETE FROM task_pack_cache WHERE task_id = ?1",
1084 rusqlite::params![parent],
1085 )?;
1086 }
1087 Ok(())
1088}
1089
1090pub struct PendingEmbed {
1096 pub event_id: String,
1097 pub task_id: String,
1098 pub text: String,
1099}
1100
1101pub fn events_needing_embedding(
1105 conn: &Connection,
1106 model: &str,
1107 limit: usize,
1108) -> anyhow::Result<Vec<PendingEmbed>> {
1109 let mut stmt = conn.prepare(
1110 "SELECT f.event_id, f.task_id, f.text
1111 FROM search_fts f
1112 LEFT JOIN embeddings e ON e.event_id = f.event_id AND e.model = ?1
1113 WHERE e.event_id IS NULL
1114 LIMIT ?2",
1115 )?;
1116 let rows = stmt.query_map(rusqlite::params![model, limit as i64], |r| {
1117 Ok(PendingEmbed {
1118 event_id: r.get(0)?,
1119 task_id: r.get(1)?,
1120 text: r.get(2)?,
1121 })
1122 })?;
1123 let mut out = Vec::new();
1124 for r in rows {
1125 out.push(r?);
1126 }
1127 Ok(out)
1128}
1129
1130#[allow(clippy::too_many_arguments)]
1133pub fn upsert_embedding(
1134 conn: &Connection,
1135 event_id: &str,
1136 task_id: &str,
1137 project_hash: &str,
1138 tier: &str,
1139 model: &str,
1140 dim: usize,
1141 vec: &[f32],
1142 created_at: &str,
1143) -> anyhow::Result<()> {
1144 conn.execute(
1145 "INSERT OR REPLACE INTO embeddings(event_id, task_id, project_hash, tier, model, dim, vec, created_at)
1146 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
1147 rusqlite::params![
1148 event_id,
1149 task_id,
1150 project_hash,
1151 tier,
1152 model,
1153 dim as i64,
1154 crate::embed::to_blob(vec),
1155 created_at
1156 ],
1157 )?;
1158 Ok(())
1159}
1160
1161pub fn high_signal_events(
1164 conn: &Connection,
1165 limit: usize,
1166) -> anyhow::Result<Vec<(String, String)>> {
1167 let mut stmt = conn.prepare(
1168 "SELECT f.event_id, f.text
1169 FROM search_fts f
1170 JOIN events_index ei ON ei.event_id = f.event_id
1171 WHERE f.type IN ('decision', 'constraint', 'rejection')
1172 ORDER BY ei.timestamp DESC
1173 LIMIT ?1",
1174 )?;
1175 let rows = stmt.query_map(rusqlite::params![limit as i64], |r| {
1176 Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
1177 })?;
1178 let mut out = Vec::new();
1179 for r in rows {
1180 out.push(r?);
1181 }
1182 Ok(out)
1183}
1184
1185pub fn find_task_by_title(conn: &Connection, title: &str) -> anyhow::Result<Option<String>> {
1188 let mut stmt = conn.prepare("SELECT task_id FROM tasks WHERE title = ?1 LIMIT 1")?;
1189 let mut rows = stmt.query(rusqlite::params![title])?;
1190 match rows.next()? {
1191 Some(row) => Ok(Some(row.get(0)?)),
1192 None => Ok(None),
1193 }
1194}
1195
1196pub fn task_event_texts(conn: &Connection, task_id: &str) -> anyhow::Result<Vec<String>> {
1198 let mut stmt = conn.prepare("SELECT text FROM search_fts WHERE task_id = ?1")?;
1199 let rows = stmt.query_map(rusqlite::params![task_id], |r| r.get::<_, String>(0))?;
1200 let mut out = Vec::new();
1201 for r in rows {
1202 out.push(r?);
1203 }
1204 Ok(out)
1205}
1206
1207pub fn count_embeddings(conn: &Connection, project_hash: &str) -> anyhow::Result<usize> {
1209 let n: i64 = conn.query_row(
1210 "SELECT COUNT(*) FROM embeddings WHERE project_hash = ?1",
1211 rusqlite::params![project_hash],
1212 |r| r.get(0),
1213 )?;
1214 Ok(n as usize)
1215}
1216
1217pub fn embed_pending(
1225 conn: &Connection,
1226 project_hash: &str,
1227 embedder: &dyn crate::embed::Embedder,
1228 created_at: &str,
1229 limit: usize,
1230) -> anyhow::Result<usize> {
1231 let pending = events_needing_embedding(conn, embedder.model_id(), limit)?;
1232 if pending.is_empty() {
1233 return Ok(0);
1234 }
1235 let texts: Vec<&str> = pending.iter().map(|p| p.text.as_str()).collect();
1236 let vecs = embedder.embed(&texts)?;
1237 let mut done = 0usize;
1238 for (p, v) in pending.iter().zip(vecs.iter()) {
1239 upsert_embedding(
1240 conn,
1241 &p.event_id,
1242 &p.task_id,
1243 project_hash,
1244 "episodic",
1245 embedder.model_id(),
1246 embedder.dim(),
1247 v,
1248 created_at,
1249 )?;
1250 done += 1;
1251 }
1252 Ok(done)
1253}
1254
1255pub struct ScoredHit {
1257 pub event_id: String,
1258 pub task_id: String,
1259 pub task_title: String,
1260 pub event_type: String,
1261 pub tier: String,
1262 pub text: String,
1263 pub score: f32,
1264}
1265
1266pub fn semantic_search(
1272 conn: &Connection,
1273 project_hash: &str,
1274 query_vec: &[f32],
1275 model: &str,
1276 k: usize,
1277) -> anyhow::Result<Vec<ScoredHit>> {
1278 let mut stmt = conn.prepare(
1279 "SELECT e.event_id, e.task_id, e.tier, e.vec, f.text, f.type,
1280 COALESCE(t.title, '')
1281 FROM embeddings e
1282 JOIN search_fts f ON f.event_id = e.event_id
1283 LEFT JOIN tasks t ON t.task_id = e.task_id
1284 WHERE e.project_hash = ?1 AND e.model = ?2",
1285 )?;
1286 let rows = stmt.query_map(rusqlite::params![project_hash, model], |r| {
1287 let blob: Vec<u8> = r.get(3)?;
1288 Ok((
1289 r.get::<_, String>(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?, blob,
1293 r.get::<_, String>(4)?, r.get::<_, String>(5)?, r.get::<_, String>(6)?, ))
1297 })?;
1298
1299 let mut hits: Vec<ScoredHit> = Vec::new();
1300 for row in rows {
1301 let (event_id, task_id, tier, blob, text, event_type, task_title) = row?;
1302 let score = crate::embed::cosine(query_vec, &crate::embed::from_blob(&blob));
1303 hits.push(ScoredHit {
1304 event_id,
1305 task_id,
1306 task_title,
1307 event_type,
1308 tier,
1309 text,
1310 score,
1311 });
1312 }
1313 hits.sort_by(|a, b| {
1314 b.score
1315 .partial_cmp(&a.score)
1316 .unwrap_or(std::cmp::Ordering::Equal)
1317 });
1318 hits.truncate(k);
1319 Ok(hits)
1320}
1321
1322#[cfg(test)]
1323mod tests {
1324 use super::*;
1325 use crate::embed::Embedder;
1326 use tempfile::TempDir;
1327
1328 #[test]
1329 fn task_exists_returns_true_for_known_id_false_otherwise() {
1330 let d = TempDir::new().unwrap();
1331 let conn = open(d.path().join("s.sqlite")).unwrap();
1332
1333 assert!(!task_exists(&conn, "tj-nope").unwrap());
1334
1335 let e = make_open_event("tj-yes", "Hello");
1336 upsert_task_from_event(&conn, &e, "feedfacefeedface").unwrap();
1337 index_event(&conn, &e).unwrap();
1338
1339 assert!(task_exists(&conn, "tj-yes").unwrap());
1340 assert!(!task_exists(&conn, "tj-nope").unwrap());
1341 }
1342
1343 #[test]
1344 fn rename_event_updates_task_title() {
1345 let d = TempDir::new().unwrap();
1346 let conn = open(d.path().join("s.sqlite")).unwrap();
1347 let ph = "feedfacefeedface";
1348
1349 let open_ev = make_open_event("tj-rn", "#: 5");
1350 upsert_task_from_event(&conn, &open_ev, ph).unwrap();
1351 let title: String = conn
1352 .query_row("SELECT title FROM tasks WHERE task_id='tj-rn'", [], |r| {
1353 r.get(0)
1354 })
1355 .unwrap();
1356 assert_eq!(title, "#: 5");
1357
1358 let mut rename = crate::event::Event::new(
1359 "tj-rn",
1360 crate::event::EventType::Rename,
1361 crate::event::Author::Agent,
1362 crate::event::Source::Cli,
1363 "Support BID 29683996 — voucher refund 50% vs promised 100%".into(),
1364 );
1365 rename.timestamp = "2099-01-01T00:00:00.000Z".into();
1366 upsert_task_from_event(&conn, &rename, ph).unwrap();
1367
1368 let title: String = conn
1369 .query_row("SELECT title FROM tasks WHERE task_id='tj-rn'", [], |r| {
1370 r.get(0)
1371 })
1372 .unwrap();
1373 assert_eq!(
1374 title,
1375 "Support BID 29683996 — voucher refund 50% vs promised 100%"
1376 );
1377 }
1378
1379 #[test]
1380 fn close_event_restores_outcome_from_meta() {
1381 let d = TempDir::new().unwrap();
1382 let conn = open(d.path().join("s.sqlite")).unwrap();
1383 let ph = "feedfacefeedface";
1384
1385 let open_ev = make_open_event("tj-cl", "T");
1386 upsert_task_from_event(&conn, &open_ev, ph).unwrap();
1387
1388 let mut close = crate::event::Event::new(
1389 "tj-cl",
1390 crate::event::EventType::Close,
1391 crate::event::Author::Agent,
1392 crate::event::Source::Cli,
1393 "done".into(),
1394 );
1395 close.meta = serde_json::json!({"outcome": "Shipped the fix.", "outcome_tag": "done"});
1396 upsert_task_from_event(&conn, &close, ph).unwrap();
1397
1398 let (status, outcome, tag): (String, Option<String>, Option<String>) = conn
1399 .query_row(
1400 "SELECT status, outcome, outcome_tag FROM tasks WHERE task_id='tj-cl'",
1401 [],
1402 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1403 )
1404 .unwrap();
1405 assert_eq!(status, "closed");
1406 assert_eq!(outcome.as_deref(), Some("Shipped the fix."));
1407 assert_eq!(tag.as_deref(), Some("done"));
1408 }
1409
1410 #[test]
1411 fn fresh_db_runs_all_migrations() {
1412 let d = TempDir::new().unwrap();
1413 let p = d.path().join("state.sqlite");
1414 let conn = open(&p).unwrap();
1415
1416 let applied: Vec<i64> = conn
1417 .prepare("SELECT version FROM schema_migrations ORDER BY version")
1418 .unwrap()
1419 .query_map([], |r| r.get::<_, i64>(0))
1420 .unwrap()
1421 .collect::<Result<_, _>>()
1422 .unwrap();
1423 assert_eq!(
1424 applied,
1425 (1..=MIGRATIONS.len() as i64).collect::<Vec<_>>(),
1426 "every declared migration must be recorded"
1427 );
1428 }
1429
1430 #[test]
1431 fn apply_migrations_is_idempotent_across_reopens() {
1432 let d = TempDir::new().unwrap();
1433 let p = d.path().join("state.sqlite");
1434 let _ = open(&p).unwrap();
1435 let _ = open(&p).unwrap();
1436
1437 let count: i64 = open(&p)
1438 .unwrap()
1439 .query_row("SELECT COUNT(*) FROM schema_migrations", [], |r| r.get(0))
1440 .unwrap();
1441 assert_eq!(
1442 count,
1443 MIGRATIONS.len() as i64,
1444 "schema_migrations must contain exactly one row per declared migration after repeated opens"
1445 );
1446 }
1447
1448 fn make_text_event(text: &str) -> crate::event::Event {
1449 crate::event::Event::new(
1450 "tj-x",
1451 crate::event::EventType::Finding,
1452 crate::event::Author::User,
1453 crate::event::Source::Cli,
1454 text.into(),
1455 )
1456 }
1457
1458 #[test]
1459 fn embed_pending_embeds_all_then_is_idempotent() {
1460 let d = TempDir::new().unwrap();
1461 let conn = open(d.path().join("s.sqlite")).unwrap();
1462 let ph = "feedfacefeedface";
1463
1464 for text in [
1465 "implement payment refund deduplication",
1466 "add validation for negative order amounts",
1467 ] {
1468 index_event(&conn, &make_text_event(text)).unwrap();
1469 }
1470
1471 let emb = crate::embed::HashEmbedder::new(64);
1472 let at = "2026-06-12T00:00:00Z";
1473
1474 let n = embed_pending(&conn, ph, &emb, at, 100).unwrap();
1475 assert_eq!(n, 2, "both events embedded on first pass");
1476 assert_eq!(count_embeddings(&conn, ph).unwrap(), 2);
1477
1478 assert_eq!(embed_pending(&conn, ph, &emb, at, 100).unwrap(), 0);
1480
1481 assert_eq!(
1484 events_needing_embedding(&conn, "other-model", 100)
1485 .unwrap()
1486 .len(),
1487 2
1488 );
1489 }
1490
1491 #[test]
1492 fn semantic_search_ranks_relevant_event_first() {
1493 let d = TempDir::new().unwrap();
1494 let conn = open(d.path().join("s.sqlite")).unwrap();
1495 let ph = "feedfacefeedface";
1496
1497 for text in [
1498 "fix duplicate payment refund write on partial refund",
1499 "update the frontend button hover color",
1500 "add a database index for faster user lookup",
1501 ] {
1502 index_event(&conn, &make_text_event(text)).unwrap();
1503 }
1504 let emb = crate::embed::HashEmbedder::new(256);
1505 embed_pending(&conn, ph, &emb, "t", 100).unwrap();
1506
1507 let q = emb.embed_one("payment refund duplicated").unwrap();
1508 let hits = semantic_search(&conn, ph, &q, emb.model_id(), 3).unwrap();
1509
1510 assert_eq!(hits.len(), 3);
1511 assert!(
1512 hits[0].text.contains("refund"),
1513 "the refund event must rank first, got: {}",
1514 hits[0].text
1515 );
1516 assert!(
1517 hits[0].score >= hits[1].score,
1518 "hits must be sorted by score desc"
1519 );
1520 }
1521
1522 #[test]
1523 fn open_creates_all_tables() {
1524 let d = TempDir::new().unwrap();
1525 let p = d.path().join("state.sqlite");
1526 let conn = open(&p).unwrap();
1527
1528 let names: Vec<String> = conn
1529 .prepare("SELECT name FROM sqlite_master WHERE type='table' OR type='virtual table' ORDER BY name")
1530 .unwrap()
1531 .query_map([], |r| r.get::<_, String>(0))
1532 .unwrap()
1533 .collect::<Result<_, _>>()
1534 .unwrap();
1535
1536 for required in [
1537 "decisions",
1538 "events_index",
1539 "evidence",
1540 "task_pack_cache",
1541 "tasks",
1542 "search_fts",
1543 ] {
1544 assert!(
1545 names.iter().any(|n| n == required),
1546 "missing table {required}, have {names:?}"
1547 );
1548 }
1549 }
1550
1551 #[test]
1552 fn open_is_idempotent() {
1553 let d = TempDir::new().unwrap();
1554 let p = d.path().join("state.sqlite");
1555 let _ = open(&p).unwrap();
1556 let _ = open(&p).unwrap();
1557 }
1558
1559 #[test]
1560 fn index_event_projects_evidence() {
1561 let d = TempDir::new().unwrap();
1562 let conn = open(d.path().join("s.sqlite")).unwrap();
1563 let mut open_e = crate::event::Event::new(
1564 "tj-e",
1565 crate::event::EventType::Open,
1566 crate::event::Author::User,
1567 crate::event::Source::Cli,
1568 "x".into(),
1569 );
1570 open_e.meta = serde_json::json!({"title": "T"});
1571 upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
1572 index_event(&conn, &open_e).unwrap();
1573
1574 let mut ev = crate::event::Event::new(
1575 "tj-e",
1576 crate::event::EventType::Evidence,
1577 crate::event::Author::Agent,
1578 crate::event::Source::Chat,
1579 "Hook startup measured at 12ms".into(),
1580 );
1581 ev.evidence_strength = Some(crate::event::EvidenceStrength::Strong);
1582 upsert_task_from_event(&conn, &ev, "feedface").unwrap();
1583 index_event(&conn, &ev).unwrap();
1584
1585 let (text, strength): (String, String) = conn
1586 .query_row(
1587 "SELECT text, strength FROM evidence WHERE task_id=?1",
1588 rusqlite::params!["tj-e"],
1589 |r| Ok((r.get(0)?, r.get(1)?)),
1590 )
1591 .unwrap();
1592 assert!(text.contains("12ms"));
1593 assert_eq!(strength, "strong");
1594 }
1595
1596 #[test]
1597 fn supersede_event_marks_decision_superseded() {
1598 let d = TempDir::new().unwrap();
1599 let conn = open(d.path().join("s.sqlite")).unwrap();
1600 let mut open_e = crate::event::Event::new(
1601 "tj-s",
1602 crate::event::EventType::Open,
1603 crate::event::Author::User,
1604 crate::event::Source::Cli,
1605 "x".into(),
1606 );
1607 open_e.meta = serde_json::json!({"title": "T"});
1608 upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
1609 index_event(&conn, &open_e).unwrap();
1610
1611 let dec = crate::event::Event::new(
1612 "tj-s",
1613 crate::event::EventType::Decision,
1614 crate::event::Author::Agent,
1615 crate::event::Source::Chat,
1616 "Use TS".into(),
1617 );
1618 upsert_task_from_event(&conn, &dec, "feedface").unwrap();
1619 index_event(&conn, &dec).unwrap();
1620
1621 let mut sup = crate::event::Event::new(
1622 "tj-s",
1623 crate::event::EventType::Supersede,
1624 crate::event::Author::Agent,
1625 crate::event::Source::Chat,
1626 "Replaced by Rust decision".into(),
1627 );
1628 sup.supersedes = Some(dec.event_id.clone());
1629 upsert_task_from_event(&conn, &sup, "feedface").unwrap();
1630 index_event(&conn, &sup).unwrap();
1631
1632 let (status, by): (String, Option<String>) = conn
1633 .query_row(
1634 "SELECT status, superseded_by FROM decisions WHERE decision_id=?1",
1635 rusqlite::params![dec.event_id],
1636 |r| Ok((r.get(0)?, r.get(1)?)),
1637 )
1638 .unwrap();
1639 assert_eq!(status, "superseded");
1640 assert_eq!(by.as_deref(), Some(sup.event_id.as_str()));
1641 }
1642
1643 #[test]
1644 fn index_event_projects_decision_to_decisions_table() {
1645 let d = TempDir::new().unwrap();
1646 let conn = open(d.path().join("s.sqlite")).unwrap();
1647
1648 let mut open_e = crate::event::Event::new(
1649 "tj-d",
1650 crate::event::EventType::Open,
1651 crate::event::Author::User,
1652 crate::event::Source::Cli,
1653 "x".into(),
1654 );
1655 open_e.meta = serde_json::json!({"title": "T"});
1656 upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
1657 index_event(&conn, &open_e).unwrap();
1658
1659 let dec = crate::event::Event::new(
1660 "tj-d",
1661 crate::event::EventType::Decision,
1662 crate::event::Author::Agent,
1663 crate::event::Source::Chat,
1664 "Adopt Rust".into(),
1665 );
1666 upsert_task_from_event(&conn, &dec, "feedface").unwrap();
1667 index_event(&conn, &dec).unwrap();
1668
1669 let (id, text, status): (String, String, String) = conn
1670 .query_row(
1671 "SELECT decision_id, text, status FROM decisions WHERE task_id=?1",
1672 rusqlite::params!["tj-d"],
1673 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1674 )
1675 .unwrap();
1676 assert_eq!(id, dec.event_id);
1677 assert_eq!(text, "Adopt Rust");
1678 assert_eq!(status, "active");
1679 }
1680
1681 #[test]
1682 fn index_event_projects_decision_alternatives_into_column() {
1683 let d = TempDir::new().unwrap();
1684 let conn = open(d.path().join("s.sqlite")).unwrap();
1685
1686 let mut dec = crate::event::Event::new(
1687 "tj-alt",
1688 crate::event::EventType::Decision,
1689 crate::event::Author::Agent,
1690 crate::event::Source::Chat,
1691 "Use SQLite".into(),
1692 );
1693 dec.meta = serde_json::json!({
1694 "alternatives": [
1695 {"option": "SQLite", "chosen": true, "rationale": "embedded, zero-ops"},
1696 {"option": "Postgres", "chosen": false, "rationale": "too heavy for local tool"}
1697 ]
1698 });
1699 upsert_task_from_event(&conn, &dec, "feedface").unwrap();
1700 index_event(&conn, &dec).unwrap();
1701
1702 let alts: Option<String> = conn
1703 .query_row(
1704 "SELECT alternatives FROM decisions WHERE decision_id=?1",
1705 rusqlite::params![dec.event_id],
1706 |r| r.get(0),
1707 )
1708 .unwrap();
1709 let alts = alts.expect("alternatives column should be populated");
1710 let parsed: serde_json::Value = serde_json::from_str(&alts).unwrap();
1711 assert_eq!(parsed.as_array().unwrap().len(), 2);
1712 assert_eq!(parsed[0]["option"], "SQLite");
1713 assert_eq!(parsed[0]["chosen"], true);
1714 }
1715
1716 #[test]
1717 fn index_event_decision_without_alternatives_leaves_column_null() {
1718 let d = TempDir::new().unwrap();
1719 let conn = open(d.path().join("s.sqlite")).unwrap();
1720
1721 let dec = crate::event::Event::new(
1722 "tj-noalt",
1723 crate::event::EventType::Decision,
1724 crate::event::Author::Agent,
1725 crate::event::Source::Chat,
1726 "Plain decision".into(),
1727 );
1728 upsert_task_from_event(&conn, &dec, "feedface").unwrap();
1729 index_event(&conn, &dec).unwrap();
1730
1731 let alts: Option<String> = conn
1732 .query_row(
1733 "SELECT alternatives FROM decisions WHERE decision_id=?1",
1734 rusqlite::params![dec.event_id],
1735 |r| r.get(0),
1736 )
1737 .unwrap();
1738 assert!(alts.is_none());
1739 }
1740
1741 #[test]
1742 fn index_event_is_idempotent_no_search_fts_duplicates() {
1743 let d = TempDir::new().unwrap();
1744 let conn = open(d.path().join("s.sqlite")).unwrap();
1745 let mut open_e = crate::event::Event::new(
1746 "tj-id",
1747 crate::event::EventType::Open,
1748 crate::event::Author::User,
1749 crate::event::Source::Cli,
1750 "x".into(),
1751 );
1752 open_e.meta = serde_json::json!({"title": "Idempotent"});
1753 upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
1754
1755 index_event(&conn, &open_e).unwrap();
1757 index_event(&conn, &open_e).unwrap();
1758 index_event(&conn, &open_e).unwrap();
1759
1760 let n: i64 = conn
1761 .query_row(
1762 "SELECT COUNT(*) FROM search_fts WHERE event_id=?1",
1763 rusqlite::params![open_e.event_id],
1764 |r| r.get(0),
1765 )
1766 .unwrap();
1767 assert_eq!(n, 1, "search_fts must hold exactly one row per event_id");
1768 }
1769
1770 #[test]
1771 fn list_all_projects_returns_hashes_from_state_dir() {
1772 use std::fs::File;
1773 let d = TempDir::new().unwrap();
1774 let state_dir = d.path().join("state");
1775 std::fs::create_dir_all(&state_dir).unwrap();
1776 File::create(state_dir.join("aaaa1111aaaa1111.sqlite")).unwrap();
1777 File::create(state_dir.join("bbbb2222bbbb2222.sqlite")).unwrap();
1778 File::create(state_dir.join("not-a-project.txt")).unwrap();
1779
1780 let mut hashes = list_all_projects(&state_dir).unwrap();
1781 hashes.sort();
1782 assert_eq!(hashes, vec!["aaaa1111aaaa1111", "bbbb2222bbbb2222"]);
1783 }
1784
1785 fn write_event_line(f: &mut std::fs::File, e: &crate::event::Event) {
1786 use std::io::Write;
1787 writeln!(f, "{}", serde_json::to_string(e).unwrap()).unwrap();
1788 }
1789
1790 fn make_open_event(task_id: &str, title: &str) -> crate::event::Event {
1791 let mut e = crate::event::Event::new(
1792 task_id,
1793 crate::event::EventType::Open,
1794 crate::event::Author::User,
1795 crate::event::Source::Cli,
1796 "x".into(),
1797 );
1798 e.meta = serde_json::json!({"title": title});
1799 e
1800 }
1801
1802 #[test]
1803 fn ingest_new_events_picks_up_only_new_lines() {
1804 let d = TempDir::new().unwrap();
1805 let jsonl = d.path().join("events.jsonl");
1806 let db = d.path().join("s.sqlite");
1807 let project = "deadbeefdeadbeef";
1808
1809 let e1 = make_open_event("tj-i1", "first");
1810 let e2 = make_open_event("tj-i2", "second");
1811 let e3 = make_open_event("tj-i3", "third");
1812
1813 let mut f = std::fs::File::create(&jsonl).unwrap();
1814 write_event_line(&mut f, &e1);
1815 write_event_line(&mut f, &e2);
1816 write_event_line(&mut f, &e3);
1817 drop(f);
1818
1819 let conn = open(&db).unwrap();
1821 let n_first = ingest_new_events(&conn, &jsonl, project).unwrap();
1822 assert_eq!(n_first, 3);
1823
1824 let e4 = make_open_event("tj-i4", "fourth");
1826 let e5 = make_open_event("tj-i5", "fifth");
1827 let mut f = std::fs::OpenOptions::new()
1828 .append(true)
1829 .open(&jsonl)
1830 .unwrap();
1831 write_event_line(&mut f, &e4);
1832 write_event_line(&mut f, &e5);
1833 drop(f);
1834
1835 let n_second = ingest_new_events(&conn, &jsonl, project).unwrap();
1837 assert_eq!(n_second, 2, "incremental ingest must read only the tail");
1838
1839 let total: i64 = conn
1840 .query_row("SELECT COUNT(*) FROM events_index", [], |r| r.get(0))
1841 .unwrap();
1842 assert_eq!(total, 5);
1843
1844 let marker: String = conn
1845 .query_row(
1846 "SELECT last_indexed_event_id FROM index_state WHERE project_hash=?1",
1847 rusqlite::params![project],
1848 |r| r.get(0),
1849 )
1850 .unwrap();
1851 assert_eq!(marker, e5.event_id);
1852 }
1853
1854 #[test]
1855 fn ingest_new_events_falls_back_to_full_rebuild_when_marker_vanishes() {
1856 let d = TempDir::new().unwrap();
1857 let jsonl = d.path().join("events.jsonl");
1858 let db = d.path().join("s.sqlite");
1859 let project = "feedfacefeedface";
1860
1861 let e1 = make_open_event("tj-r1", "first");
1862 let mut f = std::fs::File::create(&jsonl).unwrap();
1863 write_event_line(&mut f, &e1);
1864 drop(f);
1865
1866 let conn = open(&db).unwrap();
1867 ingest_new_events(&conn, &jsonl, project).unwrap();
1868
1869 let e2 = make_open_event("tj-r2", "after-corruption");
1872 let e3 = make_open_event("tj-r3", "after-corruption-2");
1873 let mut f = std::fs::File::create(&jsonl).unwrap();
1874 write_event_line(&mut f, &e2);
1875 write_event_line(&mut f, &e3);
1876 drop(f);
1877
1878 let n = ingest_new_events(&conn, &jsonl, project).unwrap();
1879 assert_eq!(n, 2, "missing marker must trigger full rebuild");
1880 }
1881
1882 #[test]
1883 fn rebuild_state_and_ingest_new_events_produce_same_state() {
1884 let d = TempDir::new().unwrap();
1885 let jsonl_a = d.path().join("a.jsonl");
1886 let jsonl_b = d.path().join("b.jsonl");
1887 let db_a = d.path().join("a.sqlite");
1888 let db_b = d.path().join("b.sqlite");
1889
1890 let events: Vec<_> = (0..5)
1891 .map(|i| make_open_event(&format!("tj-eq{i}"), &format!("title {i}")))
1892 .collect();
1893 for path in [&jsonl_a, &jsonl_b] {
1894 let mut f = std::fs::File::create(path).unwrap();
1895 for e in &events {
1896 write_event_line(&mut f, e);
1897 }
1898 }
1899
1900 let conn_a = open(&db_a).unwrap();
1901 let n_a = rebuild_state(&conn_a, &jsonl_a, "abcd1234abcd1234").unwrap();
1902
1903 let conn_b = open(&db_b).unwrap();
1904 let n_b = ingest_new_events(&conn_b, &jsonl_b, "abcd1234abcd1234").unwrap();
1905
1906 assert_eq!(n_a, n_b);
1907 assert_eq!(n_a, 5);
1908
1909 for table in ["tasks", "events_index"] {
1910 let q = format!("SELECT COUNT(*) FROM {table}");
1911 let cnt_a: i64 = conn_a.query_row(&q, [], |r| r.get(0)).unwrap();
1912 let cnt_b: i64 = conn_b.query_row(&q, [], |r| r.get(0)).unwrap();
1913 assert_eq!(cnt_a, cnt_b, "row count mismatch in {table}");
1914 }
1915 }
1916
1917 #[test]
1918 fn rebuild_state_skips_malformed_jsonl_lines() {
1919 use std::io::Write;
1920 let d = TempDir::new().unwrap();
1921 let events_path = d.path().join("events.jsonl");
1922 let db_path = d.path().join("s.sqlite");
1923
1924 let mut f = std::fs::File::create(&events_path).unwrap();
1925
1926 let mut e1 = crate::event::Event::new(
1927 "tj-skip",
1928 crate::event::EventType::Open,
1929 crate::event::Author::User,
1930 crate::event::Source::Cli,
1931 "x".into(),
1932 );
1933 e1.meta = serde_json::json!({"title": "Skip test"});
1934 writeln!(f, "{}", serde_json::to_string(&e1).unwrap()).unwrap();
1935
1936 writeln!(f, "this is not a json event line").unwrap();
1938
1939 writeln!(f, "{{\"foo\": 1}}").unwrap();
1941
1942 let e3 = crate::event::Event::new(
1943 "tj-skip",
1944 crate::event::EventType::Decision,
1945 crate::event::Author::Agent,
1946 crate::event::Source::Chat,
1947 "Adopt Rust".into(),
1948 );
1949 writeln!(f, "{}", serde_json::to_string(&e3).unwrap()).unwrap();
1950 drop(f);
1951
1952 let conn = open(&db_path).unwrap();
1953 let n = rebuild_state(&conn, &events_path, "deadbeefdeadbeef")
1954 .expect("rebuild_state must succeed despite malformed lines");
1955 assert_eq!(
1956 n, 2,
1957 "expected 2 valid events indexed (2 malformed skipped)"
1958 );
1959
1960 let indexed: i64 = conn
1961 .query_row("SELECT COUNT(*) FROM events_index", [], |r| r.get(0))
1962 .unwrap();
1963 assert_eq!(indexed, 2);
1964 }
1965
1966 #[test]
1967 fn rebuild_state_reads_jsonl_and_populates_db() {
1968 use std::io::Write;
1969 let d = TempDir::new().unwrap();
1970 let events_path = d.path().join("events.jsonl");
1971 let db_path = d.path().join("s.sqlite");
1972
1973 let mut f = std::fs::File::create(&events_path).unwrap();
1974 let mut e1 = crate::event::Event::new(
1975 "tj-9",
1976 crate::event::EventType::Open,
1977 crate::event::Author::User,
1978 crate::event::Source::Cli,
1979 "x".into(),
1980 );
1981 e1.meta = serde_json::json!({"title": "Nine"});
1982 let e2 = crate::event::Event::new(
1983 "tj-9",
1984 crate::event::EventType::Decision,
1985 crate::event::Author::Agent,
1986 crate::event::Source::Chat,
1987 "Adopt Rust".into(),
1988 );
1989 writeln!(f, "{}", serde_json::to_string(&e1).unwrap()).unwrap();
1990 writeln!(f, "{}", serde_json::to_string(&e2).unwrap()).unwrap();
1991 drop(f);
1992
1993 let conn = open(&db_path).unwrap();
1994 let n = rebuild_state(&conn, &events_path, "deadbeefdeadbeef").unwrap();
1995 assert_eq!(n, 2);
1996
1997 let n: i64 = conn
1998 .query_row("SELECT COUNT(*) FROM tasks", [], |r| r.get(0))
1999 .unwrap();
2000 assert_eq!(n, 1);
2001 let n: i64 = conn
2002 .query_row("SELECT COUNT(*) FROM events_index", [], |r| r.get(0))
2003 .unwrap();
2004 assert_eq!(n, 2);
2005 }
2006
2007 #[test]
2008 fn index_event_writes_index_and_fts() {
2009 let d = TempDir::new().unwrap();
2010 let conn = open(d.path().join("s.sqlite")).unwrap();
2011 let mut open_e = crate::event::Event::new(
2012 "tj-1",
2013 crate::event::EventType::Open,
2014 crate::event::Author::User,
2015 crate::event::Source::Cli,
2016 "Title".into(),
2017 );
2018 open_e.meta = serde_json::json!({"title": "Title"});
2019 upsert_task_from_event(&conn, &open_e, "deadbeefdeadbeef").unwrap();
2020 index_event(&conn, &open_e).unwrap();
2021
2022 let mut decision = crate::event::Event::new(
2023 "tj-1",
2024 crate::event::EventType::Decision,
2025 crate::event::Author::Agent,
2026 crate::event::Source::Chat,
2027 "Adopt Rust".into(),
2028 );
2029 decision.confidence = Some(0.92);
2030 upsert_task_from_event(&conn, &decision, "deadbeefdeadbeef").unwrap();
2031 index_event(&conn, &decision).unwrap();
2032
2033 let count: i64 = conn
2034 .query_row(
2035 "SELECT COUNT(*) FROM events_index WHERE task_id=?1",
2036 rusqlite::params!["tj-1"],
2037 |r| r.get(0),
2038 )
2039 .unwrap();
2040 assert_eq!(count, 2);
2041
2042 let mut stmt = conn
2043 .prepare("SELECT event_id FROM search_fts WHERE search_fts MATCH ?1")
2044 .unwrap();
2045 let hits: Vec<String> = stmt
2046 .query_map(rusqlite::params!["Rust"], |r| {
2047 let s: String = r.get(0)?;
2048 Ok(s)
2049 })
2050 .unwrap()
2051 .collect::<Result<Vec<_>, _>>()
2052 .unwrap();
2053 assert_eq!(hits.len(), 1);
2054 assert_eq!(hits[0], decision.event_id);
2055 }
2056
2057 #[test]
2058 fn upsert_task_from_open_event_inserts_row() {
2059 let d = TempDir::new().unwrap();
2060 let conn = open(d.path().join("s.sqlite")).unwrap();
2061
2062 let mut e = crate::event::Event::new(
2063 "tj-7f3a",
2064 crate::event::EventType::Open,
2065 crate::event::Author::User,
2066 crate::event::Source::Cli,
2067 "Add OAuth".into(),
2068 );
2069 e.meta = serde_json::json!({ "title": "Add OAuth login" });
2070
2071 upsert_task_from_event(&conn, &e, "abcd1234abcd1234").unwrap();
2072
2073 let (id, title, status): (String, String, String) = conn
2074 .query_row(
2075 "SELECT task_id, title, status FROM tasks WHERE task_id = ?1",
2076 ["tj-7f3a"],
2077 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
2078 )
2079 .unwrap();
2080
2081 assert_eq!(id, "tj-7f3a");
2082 assert_eq!(title, "Add OAuth login");
2083 assert_eq!(status, "open");
2084 }
2085
2086 #[test]
2087 fn migration_adds_parent_id_column_nullable() {
2088 let d = tempfile::TempDir::new().unwrap();
2089 let conn = open(d.path().join("s.sqlite")).unwrap();
2090
2091 let e = make_open_event("tj-a", "Top");
2093 upsert_task_from_event(&conn, &e, "ph").unwrap();
2094
2095 let parent: Option<String> = conn
2096 .query_row(
2097 "SELECT parent_id FROM tasks WHERE task_id = ?1",
2098 rusqlite::params!["tj-a"],
2099 |r| r.get(0),
2100 )
2101 .unwrap();
2102 assert_eq!(parent, None);
2103 }
2104
2105 #[test]
2106 fn open_event_meta_parent_id_is_persisted() {
2107 let d = tempfile::TempDir::new().unwrap();
2108 let conn = open(d.path().join("s.sqlite")).unwrap();
2109
2110 upsert_task_from_event(&conn, &make_open_event("tj-parent", "Parent"), "ph").unwrap();
2112
2113 let mut child = make_open_event("tj-child", "Child");
2115 child.meta = serde_json::json!({"title": "Child", "parent_id": "tj-parent"});
2116 upsert_task_from_event(&conn, &child, "ph").unwrap();
2117
2118 let parent: Option<String> = conn
2119 .query_row(
2120 "SELECT parent_id FROM tasks WHERE task_id = ?1",
2121 rusqlite::params!["tj-child"],
2122 |r| r.get(0),
2123 )
2124 .unwrap();
2125 assert_eq!(parent.as_deref(), Some("tj-parent"));
2126 }
2127
2128 #[test]
2129 fn children_of_and_parent_of_work() {
2130 let d = tempfile::TempDir::new().unwrap();
2131 let conn = open(d.path().join("s.sqlite")).unwrap();
2132 upsert_task_from_event(&conn, &make_open_event("p", "Parent"), "ph").unwrap();
2133
2134 let mut c1 = make_open_event("c1", "Child1");
2135 c1.meta = serde_json::json!({"title": "Child1", "parent_id": "p"});
2136 upsert_task_from_event(&conn, &c1, "ph").unwrap();
2137 let mut c2 = make_open_event("c2", "Child2");
2138 c2.meta = serde_json::json!({"title": "Child2", "parent_id": "p"});
2139 upsert_task_from_event(&conn, &c2, "ph").unwrap();
2140
2141 let kids = children_of(&conn, "p").unwrap();
2142 let ids: Vec<&str> = kids.iter().map(|t| t.task_id.as_str()).collect();
2143 assert!(ids.contains(&"c1") && ids.contains(&"c2"));
2144 assert_eq!(kids.len(), 2);
2145
2146 assert_eq!(parent_of(&conn, "c1").unwrap().as_deref(), Some("p"));
2147 assert_eq!(parent_of(&conn, "p").unwrap(), None);
2148 }
2149
2150 #[test]
2151 fn cycle_guard_rejects_self_and_ancestor() {
2152 let d = tempfile::TempDir::new().unwrap();
2153 let conn = open(d.path().join("s.sqlite")).unwrap();
2154 upsert_task_from_event(&conn, &make_open_event("a", "A"), "ph").unwrap();
2155 let mut b = make_open_event("b", "B");
2156 b.meta = serde_json::json!({"title": "B", "parent_id": "a"});
2157 upsert_task_from_event(&conn, &b, "ph").unwrap();
2158
2159 assert!(would_create_cycle(&conn, "a", "b").unwrap());
2161 assert!(would_create_cycle(&conn, "a", "a").unwrap());
2163 upsert_task_from_event(&conn, &make_open_event("x", "X"), "ph").unwrap();
2165 assert!(!would_create_cycle(&conn, "x", "a").unwrap());
2166 }
2167
2168 #[test]
2169 fn invalidate_cascade_clears_parent_pack() {
2170 let d = tempfile::TempDir::new().unwrap();
2171 let conn = open(d.path().join("s.sqlite")).unwrap();
2172 upsert_task_from_event(&conn, &make_open_event("p", "P"), "ph").unwrap();
2173 let mut c = make_open_event("c", "C");
2174 c.meta = serde_json::json!({"title": "C", "parent_id": "p"});
2175 upsert_task_from_event(&conn, &c, "ph").unwrap();
2176
2177 for id in ["p", "c"] {
2179 conn.execute(
2180 "INSERT INTO task_pack_cache(task_id, mode, text, generated_at, source_event_count)
2181 VALUES (?1, 'compact', 'x', '2026-01-01T00:00:00Z', 1)",
2182 rusqlite::params![id],
2183 )
2184 .unwrap();
2185 }
2186
2187 invalidate_pack_cascade(&conn, "c").unwrap();
2188
2189 let remaining: i64 = conn
2190 .query_row("SELECT COUNT(*) FROM task_pack_cache", [], |r| r.get(0))
2191 .unwrap();
2192 assert_eq!(remaining, 0, "both child and parent pack caches cleared");
2193 }
2194
2195 #[test]
2196 fn count_open_children_counts_only_open() {
2197 let d = tempfile::TempDir::new().unwrap();
2198 let conn = open(d.path().join("s.sqlite")).unwrap();
2199 upsert_task_from_event(&conn, &make_open_event("p", "P"), "ph").unwrap();
2200 let mut c1 = make_open_event("c1", "C1");
2201 c1.meta = serde_json::json!({"title": "C1", "parent_id": "p"});
2202 upsert_task_from_event(&conn, &c1, "ph").unwrap();
2203 let mut close = crate::event::Event::new(
2205 "c1",
2206 crate::event::EventType::Close,
2207 crate::event::Author::User,
2208 crate::event::Source::Cli,
2209 "done".into(),
2210 );
2211 close.timestamp = "2026-01-02T00:00:00Z".into();
2212 upsert_task_from_event(&conn, &close, "ph").unwrap();
2213 let mut c2 = make_open_event("c2", "C2");
2214 c2.meta = serde_json::json!({"title": "C2", "parent_id": "p"});
2215 upsert_task_from_event(&conn, &c2, "ph").unwrap();
2216
2217 assert_eq!(count_open_children(&conn, "p").unwrap(), 1); }
2219}