1use anyhow::Context;
2use rusqlite::Connection;
3use std::collections::HashSet;
4use std::path::Path;
5
/// One schema migration step: a version number paired with the SQL batch that
/// is executed to reach that version.
struct Migration {
    /// Version stored in `schema_migrations` once the step has been applied.
    version: i64,
    /// SQL batch that is run to apply the step.
    sql: &'static str,
}
13
/// Migration 1: base schema.
///
/// Creates the `tasks`, `events_index`, `decisions`, `evidence` and
/// `task_pack_cache` tables, the `idx_tasks_project` and
/// `idx_events_task_time` indexes, and the `search_fts` FTS5 virtual table.
/// Every statement uses `IF NOT EXISTS`.
const MIGRATION_001: &str = r#"
CREATE TABLE IF NOT EXISTS tasks (
 task_id TEXT PRIMARY KEY,
 title TEXT NOT NULL,
 status TEXT NOT NULL,
 project_hash TEXT NOT NULL,
 opened_at TEXT NOT NULL,
 closed_at TEXT,
 last_event_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_tasks_project ON tasks(project_hash, last_event_at DESC);

CREATE TABLE IF NOT EXISTS events_index (
 event_id TEXT PRIMARY KEY,
 task_id TEXT NOT NULL,
 type TEXT NOT NULL,
 timestamp TEXT NOT NULL,
 confidence REAL,
 status TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_events_task_time ON events_index(task_id, timestamp DESC);

CREATE TABLE IF NOT EXISTS decisions (
 decision_id TEXT PRIMARY KEY,
 task_id TEXT NOT NULL,
 text TEXT NOT NULL,
 status TEXT NOT NULL,
 superseded_by TEXT
);

CREATE TABLE IF NOT EXISTS evidence (
 evidence_id TEXT PRIMARY KEY,
 task_id TEXT NOT NULL,
 text TEXT NOT NULL,
 strength TEXT NOT NULL,
 refers_to_decision_id TEXT
);

CREATE TABLE IF NOT EXISTS task_pack_cache (
 task_id TEXT NOT NULL,
 mode TEXT NOT NULL,
 text TEXT NOT NULL,
 generated_at TEXT NOT NULL,
 source_event_count INTEGER NOT NULL,
 PRIMARY KEY (task_id, mode)
);

CREATE VIRTUAL TABLE IF NOT EXISTS search_fts USING fts5(
 task_id UNINDEXED,
 event_id UNINDEXED,
 text,
 type
);
"#;
68
/// Migration 2: adds `index_state`, one row per project holding the id of the
/// last event that was indexed and when that marker was updated.
const MIGRATION_002: &str = r#"
CREATE TABLE IF NOT EXISTS index_state (
 project_hash TEXT PRIMARY KEY,
 last_indexed_event_id TEXT NOT NULL,
 updated_at TEXT NOT NULL
);
"#;
80
/// Migration 3: adds the `goal`, `outcome`, `outcome_tag` and `external`
/// columns to `tasks` and the `artifacts` column to `events_index`, then
/// empties `task_pack_cache`.
///
/// The `ALTER TABLE ... ADD COLUMN` statements are not idempotent: running
/// this batch twice fails on the already-existing columns.
const MIGRATION_003: &str = r#"
ALTER TABLE tasks ADD COLUMN goal TEXT;
ALTER TABLE tasks ADD COLUMN outcome TEXT;
ALTER TABLE tasks ADD COLUMN outcome_tag TEXT;
ALTER TABLE tasks ADD COLUMN external TEXT;
ALTER TABLE events_index ADD COLUMN artifacts TEXT;
DELETE FROM task_pack_cache;
"#;
93
/// Migration 4: empties `task_pack_cache`; no schema change.
// NOTE(review): presumably forces cached packs to be regenerated after a
// format change — the reason is not visible here, confirm against history.
const MIGRATION_004: &str = r#"
DELETE FROM task_pack_cache;
"#;
102
/// Migration 5: adds `dream_state`, one row per project holding a
/// `last_dream_at` timestamp and an `updated_at` timestamp.
const MIGRATION_005: &str = r#"
CREATE TABLE IF NOT EXISTS dream_state (
 project_hash TEXT PRIMARY KEY,
 last_dream_at TEXT NOT NULL,
 updated_at TEXT NOT NULL
);
"#;
112
/// Migration 6: adds the nullable `parent_id` column to `tasks` (used for the
/// parent/child task hierarchy) and indexes it with `idx_tasks_parent`.
const MIGRATION_006: &str = r#"
ALTER TABLE tasks ADD COLUMN parent_id TEXT;
CREATE INDEX IF NOT EXISTS idx_tasks_parent ON tasks(parent_id);
"#;
120
/// Migration 7: adds the nullable `alternatives` column to `decisions`
/// (`index_event` stores the JSON-encoded `alternatives` meta value there)
/// and empties `task_pack_cache`.
const MIGRATION_007: &str = r#"
ALTER TABLE decisions ADD COLUMN alternatives TEXT;
DELETE FROM task_pack_cache;
"#;
130
/// Migration 8: adds the `embeddings` table (one row per event id, with the
/// vector stored as a BLOB alongside its model name and dimension), the
/// `idx_emb_project_tier` index, and a `memory_tier` column on `events_index`
/// that defaults to `'episodic'`.
const MIGRATION_008: &str = r#"
CREATE TABLE IF NOT EXISTS embeddings (
 event_id TEXT PRIMARY KEY,
 task_id TEXT NOT NULL,
 project_hash TEXT NOT NULL,
 tier TEXT NOT NULL DEFAULT 'episodic',
 model TEXT NOT NULL,
 dim INTEGER NOT NULL,
 vec BLOB NOT NULL,
 created_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_emb_project_tier ON embeddings(project_hash, tier);
ALTER TABLE events_index ADD COLUMN memory_tier TEXT NOT NULL DEFAULT 'episodic';
"#;
152
153const MIGRATIONS: &[Migration] = &[
156 Migration {
157 version: 1,
158 sql: MIGRATION_001,
159 },
160 Migration {
161 version: 2,
162 sql: MIGRATION_002,
163 },
164 Migration {
165 version: 3,
166 sql: MIGRATION_003,
167 },
168 Migration {
169 version: 4,
170 sql: MIGRATION_004,
171 },
172 Migration {
173 version: 5,
174 sql: MIGRATION_005,
175 },
176 Migration {
177 version: 6,
178 sql: MIGRATION_006,
179 },
180 Migration {
181 version: 7,
182 sql: MIGRATION_007,
183 },
184 Migration {
185 version: 8,
186 sql: MIGRATION_008,
187 },
188];
189
190fn apply_migrations(conn: &Connection) -> anyhow::Result<()> {
191 conn.execute_batch(
192 "CREATE TABLE IF NOT EXISTS schema_migrations (
193 version INTEGER PRIMARY KEY,
194 applied_at TEXT NOT NULL
195 )",
196 )
197 .context("create schema_migrations table")?;
198
199 let applied: HashSet<i64> = {
200 let mut stmt = conn
201 .prepare("SELECT version FROM schema_migrations")
202 .context("select applied versions")?;
203 let rows = stmt
204 .query_map([], |r| r.get::<_, i64>(0))
205 .context("iterate schema_migrations")?;
206 rows.collect::<rusqlite::Result<HashSet<_>>>()
207 .context("collect applied versions")?
208 };
209
210 for migration in MIGRATIONS {
211 if applied.contains(&migration.version) {
212 continue;
213 }
214 conn.execute_batch(migration.sql)
215 .with_context(|| format!("apply schema migration v{:03}", migration.version))?;
216 conn.execute(
217 "INSERT INTO schema_migrations(version, applied_at) VALUES (?1, ?2)",
218 rusqlite::params![
219 migration.version,
220 chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
221 ],
222 )
223 .with_context(|| {
224 format!(
225 "record schema migration v{:03} as applied",
226 migration.version
227 )
228 })?;
229 }
230 Ok(())
231}
232
233use crate::event::{Event, EventType};
234
235pub fn upsert_task_from_event(
236 conn: &Connection,
237 event: &Event,
238 project_hash: &str,
239) -> anyhow::Result<()> {
240 match event.event_type {
241 EventType::Open => {
242 let title = event
243 .meta
244 .get("title")
245 .and_then(|v| v.as_str())
246 .unwrap_or(&event.text)
247 .to_string();
248 let parent_id = event
249 .meta
250 .get("parent_id")
251 .and_then(|v| v.as_str())
252 .map(str::to_string);
253 conn.execute(
256 "INSERT INTO tasks(task_id, title, status, project_hash, opened_at, last_event_at, parent_id)
257 VALUES (?1, ?2, 'open', ?3, ?4, ?4, ?5)
258 ON CONFLICT(task_id) DO UPDATE SET last_event_at = ?4",
259 rusqlite::params![event.task_id, title, project_hash, event.timestamp, parent_id],
260 )?;
261 }
262 EventType::Close => {
263 conn.execute(
264 "UPDATE tasks SET status='closed', closed_at=?2, last_event_at=?2 WHERE task_id=?1",
265 rusqlite::params![event.task_id, event.timestamp],
266 )?;
267 }
268 EventType::Reopen => {
269 conn.execute(
270 "UPDATE tasks SET status='open', closed_at=NULL, last_event_at=?2 WHERE task_id=?1",
271 rusqlite::params![event.task_id, event.timestamp],
272 )?;
273 }
274 _ => {
275 conn.execute(
276 "UPDATE tasks SET last_event_at=?2 WHERE task_id=?1",
277 rusqlite::params![event.task_id, event.timestamp],
278 )?;
279 }
280 }
281 Ok(())
282}
283
284use std::io::BufRead;
285
286pub fn list_all_projects(state_dir: impl AsRef<Path>) -> anyhow::Result<Vec<String>> {
287 let dir = state_dir.as_ref();
288 if !dir.exists() {
289 return Ok(vec![]);
290 }
291 let mut out = Vec::new();
292 for entry in std::fs::read_dir(dir)? {
293 let entry = entry?;
294 let path = entry.path();
295 if path.extension().and_then(|e| e.to_str()) == Some("sqlite") {
296 if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) {
297 out.push(stem.to_string());
298 }
299 }
300 }
301 Ok(out)
302}
303
304pub fn rebuild_state(
305 conn: &Connection,
306 jsonl_path: impl AsRef<Path>,
307 project_hash: &str,
308) -> anyhow::Result<usize> {
309 let f = std::fs::File::open(&jsonl_path)
310 .with_context(|| format!("open {:?}", jsonl_path.as_ref()))?;
311 let reader = std::io::BufReader::new(f);
312
313 let tx = conn.unchecked_transaction()?;
314 let mut count = 0;
315 let mut last_event_id: Option<String> = None;
316 for (i, line) in reader.lines().enumerate() {
317 let line = line.with_context(|| format!("read line {i}"))?;
318 if line.trim().is_empty() {
319 continue;
320 }
321 let event: Event = match serde_json::from_str(&line) {
325 Ok(e) => e,
326 Err(err) => {
327 tracing::warn!(
328 line_number = i + 1,
329 error = %err,
330 "skipping malformed JSONL line in rebuild_state"
331 );
332 continue;
333 }
334 };
335 upsert_task_from_event(&tx, &event, project_hash)?;
336 index_event(&tx, &event)?;
337 last_event_id = Some(event.event_id.clone());
338 count += 1;
339 }
340 if let Some(eid) = last_event_id.as_deref() {
341 record_last_indexed(&tx, project_hash, eid)?;
342 }
343 tx.commit()?;
344 Ok(count)
345}
346
347pub fn task_exists(conn: &Connection, task_id: &str) -> anyhow::Result<bool> {
352 let count: i64 = conn.query_row(
353 "SELECT COUNT(*) FROM tasks WHERE task_id = ?1",
354 rusqlite::params![task_id],
355 |r| r.get(0),
356 )?;
357 Ok(count > 0)
358}
359
360pub fn task_status(conn: &Connection, task_id: &str) -> anyhow::Result<Option<String>> {
364 let mut stmt = conn.prepare("SELECT status FROM tasks WHERE task_id = ?1")?;
365 let mut rows = stmt.query(rusqlite::params![task_id])?;
366 Ok(rows.next()?.map(|r| r.get::<_, String>(0)).transpose()?)
367}
368
369pub fn set_task_goal(conn: &Connection, task_id: &str, goal: &str) -> anyhow::Result<()> {
373 conn.execute(
374 "UPDATE tasks SET goal = ?1 WHERE task_id = ?2",
375 rusqlite::params![goal, task_id],
376 )
377 .with_context(|| format!("set goal for {task_id}"))?;
378 conn.execute(
381 "DELETE FROM task_pack_cache WHERE task_id = ?1",
382 rusqlite::params![task_id],
383 )?;
384 Ok(())
385}
386
387pub fn set_task_outcome(
391 conn: &Connection,
392 task_id: &str,
393 outcome: &str,
394 outcome_tag: Option<&str>,
395) -> anyhow::Result<()> {
396 conn.execute(
397 "UPDATE tasks SET outcome = ?1, outcome_tag = ?2 WHERE task_id = ?3",
398 rusqlite::params![outcome, outcome_tag, task_id],
399 )
400 .with_context(|| format!("set outcome for {task_id}"))?;
401 conn.execute(
402 "DELETE FROM task_pack_cache WHERE task_id = ?1",
403 rusqlite::params![task_id],
404 )?;
405 Ok(())
406}
407
408pub fn add_task_external(conn: &Connection, task_id: &str, reference: &str) -> anyhow::Result<()> {
413 let current: Option<String> = conn
414 .query_row(
415 "SELECT external FROM tasks WHERE task_id = ?1",
416 rusqlite::params![task_id],
417 |r| r.get::<_, Option<String>>(0),
418 )
419 .with_context(|| format!("read external for {task_id}"))?;
420 let next = match current {
421 Some(s) if !s.is_empty() => format!("{s},{reference}"),
422 _ => reference.to_string(),
423 };
424 conn.execute(
425 "UPDATE tasks SET external = ?1 WHERE task_id = ?2",
426 rusqlite::params![next, task_id],
427 )?;
428 conn.execute(
429 "DELETE FROM task_pack_cache WHERE task_id = ?1",
430 rusqlite::params![task_id],
431 )?;
432 Ok(())
433}
434
/// Optional descriptive columns of a `tasks` row, as read by `task_metadata`.
/// Every field is `None` when the corresponding column is `NULL`.
#[derive(Debug, Clone, Default)]
pub struct TaskMetadata {
    /// Value of the `goal` column.
    pub goal: Option<String>,
    /// Value of the `outcome` column.
    pub outcome: Option<String>,
    /// Value of the `outcome_tag` column.
    pub outcome_tag: Option<String>,
    /// Value of the `external` column (comma-separated references).
    pub external: Option<String>,
}
444
445pub fn task_metadata(conn: &Connection, task_id: &str) -> anyhow::Result<Option<TaskMetadata>> {
446 let mut stmt =
447 conn.prepare("SELECT goal, outcome, outcome_tag, external FROM tasks WHERE task_id = ?1")?;
448 let mut rows = stmt.query(rusqlite::params![task_id])?;
449 Ok(match rows.next()? {
450 Some(r) => Some(TaskMetadata {
451 goal: r.get::<_, Option<String>>(0)?,
452 outcome: r.get::<_, Option<String>>(1)?,
453 outcome_tag: r.get::<_, Option<String>>(2)?,
454 external: r.get::<_, Option<String>>(3)?,
455 }),
456 None => None,
457 })
458}
459
/// An open task that has had no event since a cutoff, as reported by
/// `stale_tasks`.
#[derive(Debug, Clone)]
pub struct StaleTask {
    /// Id of the task.
    pub task_id: String,
    /// Title of the task.
    pub title: String,
    /// Raw `last_event_at` value stored for the task.
    pub last_event_at: String,
    /// Whole days elapsed between `last_event_at` and the time of the query.
    pub days_idle: i64,
}
469
470pub fn stale_tasks(conn: &Connection, days: i64) -> anyhow::Result<Vec<StaleTask>> {
473 let cutoff = chrono::Utc::now() - chrono::Duration::days(days);
474 let cutoff_str = cutoff.to_rfc3339();
475 let mut stmt = conn.prepare(
476 "SELECT task_id, title, last_event_at FROM tasks
477 WHERE status = 'open' AND last_event_at < ?1
478 ORDER BY last_event_at ASC",
479 )?;
480 let rows = stmt.query_map(rusqlite::params![cutoff_str], |r| {
481 Ok((
482 r.get::<_, String>(0)?,
483 r.get::<_, String>(1)?,
484 r.get::<_, String>(2)?,
485 ))
486 })?;
487 let now = chrono::Utc::now();
488 let mut out = Vec::new();
489 for row in rows {
490 let (task_id, title, last_at) = row?;
491 let dt = chrono::DateTime::parse_from_rfc3339(&last_at)
492 .map(|d| d.with_timezone(&chrono::Utc))
493 .unwrap_or(now);
494 let days_idle = (now - dt).num_days();
495 out.push(StaleTask {
496 task_id,
497 title,
498 last_event_at: last_at,
499 days_idle,
500 });
501 }
502 Ok(out)
503}
504
/// A task that shares artifacts with a query, as returned by
/// `find_related_tasks`.
#[derive(Debug, Clone)]
pub struct RelatedTask {
    /// Id of the related task.
    pub task_id: String,
    /// Current `status` of that task.
    pub status: String,
    /// Sum of the weights of every artifact that matched this task.
    pub score: f64,
}
515
516pub fn find_related_tasks(
527 conn: &Connection,
528 arts: &crate::artifacts::Artifacts,
529) -> anyhow::Result<Vec<RelatedTask>> {
530 use std::collections::HashMap;
531 if arts.is_empty() {
532 return Ok(Vec::new());
533 }
534 let mut scores: HashMap<String, f64> = HashMap::new();
535 let mut last_seen: HashMap<String, String> = HashMap::new();
536
537 let needles: Vec<(String, f64)> = arts
538 .linked_issues
539 .iter()
540 .map(|s| (s.clone(), 1.0))
541 .chain(arts.commit_hashes.iter().map(|s| (s.clone(), 0.8)))
542 .chain(arts.files.iter().map(|s| (s.clone(), 0.3)))
543 .collect();
544
545 for (needle, weight) in needles {
546 let pattern = format!("%\"{}\"%", needle.replace('%', "\\%"));
547 let mut stmt = conn.prepare(
548 "SELECT DISTINCT task_id, MAX(timestamp) as ts FROM events_index
549 WHERE artifacts LIKE ?1
550 GROUP BY task_id
551 ORDER BY ts DESC",
552 )?;
553 let rows = stmt.query_map(rusqlite::params![pattern], |r| {
554 Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
555 })?;
556 for row in rows {
557 let (id, ts) = row?;
558 *scores.entry(id.clone()).or_insert(0.0) += weight;
559 last_seen.insert(id, ts);
560 }
561 }
562
563 let mut out: Vec<RelatedTask> = Vec::with_capacity(scores.len());
564 for (id, score) in scores {
565 let status: Option<String> = conn
566 .query_row(
567 "SELECT status FROM tasks WHERE task_id = ?1",
568 rusqlite::params![&id],
569 |r| r.get(0),
570 )
571 .ok();
572 if let Some(status) = status {
573 out.push(RelatedTask {
574 task_id: id,
575 status,
576 score,
577 });
578 }
579 }
580 out.sort_by(|a, b| {
581 b.score
582 .partial_cmp(&a.score)
583 .unwrap_or(std::cmp::Ordering::Equal)
584 .then_with(|| {
585 let ts_a = last_seen.get(&a.task_id).cloned().unwrap_or_default();
586 let ts_b = last_seen.get(&b.task_id).cloned().unwrap_or_default();
587 ts_b.cmp(&ts_a)
588 })
589 });
590 Ok(out)
591}
592
593pub fn find_tasks_by_linked_issues(
600 conn: &Connection,
601 issues: &[String],
602) -> anyhow::Result<Vec<(String, String)>> {
603 if issues.is_empty() {
604 return Ok(Vec::new());
605 }
606 let mut candidate_ids: Vec<String> = Vec::new();
612 for issue in issues {
613 let pattern = format!("%\"{}\"%", issue.replace('%', "\\%"));
614 let mut stmt = conn.prepare(
615 "SELECT DISTINCT task_id FROM events_index
616 WHERE artifacts LIKE ?1
617 ORDER BY timestamp DESC",
618 )?;
619 let rows = stmt.query_map(rusqlite::params![pattern], |r| r.get::<_, String>(0))?;
620 for r in rows {
621 let id = r?;
622 if !candidate_ids.contains(&id) {
623 candidate_ids.push(id);
624 }
625 }
626 }
627 let mut out = Vec::with_capacity(candidate_ids.len());
629 for id in candidate_ids {
630 let status: Option<String> = conn
631 .query_row(
632 "SELECT status FROM tasks WHERE task_id = ?1",
633 rusqlite::params![&id],
634 |r| r.get(0),
635 )
636 .ok();
637 if let Some(s) = status {
638 out.push((id, s));
639 }
640 }
641 Ok(out)
642}
643
644pub fn reclassify_task_artifacts(conn: &Connection, task_id: &str) -> anyhow::Result<usize> {
650 let mut stmt = conn.prepare(
651 "SELECT ei.event_id, COALESCE(sf.text, '') FROM events_index ei
652 LEFT JOIN search_fts sf ON sf.event_id = ei.event_id
653 WHERE ei.task_id = ?1",
654 )?;
655 let rows: Vec<(String, String)> = stmt
656 .query_map(rusqlite::params![task_id], |r| {
657 Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
658 })?
659 .collect::<Result<_, _>>()?;
660 let count = rows.len();
661 for (event_id, text) in rows {
662 let arts = crate::artifacts::extract(&text);
663 let json = if arts.is_empty() {
664 None
665 } else {
666 Some(serde_json::to_string(&arts)?)
667 };
668 conn.execute(
669 "UPDATE events_index SET artifacts = ?1 WHERE event_id = ?2",
670 rusqlite::params![json, event_id],
671 )?;
672 }
673 invalidate_pack_cascade(conn, task_id)?;
674 Ok(count)
675}
676
677pub fn task_artifacts(
683 conn: &Connection,
684 task_id: &str,
685) -> anyhow::Result<crate::artifacts::Artifacts> {
686 let mut stmt = conn.prepare(
687 "SELECT artifacts FROM events_index
688 WHERE task_id = ?1 AND artifacts IS NOT NULL
689 ORDER BY timestamp ASC",
690 )?;
691 let rows = stmt.query_map(rusqlite::params![task_id], |r| r.get::<_, String>(0))?;
692 let mut acc = crate::artifacts::Artifacts::default();
693 for row in rows {
694 let json = row?;
695 if let Ok(parsed) = serde_json::from_str::<crate::artifacts::Artifacts>(&json) {
696 acc.merge(parsed);
697 }
698 }
699 Ok(acc)
700}
701
702fn last_indexed_event_id(conn: &Connection, project_hash: &str) -> anyhow::Result<Option<String>> {
706 let mut stmt =
707 conn.prepare("SELECT last_indexed_event_id FROM index_state WHERE project_hash = ?1")?;
708 let mut rows = stmt.query(rusqlite::params![project_hash])?;
709 if let Some(row) = rows.next()? {
710 Ok(Some(row.get::<_, String>(0)?))
711 } else {
712 Ok(None)
713 }
714}
715
716fn record_last_indexed(
717 conn: &Connection,
718 project_hash: &str,
719 event_id: &str,
720) -> anyhow::Result<()> {
721 conn.execute(
722 "INSERT INTO index_state(project_hash, last_indexed_event_id, updated_at)
723 VALUES (?1, ?2, ?3)
724 ON CONFLICT(project_hash) DO UPDATE SET
725 last_indexed_event_id = excluded.last_indexed_event_id,
726 updated_at = excluded.updated_at",
727 rusqlite::params![
728 project_hash,
729 event_id,
730 chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
731 ],
732 )?;
733 Ok(())
734}
735
736pub fn ingest_new_events(
746 conn: &Connection,
747 jsonl_path: impl AsRef<Path>,
748 project_hash: &str,
749) -> anyhow::Result<usize> {
750 let marker = match last_indexed_event_id(conn, project_hash)? {
751 Some(id) => id,
752 None => return rebuild_state(conn, jsonl_path, project_hash),
753 };
754
755 let f = std::fs::File::open(&jsonl_path)
756 .with_context(|| format!("open {:?}", jsonl_path.as_ref()))?;
757 let reader = std::io::BufReader::new(f);
758
759 let tx = conn.unchecked_transaction()?;
763 let mut found_marker = false;
764 let mut count = 0;
765 let mut last_event_id: Option<String> = None;
766 for (i, line) in reader.lines().enumerate() {
767 let line = line.with_context(|| format!("read line {i}"))?;
768 if line.trim().is_empty() {
769 continue;
770 }
771 let event: Event = match serde_json::from_str(&line) {
772 Ok(e) => e,
773 Err(err) => {
774 tracing::warn!(
775 line_number = i + 1,
776 error = %err,
777 "skipping malformed JSONL line in ingest_new_events"
778 );
779 continue;
780 }
781 };
782 if !found_marker {
783 if event.event_id == marker {
784 found_marker = true;
785 }
786 continue;
787 }
788 upsert_task_from_event(&tx, &event, project_hash)?;
789 index_event(&tx, &event)?;
790 last_event_id = Some(event.event_id.clone());
791 count += 1;
792 }
793
794 if !found_marker {
795 drop(tx);
797 tracing::warn!(
798 project_hash = project_hash,
799 marker = marker.as_str(),
800 "last_indexed_event_id not found in JSONL — falling back to full rebuild"
801 );
802 return rebuild_state(conn, jsonl_path, project_hash);
803 }
804
805 if let Some(eid) = last_event_id.as_deref() {
806 record_last_indexed(&tx, project_hash, eid)?;
807 }
808 tx.commit()?;
809 Ok(count)
810}
811
812pub fn index_event(conn: &Connection, event: &Event) -> anyhow::Result<()> {
813 let type_str = serde_json::to_value(event.event_type)?
814 .as_str()
815 .unwrap()
816 .to_string();
817 let status_str = serde_json::to_value(event.status)?
818 .as_str()
819 .unwrap()
820 .to_string();
821 let artifacts = crate::artifacts::extract(&event.text);
826 let artifacts_json = if artifacts.is_empty() {
827 None
828 } else {
829 Some(serde_json::to_string(&artifacts)?)
830 };
831 conn.execute(
832 "INSERT OR REPLACE INTO events_index(event_id, task_id, type, timestamp, confidence, status, artifacts)
833 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
834 rusqlite::params![
835 event.event_id, event.task_id, type_str,
836 event.timestamp, event.confidence, status_str, artifacts_json
837 ],
838 )?;
839 conn.execute(
841 "DELETE FROM search_fts WHERE event_id=?1",
842 rusqlite::params![event.event_id],
843 )?;
844 conn.execute(
845 "INSERT INTO search_fts(task_id, event_id, text, type) VALUES (?1, ?2, ?3, ?4)",
846 rusqlite::params![event.task_id, event.event_id, event.text, type_str],
847 )?;
848
849 if event.event_type == EventType::Decision {
850 let alternatives_json = match event.meta.get("alternatives") {
854 Some(v) if !v.is_null() => Some(serde_json::to_string(v)?),
855 _ => None,
856 };
857 conn.execute(
858 "INSERT OR REPLACE INTO decisions(decision_id, task_id, text, status, alternatives)
859 VALUES (?1, ?2, ?3, 'active', ?4)",
860 rusqlite::params![event.event_id, event.task_id, event.text, alternatives_json],
861 )?;
862 }
863
864 if event.event_type == EventType::Supersede {
865 if let Some(target) = &event.supersedes {
866 conn.execute(
867 "UPDATE decisions SET status='superseded', superseded_by=?1 WHERE decision_id=?2",
868 rusqlite::params![event.event_id, target],
869 )?;
870 }
871 }
872
873 if event.event_type == EventType::Evidence {
874 let strength_str = event
875 .evidence_strength
876 .map(|s| {
877 serde_json::to_value(s)
878 .unwrap()
879 .as_str()
880 .unwrap()
881 .to_string()
882 })
883 .unwrap_or_else(|| "medium".into());
884 conn.execute(
885 "INSERT OR REPLACE INTO evidence(evidence_id, task_id, text, strength)
886 VALUES (?1, ?2, ?3, ?4)",
887 rusqlite::params![event.event_id, event.task_id, event.text, strength_str],
888 )?;
889 }
890
891 invalidate_pack_cascade(conn, &event.task_id)?;
894
895 Ok(())
896}
897
898pub fn open(path: impl AsRef<Path>) -> anyhow::Result<Connection> {
899 if let Some(parent) = path.as_ref().parent() {
900 std::fs::create_dir_all(parent).with_context(|| format!("create dir {parent:?}"))?;
901 }
902 let conn =
903 Connection::open(&path).with_context(|| format!("open SQLite at {:?}", path.as_ref()))?;
904 conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;")?;
905 apply_migrations(&conn).context("apply schema migrations")?;
906 Ok(conn)
907}
908
/// Summary of one task as returned by the task listing queries.
#[derive(Debug, Clone)]
pub struct TaskRow {
    /// Id of the task.
    pub task_id: String,
    /// Title of the task.
    pub title: String,
    /// Value of the task's `status` column.
    pub status: String,
    /// Value of the task's `last_event_at` column.
    pub last_event_at: String,
    /// Number of `events_index` rows belonging to the task (0 when none).
    pub event_count: usize,
}
920
921pub fn list_tasks_by_project(
925 conn: &Connection,
926 project_hash: &str,
927) -> anyhow::Result<Vec<TaskRow>> {
928 let mut stmt = conn.prepare(
929 "SELECT t.task_id, t.title, t.status, t.last_event_at,
930 COALESCE(c.cnt, 0) AS event_count
931 FROM tasks t
932 LEFT JOIN (
933 SELECT task_id, COUNT(*) AS cnt FROM events_index GROUP BY task_id
934 ) c ON c.task_id = t.task_id
935 WHERE t.project_hash = ?1
936 ORDER BY (t.status = 'open') DESC, t.last_event_at DESC",
937 )?;
938 let rows = stmt
939 .query_map(rusqlite::params![project_hash], |r| {
940 Ok(TaskRow {
941 task_id: r.get::<_, String>(0)?,
942 title: r.get::<_, String>(1)?,
943 status: r.get::<_, String>(2)?,
944 last_event_at: r.get::<_, String>(3)?,
945 event_count: r.get::<_, i64>(4)? as usize,
946 })
947 })?
948 .collect::<Result<Vec<_>, _>>()?;
949 Ok(rows)
950}
951
952pub fn top_level_tasks(conn: &Connection, project_hash: &str) -> anyhow::Result<Vec<TaskRow>> {
956 let mut stmt = conn.prepare(
957 "SELECT t.task_id, t.title, t.status, t.last_event_at,
958 COALESCE(c.cnt, 0) AS event_count
959 FROM tasks t
960 LEFT JOIN (
961 SELECT task_id, COUNT(*) AS cnt FROM events_index GROUP BY task_id
962 ) c ON c.task_id = t.task_id
963 WHERE t.project_hash = ?1 AND t.parent_id IS NULL
964 ORDER BY (t.status = 'open') DESC, t.last_event_at DESC",
965 )?;
966 let rows = stmt
967 .query_map(rusqlite::params![project_hash], |r| {
968 Ok(TaskRow {
969 task_id: r.get::<_, String>(0)?,
970 title: r.get::<_, String>(1)?,
971 status: r.get::<_, String>(2)?,
972 last_event_at: r.get::<_, String>(3)?,
973 event_count: r.get::<_, i64>(4)? as usize,
974 })
975 })?
976 .collect::<Result<Vec<_>, _>>()?;
977 Ok(rows)
978}
979
980pub fn children_of(conn: &Connection, task_id: &str) -> anyhow::Result<Vec<TaskRow>> {
982 let mut stmt = conn.prepare(
983 "SELECT t.task_id, t.title, t.status, t.last_event_at,
984 COALESCE(c.cnt, 0) AS event_count
985 FROM tasks t
986 LEFT JOIN (
987 SELECT task_id, COUNT(*) AS cnt FROM events_index GROUP BY task_id
988 ) c ON c.task_id = t.task_id
989 WHERE t.parent_id = ?1
990 ORDER BY (t.status = 'open') DESC, t.last_event_at DESC",
991 )?;
992 let rows = stmt
993 .query_map(rusqlite::params![task_id], |r| {
994 Ok(TaskRow {
995 task_id: r.get::<_, String>(0)?,
996 title: r.get::<_, String>(1)?,
997 status: r.get::<_, String>(2)?,
998 last_event_at: r.get::<_, String>(3)?,
999 event_count: r.get::<_, i64>(4)? as usize,
1000 })
1001 })?
1002 .collect::<Result<Vec<_>, _>>()?;
1003 Ok(rows)
1004}
1005
1006pub fn parent_of(conn: &Connection, task_id: &str) -> anyhow::Result<Option<String>> {
1008 let mut stmt = conn.prepare("SELECT parent_id FROM tasks WHERE task_id = ?1")?;
1009 let mut rows = stmt.query(rusqlite::params![task_id])?;
1010 Ok(match rows.next()? {
1011 Some(r) => r.get::<_, Option<String>>(0)?,
1012 None => None,
1013 })
1014}
1015
1016pub fn would_create_cycle(
1021 conn: &Connection,
1022 task_id: &str,
1023 new_parent: &str,
1024) -> anyhow::Result<bool> {
1025 if task_id == new_parent {
1026 return Ok(true);
1027 }
1028 let mut cursor = Some(new_parent.to_string());
1029 for _ in 0..64 {
1030 let Some(cur) = cursor else {
1031 return Ok(false);
1032 };
1033 if cur == task_id {
1034 return Ok(true);
1035 }
1036 cursor = parent_of(conn, &cur)?;
1037 }
1038 Ok(true)
1040}
1041
1042pub fn count_open_children(conn: &Connection, task_id: &str) -> anyhow::Result<usize> {
1044 let n: i64 = conn.query_row(
1045 "SELECT COUNT(*) FROM tasks WHERE parent_id = ?1 AND status = 'open'",
1046 rusqlite::params![task_id],
1047 |r| r.get(0),
1048 )?;
1049 Ok(n as usize)
1050}
1051
1052pub fn invalidate_pack_cascade(conn: &Connection, task_id: &str) -> anyhow::Result<()> {
1054 conn.execute(
1055 "DELETE FROM task_pack_cache WHERE task_id = ?1",
1056 rusqlite::params![task_id],
1057 )?;
1058 if let Some(parent) = parent_of(conn, task_id)? {
1059 conn.execute(
1060 "DELETE FROM task_pack_cache WHERE task_id = ?1",
1061 rusqlite::params![parent],
1062 )?;
1063 }
1064 Ok(())
1065}
1066
/// An indexed event that has no embedding yet for a given model, as returned
/// by `events_needing_embedding`.
///
/// Derives `Debug` and `Clone` like the other public row structs in this file.
#[derive(Debug, Clone)]
pub struct PendingEmbed {
    /// Id of the event.
    pub event_id: String,
    /// Id of the task the event belongs to.
    pub task_id: String,
    /// Event text to be embedded.
    pub text: String,
}
1077
1078pub fn events_needing_embedding(
1082 conn: &Connection,
1083 model: &str,
1084 limit: usize,
1085) -> anyhow::Result<Vec<PendingEmbed>> {
1086 let mut stmt = conn.prepare(
1087 "SELECT f.event_id, f.task_id, f.text
1088 FROM search_fts f
1089 LEFT JOIN embeddings e ON e.event_id = f.event_id AND e.model = ?1
1090 WHERE e.event_id IS NULL
1091 LIMIT ?2",
1092 )?;
1093 let rows = stmt.query_map(rusqlite::params![model, limit as i64], |r| {
1094 Ok(PendingEmbed {
1095 event_id: r.get(0)?,
1096 task_id: r.get(1)?,
1097 text: r.get(2)?,
1098 })
1099 })?;
1100 let mut out = Vec::new();
1101 for r in rows {
1102 out.push(r?);
1103 }
1104 Ok(out)
1105}
1106
1107#[allow(clippy::too_many_arguments)]
1110pub fn upsert_embedding(
1111 conn: &Connection,
1112 event_id: &str,
1113 task_id: &str,
1114 project_hash: &str,
1115 tier: &str,
1116 model: &str,
1117 dim: usize,
1118 vec: &[f32],
1119 created_at: &str,
1120) -> anyhow::Result<()> {
1121 conn.execute(
1122 "INSERT OR REPLACE INTO embeddings(event_id, task_id, project_hash, tier, model, dim, vec, created_at)
1123 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
1124 rusqlite::params![
1125 event_id,
1126 task_id,
1127 project_hash,
1128 tier,
1129 model,
1130 dim as i64,
1131 crate::embed::to_blob(vec),
1132 created_at
1133 ],
1134 )?;
1135 Ok(())
1136}
1137
1138pub fn high_signal_events(
1141 conn: &Connection,
1142 limit: usize,
1143) -> anyhow::Result<Vec<(String, String)>> {
1144 let mut stmt = conn.prepare(
1145 "SELECT f.event_id, f.text
1146 FROM search_fts f
1147 JOIN events_index ei ON ei.event_id = f.event_id
1148 WHERE f.type IN ('decision', 'constraint', 'rejection')
1149 ORDER BY ei.timestamp DESC
1150 LIMIT ?1",
1151 )?;
1152 let rows = stmt.query_map(rusqlite::params![limit as i64], |r| {
1153 Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
1154 })?;
1155 let mut out = Vec::new();
1156 for r in rows {
1157 out.push(r?);
1158 }
1159 Ok(out)
1160}
1161
1162pub fn find_task_by_title(conn: &Connection, title: &str) -> anyhow::Result<Option<String>> {
1165 let mut stmt = conn.prepare("SELECT task_id FROM tasks WHERE title = ?1 LIMIT 1")?;
1166 let mut rows = stmt.query(rusqlite::params![title])?;
1167 match rows.next()? {
1168 Some(row) => Ok(Some(row.get(0)?)),
1169 None => Ok(None),
1170 }
1171}
1172
1173pub fn task_event_texts(conn: &Connection, task_id: &str) -> anyhow::Result<Vec<String>> {
1175 let mut stmt = conn.prepare("SELECT text FROM search_fts WHERE task_id = ?1")?;
1176 let rows = stmt.query_map(rusqlite::params![task_id], |r| r.get::<_, String>(0))?;
1177 let mut out = Vec::new();
1178 for r in rows {
1179 out.push(r?);
1180 }
1181 Ok(out)
1182}
1183
1184pub fn count_embeddings(conn: &Connection, project_hash: &str) -> anyhow::Result<usize> {
1186 let n: i64 = conn.query_row(
1187 "SELECT COUNT(*) FROM embeddings WHERE project_hash = ?1",
1188 rusqlite::params![project_hash],
1189 |r| r.get(0),
1190 )?;
1191 Ok(n as usize)
1192}
1193
1194pub fn embed_pending(
1202 conn: &Connection,
1203 project_hash: &str,
1204 embedder: &dyn crate::embed::Embedder,
1205 created_at: &str,
1206 limit: usize,
1207) -> anyhow::Result<usize> {
1208 let pending = events_needing_embedding(conn, embedder.model_id(), limit)?;
1209 if pending.is_empty() {
1210 return Ok(0);
1211 }
1212 let texts: Vec<&str> = pending.iter().map(|p| p.text.as_str()).collect();
1213 let vecs = embedder.embed(&texts)?;
1214 let mut done = 0usize;
1215 for (p, v) in pending.iter().zip(vecs.iter()) {
1216 upsert_embedding(
1217 conn,
1218 &p.event_id,
1219 &p.task_id,
1220 project_hash,
1221 "episodic",
1222 embedder.model_id(),
1223 embedder.dim(),
1224 v,
1225 created_at,
1226 )?;
1227 done += 1;
1228 }
1229 Ok(done)
1230}
1231
/// One result of `semantic_search`: an embedded event together with its
/// similarity to the query vector.
///
/// Derives `Debug` and `Clone` like the other public row structs in this file.
#[derive(Debug, Clone)]
pub struct ScoredHit {
    /// Id of the matching event.
    pub event_id: String,
    /// Id of the task the event belongs to.
    pub task_id: String,
    /// Title of that task; empty when the task row is missing.
    pub task_title: String,
    /// Event type as stored in `search_fts.type`.
    pub event_type: String,
    /// Tier stored with the embedding.
    pub tier: String,
    /// Event text.
    pub text: String,
    /// Similarity between the query vector and the event's vector, as
    /// computed by `crate::embed::cosine`.
    pub score: f32,
}
1242
1243pub fn semantic_search(
1249 conn: &Connection,
1250 project_hash: &str,
1251 query_vec: &[f32],
1252 model: &str,
1253 k: usize,
1254) -> anyhow::Result<Vec<ScoredHit>> {
1255 let mut stmt = conn.prepare(
1256 "SELECT e.event_id, e.task_id, e.tier, e.vec, f.text, f.type,
1257 COALESCE(t.title, '')
1258 FROM embeddings e
1259 JOIN search_fts f ON f.event_id = e.event_id
1260 LEFT JOIN tasks t ON t.task_id = e.task_id
1261 WHERE e.project_hash = ?1 AND e.model = ?2",
1262 )?;
1263 let rows = stmt.query_map(rusqlite::params![project_hash, model], |r| {
1264 let blob: Vec<u8> = r.get(3)?;
1265 Ok((
1266 r.get::<_, String>(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?, blob,
1270 r.get::<_, String>(4)?, r.get::<_, String>(5)?, r.get::<_, String>(6)?, ))
1274 })?;
1275
1276 let mut hits: Vec<ScoredHit> = Vec::new();
1277 for row in rows {
1278 let (event_id, task_id, tier, blob, text, event_type, task_title) = row?;
1279 let score = crate::embed::cosine(query_vec, &crate::embed::from_blob(&blob));
1280 hits.push(ScoredHit {
1281 event_id,
1282 task_id,
1283 task_title,
1284 event_type,
1285 tier,
1286 text,
1287 score,
1288 });
1289 }
1290 hits.sort_by(|a, b| {
1291 b.score
1292 .partial_cmp(&a.score)
1293 .unwrap_or(std::cmp::Ordering::Equal)
1294 });
1295 hits.truncate(k);
1296 Ok(hits)
1297}
1298
1299#[cfg(test)]
1300mod tests {
1301 use super::*;
1302 use crate::embed::Embedder;
1303 use tempfile::TempDir;
1304
1305 #[test]
1306 fn task_exists_returns_true_for_known_id_false_otherwise() {
1307 let d = TempDir::new().unwrap();
1308 let conn = open(d.path().join("s.sqlite")).unwrap();
1309
1310 assert!(!task_exists(&conn, "tj-nope").unwrap());
1311
1312 let e = make_open_event("tj-yes", "Hello");
1313 upsert_task_from_event(&conn, &e, "feedfacefeedface").unwrap();
1314 index_event(&conn, &e).unwrap();
1315
1316 assert!(task_exists(&conn, "tj-yes").unwrap());
1317 assert!(!task_exists(&conn, "tj-nope").unwrap());
1318 }
1319
1320 #[test]
1321 fn fresh_db_runs_all_migrations() {
1322 let d = TempDir::new().unwrap();
1323 let p = d.path().join("state.sqlite");
1324 let conn = open(&p).unwrap();
1325
1326 let applied: Vec<i64> = conn
1327 .prepare("SELECT version FROM schema_migrations ORDER BY version")
1328 .unwrap()
1329 .query_map([], |r| r.get::<_, i64>(0))
1330 .unwrap()
1331 .collect::<Result<_, _>>()
1332 .unwrap();
1333 assert_eq!(
1334 applied,
1335 (1..=MIGRATIONS.len() as i64).collect::<Vec<_>>(),
1336 "every declared migration must be recorded"
1337 );
1338 }
1339
1340 #[test]
1341 fn apply_migrations_is_idempotent_across_reopens() {
1342 let d = TempDir::new().unwrap();
1343 let p = d.path().join("state.sqlite");
1344 let _ = open(&p).unwrap();
1345 let _ = open(&p).unwrap();
1346
1347 let count: i64 = open(&p)
1348 .unwrap()
1349 .query_row("SELECT COUNT(*) FROM schema_migrations", [], |r| r.get(0))
1350 .unwrap();
1351 assert_eq!(
1352 count,
1353 MIGRATIONS.len() as i64,
1354 "schema_migrations must contain exactly one row per declared migration after repeated opens"
1355 );
1356 }
1357
1358 fn make_text_event(text: &str) -> crate::event::Event {
1359 crate::event::Event::new(
1360 "tj-x",
1361 crate::event::EventType::Finding,
1362 crate::event::Author::User,
1363 crate::event::Source::Cli,
1364 text.into(),
1365 )
1366 }
1367
1368 #[test]
1369 fn embed_pending_embeds_all_then_is_idempotent() {
1370 let d = TempDir::new().unwrap();
1371 let conn = open(d.path().join("s.sqlite")).unwrap();
1372 let ph = "feedfacefeedface";
1373
1374 for text in [
1375 "implement payment refund deduplication",
1376 "add validation for negative order amounts",
1377 ] {
1378 index_event(&conn, &make_text_event(text)).unwrap();
1379 }
1380
1381 let emb = crate::embed::HashEmbedder::new(64);
1382 let at = "2026-06-12T00:00:00Z";
1383
1384 let n = embed_pending(&conn, ph, &emb, at, 100).unwrap();
1385 assert_eq!(n, 2, "both events embedded on first pass");
1386 assert_eq!(count_embeddings(&conn, ph).unwrap(), 2);
1387
1388 assert_eq!(embed_pending(&conn, ph, &emb, at, 100).unwrap(), 0);
1390
1391 assert_eq!(
1394 events_needing_embedding(&conn, "other-model", 100)
1395 .unwrap()
1396 .len(),
1397 2
1398 );
1399 }
1400
1401 #[test]
1402 fn semantic_search_ranks_relevant_event_first() {
1403 let d = TempDir::new().unwrap();
1404 let conn = open(d.path().join("s.sqlite")).unwrap();
1405 let ph = "feedfacefeedface";
1406
1407 for text in [
1408 "fix duplicate payment refund write on partial refund",
1409 "update the frontend button hover color",
1410 "add a database index for faster user lookup",
1411 ] {
1412 index_event(&conn, &make_text_event(text)).unwrap();
1413 }
1414 let emb = crate::embed::HashEmbedder::new(256);
1415 embed_pending(&conn, ph, &emb, "t", 100).unwrap();
1416
1417 let q = emb.embed_one("payment refund duplicated").unwrap();
1418 let hits = semantic_search(&conn, ph, &q, emb.model_id(), 3).unwrap();
1419
1420 assert_eq!(hits.len(), 3);
1421 assert!(
1422 hits[0].text.contains("refund"),
1423 "the refund event must rank first, got: {}",
1424 hits[0].text
1425 );
1426 assert!(
1427 hits[0].score >= hits[1].score,
1428 "hits must be sorted by score desc"
1429 );
1430 }
1431
1432 #[test]
1433 fn open_creates_all_tables() {
1434 let d = TempDir::new().unwrap();
1435 let p = d.path().join("state.sqlite");
1436 let conn = open(&p).unwrap();
1437
1438 let names: Vec<String> = conn
1439 .prepare("SELECT name FROM sqlite_master WHERE type='table' OR type='virtual table' ORDER BY name")
1440 .unwrap()
1441 .query_map([], |r| r.get::<_, String>(0))
1442 .unwrap()
1443 .collect::<Result<_, _>>()
1444 .unwrap();
1445
1446 for required in [
1447 "decisions",
1448 "events_index",
1449 "evidence",
1450 "task_pack_cache",
1451 "tasks",
1452 "search_fts",
1453 ] {
1454 assert!(
1455 names.iter().any(|n| n == required),
1456 "missing table {required}, have {names:?}"
1457 );
1458 }
1459 }
1460
1461 #[test]
1462 fn open_is_idempotent() {
1463 let d = TempDir::new().unwrap();
1464 let p = d.path().join("state.sqlite");
1465 let _ = open(&p).unwrap();
1466 let _ = open(&p).unwrap();
1467 }
1468
1469 #[test]
1470 fn index_event_projects_evidence() {
1471 let d = TempDir::new().unwrap();
1472 let conn = open(d.path().join("s.sqlite")).unwrap();
1473 let mut open_e = crate::event::Event::new(
1474 "tj-e",
1475 crate::event::EventType::Open,
1476 crate::event::Author::User,
1477 crate::event::Source::Cli,
1478 "x".into(),
1479 );
1480 open_e.meta = serde_json::json!({"title": "T"});
1481 upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
1482 index_event(&conn, &open_e).unwrap();
1483
1484 let mut ev = crate::event::Event::new(
1485 "tj-e",
1486 crate::event::EventType::Evidence,
1487 crate::event::Author::Agent,
1488 crate::event::Source::Chat,
1489 "Hook startup measured at 12ms".into(),
1490 );
1491 ev.evidence_strength = Some(crate::event::EvidenceStrength::Strong);
1492 upsert_task_from_event(&conn, &ev, "feedface").unwrap();
1493 index_event(&conn, &ev).unwrap();
1494
1495 let (text, strength): (String, String) = conn
1496 .query_row(
1497 "SELECT text, strength FROM evidence WHERE task_id=?1",
1498 rusqlite::params!["tj-e"],
1499 |r| Ok((r.get(0)?, r.get(1)?)),
1500 )
1501 .unwrap();
1502 assert!(text.contains("12ms"));
1503 assert_eq!(strength, "strong");
1504 }
1505
1506 #[test]
1507 fn supersede_event_marks_decision_superseded() {
1508 let d = TempDir::new().unwrap();
1509 let conn = open(d.path().join("s.sqlite")).unwrap();
1510 let mut open_e = crate::event::Event::new(
1511 "tj-s",
1512 crate::event::EventType::Open,
1513 crate::event::Author::User,
1514 crate::event::Source::Cli,
1515 "x".into(),
1516 );
1517 open_e.meta = serde_json::json!({"title": "T"});
1518 upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
1519 index_event(&conn, &open_e).unwrap();
1520
1521 let dec = crate::event::Event::new(
1522 "tj-s",
1523 crate::event::EventType::Decision,
1524 crate::event::Author::Agent,
1525 crate::event::Source::Chat,
1526 "Use TS".into(),
1527 );
1528 upsert_task_from_event(&conn, &dec, "feedface").unwrap();
1529 index_event(&conn, &dec).unwrap();
1530
1531 let mut sup = crate::event::Event::new(
1532 "tj-s",
1533 crate::event::EventType::Supersede,
1534 crate::event::Author::Agent,
1535 crate::event::Source::Chat,
1536 "Replaced by Rust decision".into(),
1537 );
1538 sup.supersedes = Some(dec.event_id.clone());
1539 upsert_task_from_event(&conn, &sup, "feedface").unwrap();
1540 index_event(&conn, &sup).unwrap();
1541
1542 let (status, by): (String, Option<String>) = conn
1543 .query_row(
1544 "SELECT status, superseded_by FROM decisions WHERE decision_id=?1",
1545 rusqlite::params![dec.event_id],
1546 |r| Ok((r.get(0)?, r.get(1)?)),
1547 )
1548 .unwrap();
1549 assert_eq!(status, "superseded");
1550 assert_eq!(by.as_deref(), Some(sup.event_id.as_str()));
1551 }
1552
1553 #[test]
1554 fn index_event_projects_decision_to_decisions_table() {
1555 let d = TempDir::new().unwrap();
1556 let conn = open(d.path().join("s.sqlite")).unwrap();
1557
1558 let mut open_e = crate::event::Event::new(
1559 "tj-d",
1560 crate::event::EventType::Open,
1561 crate::event::Author::User,
1562 crate::event::Source::Cli,
1563 "x".into(),
1564 );
1565 open_e.meta = serde_json::json!({"title": "T"});
1566 upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
1567 index_event(&conn, &open_e).unwrap();
1568
1569 let dec = crate::event::Event::new(
1570 "tj-d",
1571 crate::event::EventType::Decision,
1572 crate::event::Author::Agent,
1573 crate::event::Source::Chat,
1574 "Adopt Rust".into(),
1575 );
1576 upsert_task_from_event(&conn, &dec, "feedface").unwrap();
1577 index_event(&conn, &dec).unwrap();
1578
1579 let (id, text, status): (String, String, String) = conn
1580 .query_row(
1581 "SELECT decision_id, text, status FROM decisions WHERE task_id=?1",
1582 rusqlite::params!["tj-d"],
1583 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1584 )
1585 .unwrap();
1586 assert_eq!(id, dec.event_id);
1587 assert_eq!(text, "Adopt Rust");
1588 assert_eq!(status, "active");
1589 }
1590
1591 #[test]
1592 fn index_event_projects_decision_alternatives_into_column() {
1593 let d = TempDir::new().unwrap();
1594 let conn = open(d.path().join("s.sqlite")).unwrap();
1595
1596 let mut dec = crate::event::Event::new(
1597 "tj-alt",
1598 crate::event::EventType::Decision,
1599 crate::event::Author::Agent,
1600 crate::event::Source::Chat,
1601 "Use SQLite".into(),
1602 );
1603 dec.meta = serde_json::json!({
1604 "alternatives": [
1605 {"option": "SQLite", "chosen": true, "rationale": "embedded, zero-ops"},
1606 {"option": "Postgres", "chosen": false, "rationale": "too heavy for local tool"}
1607 ]
1608 });
1609 upsert_task_from_event(&conn, &dec, "feedface").unwrap();
1610 index_event(&conn, &dec).unwrap();
1611
1612 let alts: Option<String> = conn
1613 .query_row(
1614 "SELECT alternatives FROM decisions WHERE decision_id=?1",
1615 rusqlite::params![dec.event_id],
1616 |r| r.get(0),
1617 )
1618 .unwrap();
1619 let alts = alts.expect("alternatives column should be populated");
1620 let parsed: serde_json::Value = serde_json::from_str(&alts).unwrap();
1621 assert_eq!(parsed.as_array().unwrap().len(), 2);
1622 assert_eq!(parsed[0]["option"], "SQLite");
1623 assert_eq!(parsed[0]["chosen"], true);
1624 }
1625
1626 #[test]
1627 fn index_event_decision_without_alternatives_leaves_column_null() {
1628 let d = TempDir::new().unwrap();
1629 let conn = open(d.path().join("s.sqlite")).unwrap();
1630
1631 let dec = crate::event::Event::new(
1632 "tj-noalt",
1633 crate::event::EventType::Decision,
1634 crate::event::Author::Agent,
1635 crate::event::Source::Chat,
1636 "Plain decision".into(),
1637 );
1638 upsert_task_from_event(&conn, &dec, "feedface").unwrap();
1639 index_event(&conn, &dec).unwrap();
1640
1641 let alts: Option<String> = conn
1642 .query_row(
1643 "SELECT alternatives FROM decisions WHERE decision_id=?1",
1644 rusqlite::params![dec.event_id],
1645 |r| r.get(0),
1646 )
1647 .unwrap();
1648 assert!(alts.is_none());
1649 }
1650
1651 #[test]
1652 fn index_event_is_idempotent_no_search_fts_duplicates() {
1653 let d = TempDir::new().unwrap();
1654 let conn = open(d.path().join("s.sqlite")).unwrap();
1655 let mut open_e = crate::event::Event::new(
1656 "tj-id",
1657 crate::event::EventType::Open,
1658 crate::event::Author::User,
1659 crate::event::Source::Cli,
1660 "x".into(),
1661 );
1662 open_e.meta = serde_json::json!({"title": "Idempotent"});
1663 upsert_task_from_event(&conn, &open_e, "feedface").unwrap();
1664
1665 index_event(&conn, &open_e).unwrap();
1667 index_event(&conn, &open_e).unwrap();
1668 index_event(&conn, &open_e).unwrap();
1669
1670 let n: i64 = conn
1671 .query_row(
1672 "SELECT COUNT(*) FROM search_fts WHERE event_id=?1",
1673 rusqlite::params![open_e.event_id],
1674 |r| r.get(0),
1675 )
1676 .unwrap();
1677 assert_eq!(n, 1, "search_fts must hold exactly one row per event_id");
1678 }
1679
1680 #[test]
1681 fn list_all_projects_returns_hashes_from_state_dir() {
1682 use std::fs::File;
1683 let d = TempDir::new().unwrap();
1684 let state_dir = d.path().join("state");
1685 std::fs::create_dir_all(&state_dir).unwrap();
1686 File::create(state_dir.join("aaaa1111aaaa1111.sqlite")).unwrap();
1687 File::create(state_dir.join("bbbb2222bbbb2222.sqlite")).unwrap();
1688 File::create(state_dir.join("not-a-project.txt")).unwrap();
1689
1690 let mut hashes = list_all_projects(&state_dir).unwrap();
1691 hashes.sort();
1692 assert_eq!(hashes, vec!["aaaa1111aaaa1111", "bbbb2222bbbb2222"]);
1693 }
1694
1695 fn write_event_line(f: &mut std::fs::File, e: &crate::event::Event) {
1696 use std::io::Write;
1697 writeln!(f, "{}", serde_json::to_string(e).unwrap()).unwrap();
1698 }
1699
1700 fn make_open_event(task_id: &str, title: &str) -> crate::event::Event {
1701 let mut e = crate::event::Event::new(
1702 task_id,
1703 crate::event::EventType::Open,
1704 crate::event::Author::User,
1705 crate::event::Source::Cli,
1706 "x".into(),
1707 );
1708 e.meta = serde_json::json!({"title": title});
1709 e
1710 }
1711
1712 #[test]
1713 fn ingest_new_events_picks_up_only_new_lines() {
1714 let d = TempDir::new().unwrap();
1715 let jsonl = d.path().join("events.jsonl");
1716 let db = d.path().join("s.sqlite");
1717 let project = "deadbeefdeadbeef";
1718
1719 let e1 = make_open_event("tj-i1", "first");
1720 let e2 = make_open_event("tj-i2", "second");
1721 let e3 = make_open_event("tj-i3", "third");
1722
1723 let mut f = std::fs::File::create(&jsonl).unwrap();
1724 write_event_line(&mut f, &e1);
1725 write_event_line(&mut f, &e2);
1726 write_event_line(&mut f, &e3);
1727 drop(f);
1728
1729 let conn = open(&db).unwrap();
1731 let n_first = ingest_new_events(&conn, &jsonl, project).unwrap();
1732 assert_eq!(n_first, 3);
1733
1734 let e4 = make_open_event("tj-i4", "fourth");
1736 let e5 = make_open_event("tj-i5", "fifth");
1737 let mut f = std::fs::OpenOptions::new()
1738 .append(true)
1739 .open(&jsonl)
1740 .unwrap();
1741 write_event_line(&mut f, &e4);
1742 write_event_line(&mut f, &e5);
1743 drop(f);
1744
1745 let n_second = ingest_new_events(&conn, &jsonl, project).unwrap();
1747 assert_eq!(n_second, 2, "incremental ingest must read only the tail");
1748
1749 let total: i64 = conn
1750 .query_row("SELECT COUNT(*) FROM events_index", [], |r| r.get(0))
1751 .unwrap();
1752 assert_eq!(total, 5);
1753
1754 let marker: String = conn
1755 .query_row(
1756 "SELECT last_indexed_event_id FROM index_state WHERE project_hash=?1",
1757 rusqlite::params![project],
1758 |r| r.get(0),
1759 )
1760 .unwrap();
1761 assert_eq!(marker, e5.event_id);
1762 }
1763
1764 #[test]
1765 fn ingest_new_events_falls_back_to_full_rebuild_when_marker_vanishes() {
1766 let d = TempDir::new().unwrap();
1767 let jsonl = d.path().join("events.jsonl");
1768 let db = d.path().join("s.sqlite");
1769 let project = "feedfacefeedface";
1770
1771 let e1 = make_open_event("tj-r1", "first");
1772 let mut f = std::fs::File::create(&jsonl).unwrap();
1773 write_event_line(&mut f, &e1);
1774 drop(f);
1775
1776 let conn = open(&db).unwrap();
1777 ingest_new_events(&conn, &jsonl, project).unwrap();
1778
1779 let e2 = make_open_event("tj-r2", "after-corruption");
1782 let e3 = make_open_event("tj-r3", "after-corruption-2");
1783 let mut f = std::fs::File::create(&jsonl).unwrap();
1784 write_event_line(&mut f, &e2);
1785 write_event_line(&mut f, &e3);
1786 drop(f);
1787
1788 let n = ingest_new_events(&conn, &jsonl, project).unwrap();
1789 assert_eq!(n, 2, "missing marker must trigger full rebuild");
1790 }
1791
1792 #[test]
1793 fn rebuild_state_and_ingest_new_events_produce_same_state() {
1794 let d = TempDir::new().unwrap();
1795 let jsonl_a = d.path().join("a.jsonl");
1796 let jsonl_b = d.path().join("b.jsonl");
1797 let db_a = d.path().join("a.sqlite");
1798 let db_b = d.path().join("b.sqlite");
1799
1800 let events: Vec<_> = (0..5)
1801 .map(|i| make_open_event(&format!("tj-eq{i}"), &format!("title {i}")))
1802 .collect();
1803 for path in [&jsonl_a, &jsonl_b] {
1804 let mut f = std::fs::File::create(path).unwrap();
1805 for e in &events {
1806 write_event_line(&mut f, e);
1807 }
1808 }
1809
1810 let conn_a = open(&db_a).unwrap();
1811 let n_a = rebuild_state(&conn_a, &jsonl_a, "abcd1234abcd1234").unwrap();
1812
1813 let conn_b = open(&db_b).unwrap();
1814 let n_b = ingest_new_events(&conn_b, &jsonl_b, "abcd1234abcd1234").unwrap();
1815
1816 assert_eq!(n_a, n_b);
1817 assert_eq!(n_a, 5);
1818
1819 for table in ["tasks", "events_index"] {
1820 let q = format!("SELECT COUNT(*) FROM {table}");
1821 let cnt_a: i64 = conn_a.query_row(&q, [], |r| r.get(0)).unwrap();
1822 let cnt_b: i64 = conn_b.query_row(&q, [], |r| r.get(0)).unwrap();
1823 assert_eq!(cnt_a, cnt_b, "row count mismatch in {table}");
1824 }
1825 }
1826
1827 #[test]
1828 fn rebuild_state_skips_malformed_jsonl_lines() {
1829 use std::io::Write;
1830 let d = TempDir::new().unwrap();
1831 let events_path = d.path().join("events.jsonl");
1832 let db_path = d.path().join("s.sqlite");
1833
1834 let mut f = std::fs::File::create(&events_path).unwrap();
1835
1836 let mut e1 = crate::event::Event::new(
1837 "tj-skip",
1838 crate::event::EventType::Open,
1839 crate::event::Author::User,
1840 crate::event::Source::Cli,
1841 "x".into(),
1842 );
1843 e1.meta = serde_json::json!({"title": "Skip test"});
1844 writeln!(f, "{}", serde_json::to_string(&e1).unwrap()).unwrap();
1845
1846 writeln!(f, "this is not a json event line").unwrap();
1848
1849 writeln!(f, "{{\"foo\": 1}}").unwrap();
1851
1852 let e3 = crate::event::Event::new(
1853 "tj-skip",
1854 crate::event::EventType::Decision,
1855 crate::event::Author::Agent,
1856 crate::event::Source::Chat,
1857 "Adopt Rust".into(),
1858 );
1859 writeln!(f, "{}", serde_json::to_string(&e3).unwrap()).unwrap();
1860 drop(f);
1861
1862 let conn = open(&db_path).unwrap();
1863 let n = rebuild_state(&conn, &events_path, "deadbeefdeadbeef")
1864 .expect("rebuild_state must succeed despite malformed lines");
1865 assert_eq!(
1866 n, 2,
1867 "expected 2 valid events indexed (2 malformed skipped)"
1868 );
1869
1870 let indexed: i64 = conn
1871 .query_row("SELECT COUNT(*) FROM events_index", [], |r| r.get(0))
1872 .unwrap();
1873 assert_eq!(indexed, 2);
1874 }
1875
1876 #[test]
1877 fn rebuild_state_reads_jsonl_and_populates_db() {
1878 use std::io::Write;
1879 let d = TempDir::new().unwrap();
1880 let events_path = d.path().join("events.jsonl");
1881 let db_path = d.path().join("s.sqlite");
1882
1883 let mut f = std::fs::File::create(&events_path).unwrap();
1884 let mut e1 = crate::event::Event::new(
1885 "tj-9",
1886 crate::event::EventType::Open,
1887 crate::event::Author::User,
1888 crate::event::Source::Cli,
1889 "x".into(),
1890 );
1891 e1.meta = serde_json::json!({"title": "Nine"});
1892 let e2 = crate::event::Event::new(
1893 "tj-9",
1894 crate::event::EventType::Decision,
1895 crate::event::Author::Agent,
1896 crate::event::Source::Chat,
1897 "Adopt Rust".into(),
1898 );
1899 writeln!(f, "{}", serde_json::to_string(&e1).unwrap()).unwrap();
1900 writeln!(f, "{}", serde_json::to_string(&e2).unwrap()).unwrap();
1901 drop(f);
1902
1903 let conn = open(&db_path).unwrap();
1904 let n = rebuild_state(&conn, &events_path, "deadbeefdeadbeef").unwrap();
1905 assert_eq!(n, 2);
1906
1907 let n: i64 = conn
1908 .query_row("SELECT COUNT(*) FROM tasks", [], |r| r.get(0))
1909 .unwrap();
1910 assert_eq!(n, 1);
1911 let n: i64 = conn
1912 .query_row("SELECT COUNT(*) FROM events_index", [], |r| r.get(0))
1913 .unwrap();
1914 assert_eq!(n, 2);
1915 }
1916
1917 #[test]
1918 fn index_event_writes_index_and_fts() {
1919 let d = TempDir::new().unwrap();
1920 let conn = open(d.path().join("s.sqlite")).unwrap();
1921 let mut open_e = crate::event::Event::new(
1922 "tj-1",
1923 crate::event::EventType::Open,
1924 crate::event::Author::User,
1925 crate::event::Source::Cli,
1926 "Title".into(),
1927 );
1928 open_e.meta = serde_json::json!({"title": "Title"});
1929 upsert_task_from_event(&conn, &open_e, "deadbeefdeadbeef").unwrap();
1930 index_event(&conn, &open_e).unwrap();
1931
1932 let mut decision = crate::event::Event::new(
1933 "tj-1",
1934 crate::event::EventType::Decision,
1935 crate::event::Author::Agent,
1936 crate::event::Source::Chat,
1937 "Adopt Rust".into(),
1938 );
1939 decision.confidence = Some(0.92);
1940 upsert_task_from_event(&conn, &decision, "deadbeefdeadbeef").unwrap();
1941 index_event(&conn, &decision).unwrap();
1942
1943 let count: i64 = conn
1944 .query_row(
1945 "SELECT COUNT(*) FROM events_index WHERE task_id=?1",
1946 rusqlite::params!["tj-1"],
1947 |r| r.get(0),
1948 )
1949 .unwrap();
1950 assert_eq!(count, 2);
1951
1952 let mut stmt = conn
1953 .prepare("SELECT event_id FROM search_fts WHERE search_fts MATCH ?1")
1954 .unwrap();
1955 let hits: Vec<String> = stmt
1956 .query_map(rusqlite::params!["Rust"], |r| {
1957 let s: String = r.get(0)?;
1958 Ok(s)
1959 })
1960 .unwrap()
1961 .collect::<Result<Vec<_>, _>>()
1962 .unwrap();
1963 assert_eq!(hits.len(), 1);
1964 assert_eq!(hits[0], decision.event_id);
1965 }
1966
1967 #[test]
1968 fn upsert_task_from_open_event_inserts_row() {
1969 let d = TempDir::new().unwrap();
1970 let conn = open(d.path().join("s.sqlite")).unwrap();
1971
1972 let mut e = crate::event::Event::new(
1973 "tj-7f3a",
1974 crate::event::EventType::Open,
1975 crate::event::Author::User,
1976 crate::event::Source::Cli,
1977 "Add OAuth".into(),
1978 );
1979 e.meta = serde_json::json!({ "title": "Add OAuth login" });
1980
1981 upsert_task_from_event(&conn, &e, "abcd1234abcd1234").unwrap();
1982
1983 let (id, title, status): (String, String, String) = conn
1984 .query_row(
1985 "SELECT task_id, title, status FROM tasks WHERE task_id = ?1",
1986 ["tj-7f3a"],
1987 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1988 )
1989 .unwrap();
1990
1991 assert_eq!(id, "tj-7f3a");
1992 assert_eq!(title, "Add OAuth login");
1993 assert_eq!(status, "open");
1994 }
1995
1996 #[test]
1997 fn migration_adds_parent_id_column_nullable() {
1998 let d = tempfile::TempDir::new().unwrap();
1999 let conn = open(d.path().join("s.sqlite")).unwrap();
2000
2001 let e = make_open_event("tj-a", "Top");
2003 upsert_task_from_event(&conn, &e, "ph").unwrap();
2004
2005 let parent: Option<String> = conn
2006 .query_row(
2007 "SELECT parent_id FROM tasks WHERE task_id = ?1",
2008 rusqlite::params!["tj-a"],
2009 |r| r.get(0),
2010 )
2011 .unwrap();
2012 assert_eq!(parent, None);
2013 }
2014
2015 #[test]
2016 fn open_event_meta_parent_id_is_persisted() {
2017 let d = tempfile::TempDir::new().unwrap();
2018 let conn = open(d.path().join("s.sqlite")).unwrap();
2019
2020 upsert_task_from_event(&conn, &make_open_event("tj-parent", "Parent"), "ph").unwrap();
2022
2023 let mut child = make_open_event("tj-child", "Child");
2025 child.meta = serde_json::json!({"title": "Child", "parent_id": "tj-parent"});
2026 upsert_task_from_event(&conn, &child, "ph").unwrap();
2027
2028 let parent: Option<String> = conn
2029 .query_row(
2030 "SELECT parent_id FROM tasks WHERE task_id = ?1",
2031 rusqlite::params!["tj-child"],
2032 |r| r.get(0),
2033 )
2034 .unwrap();
2035 assert_eq!(parent.as_deref(), Some("tj-parent"));
2036 }
2037
2038 #[test]
2039 fn children_of_and_parent_of_work() {
2040 let d = tempfile::TempDir::new().unwrap();
2041 let conn = open(d.path().join("s.sqlite")).unwrap();
2042 upsert_task_from_event(&conn, &make_open_event("p", "Parent"), "ph").unwrap();
2043
2044 let mut c1 = make_open_event("c1", "Child1");
2045 c1.meta = serde_json::json!({"title": "Child1", "parent_id": "p"});
2046 upsert_task_from_event(&conn, &c1, "ph").unwrap();
2047 let mut c2 = make_open_event("c2", "Child2");
2048 c2.meta = serde_json::json!({"title": "Child2", "parent_id": "p"});
2049 upsert_task_from_event(&conn, &c2, "ph").unwrap();
2050
2051 let kids = children_of(&conn, "p").unwrap();
2052 let ids: Vec<&str> = kids.iter().map(|t| t.task_id.as_str()).collect();
2053 assert!(ids.contains(&"c1") && ids.contains(&"c2"));
2054 assert_eq!(kids.len(), 2);
2055
2056 assert_eq!(parent_of(&conn, "c1").unwrap().as_deref(), Some("p"));
2057 assert_eq!(parent_of(&conn, "p").unwrap(), None);
2058 }
2059
2060 #[test]
2061 fn cycle_guard_rejects_self_and_ancestor() {
2062 let d = tempfile::TempDir::new().unwrap();
2063 let conn = open(d.path().join("s.sqlite")).unwrap();
2064 upsert_task_from_event(&conn, &make_open_event("a", "A"), "ph").unwrap();
2065 let mut b = make_open_event("b", "B");
2066 b.meta = serde_json::json!({"title": "B", "parent_id": "a"});
2067 upsert_task_from_event(&conn, &b, "ph").unwrap();
2068
2069 assert!(would_create_cycle(&conn, "a", "b").unwrap());
2071 assert!(would_create_cycle(&conn, "a", "a").unwrap());
2073 upsert_task_from_event(&conn, &make_open_event("x", "X"), "ph").unwrap();
2075 assert!(!would_create_cycle(&conn, "x", "a").unwrap());
2076 }
2077
2078 #[test]
2079 fn invalidate_cascade_clears_parent_pack() {
2080 let d = tempfile::TempDir::new().unwrap();
2081 let conn = open(d.path().join("s.sqlite")).unwrap();
2082 upsert_task_from_event(&conn, &make_open_event("p", "P"), "ph").unwrap();
2083 let mut c = make_open_event("c", "C");
2084 c.meta = serde_json::json!({"title": "C", "parent_id": "p"});
2085 upsert_task_from_event(&conn, &c, "ph").unwrap();
2086
2087 for id in ["p", "c"] {
2089 conn.execute(
2090 "INSERT INTO task_pack_cache(task_id, mode, text, generated_at, source_event_count)
2091 VALUES (?1, 'compact', 'x', '2026-01-01T00:00:00Z', 1)",
2092 rusqlite::params![id],
2093 )
2094 .unwrap();
2095 }
2096
2097 invalidate_pack_cascade(&conn, "c").unwrap();
2098
2099 let remaining: i64 = conn
2100 .query_row("SELECT COUNT(*) FROM task_pack_cache", [], |r| r.get(0))
2101 .unwrap();
2102 assert_eq!(remaining, 0, "both child and parent pack caches cleared");
2103 }
2104
2105 #[test]
2106 fn count_open_children_counts_only_open() {
2107 let d = tempfile::TempDir::new().unwrap();
2108 let conn = open(d.path().join("s.sqlite")).unwrap();
2109 upsert_task_from_event(&conn, &make_open_event("p", "P"), "ph").unwrap();
2110 let mut c1 = make_open_event("c1", "C1");
2111 c1.meta = serde_json::json!({"title": "C1", "parent_id": "p"});
2112 upsert_task_from_event(&conn, &c1, "ph").unwrap();
2113 let mut close = crate::event::Event::new(
2115 "c1",
2116 crate::event::EventType::Close,
2117 crate::event::Author::User,
2118 crate::event::Source::Cli,
2119 "done".into(),
2120 );
2121 close.timestamp = "2026-01-02T00:00:00Z".into();
2122 upsert_task_from_event(&conn, &close, "ph").unwrap();
2123 let mut c2 = make_open_event("c2", "C2");
2124 c2.meta = serde_json::json!({"title": "C2", "parent_id": "p"});
2125 upsert_task_from_event(&conn, &c2, "ph").unwrap();
2126
2127 assert_eq!(count_open_children(&conn, "p").unwrap(), 1); }
2129}