1use anyhow::Context;
2use rusqlite::Connection;
3use std::collections::HashSet;
4use std::path::Path;
5
/// One schema migration step: a version number paired with the SQL batch that implements it.
struct Migration {
    /// Version recorded in `schema_migrations` once the step has been applied.
    version: i64,
    /// SQL text executed as a batch to apply the step.
    sql: &'static str,
}
13
/// Migration 1: the initial schema.
///
/// Creates the `tasks`, `events_index`, `decisions`, `evidence` and `task_pack_cache` tables,
/// two lookup indexes, and the `search_fts` FTS5 virtual table. Every statement uses
/// `IF NOT EXISTS`, so the batch is safe to re-run.
const MIGRATION_001: &str = r#"
CREATE TABLE IF NOT EXISTS tasks (
    task_id TEXT PRIMARY KEY,
    title TEXT NOT NULL,
    status TEXT NOT NULL,
    project_hash TEXT NOT NULL,
    opened_at TEXT NOT NULL,
    closed_at TEXT,
    last_event_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_tasks_project ON tasks(project_hash, last_event_at DESC);

CREATE TABLE IF NOT EXISTS events_index (
    event_id TEXT PRIMARY KEY,
    task_id TEXT NOT NULL,
    type TEXT NOT NULL,
    timestamp TEXT NOT NULL,
    confidence REAL,
    status TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_events_task_time ON events_index(task_id, timestamp DESC);

CREATE TABLE IF NOT EXISTS decisions (
    decision_id TEXT PRIMARY KEY,
    task_id TEXT NOT NULL,
    text TEXT NOT NULL,
    status TEXT NOT NULL,
    superseded_by TEXT
);

CREATE TABLE IF NOT EXISTS evidence (
    evidence_id TEXT PRIMARY KEY,
    task_id TEXT NOT NULL,
    text TEXT NOT NULL,
    strength TEXT NOT NULL,
    refers_to_decision_id TEXT
);

CREATE TABLE IF NOT EXISTS task_pack_cache (
    task_id TEXT NOT NULL,
    mode TEXT NOT NULL,
    text TEXT NOT NULL,
    generated_at TEXT NOT NULL,
    source_event_count INTEGER NOT NULL,
    PRIMARY KEY (task_id, mode)
);

CREATE VIRTUAL TABLE IF NOT EXISTS search_fts USING fts5(
    task_id UNINDEXED,
    event_id UNINDEXED,
    text,
    type
);
"#;
68
/// Migration 2: adds `index_state`, which stores the last indexed event id per project hash.
const MIGRATION_002: &str = r#"
CREATE TABLE IF NOT EXISTS index_state (
    project_hash TEXT PRIMARY KEY,
    last_indexed_event_id TEXT NOT NULL,
    updated_at TEXT NOT NULL
);
"#;
80
/// Migration 3: adds the `goal`, `outcome`, `outcome_tag` and `external` columns to `tasks`
/// and an `artifacts` column to `events_index`, then empties `task_pack_cache`.
///
/// The `ALTER TABLE ... ADD COLUMN` statements are not idempotent: running this batch twice
/// fails on the first already-existing column.
const MIGRATION_003: &str = r#"
ALTER TABLE tasks ADD COLUMN goal TEXT;
ALTER TABLE tasks ADD COLUMN outcome TEXT;
ALTER TABLE tasks ADD COLUMN outcome_tag TEXT;
ALTER TABLE tasks ADD COLUMN external TEXT;
ALTER TABLE events_index ADD COLUMN artifacts TEXT;
DELETE FROM task_pack_cache;
"#;
93
/// Migration 4: no schema change; only empties `task_pack_cache`.
const MIGRATION_004: &str = r#"
DELETE FROM task_pack_cache;
"#;
102
/// Migration 5: adds `dream_state`, keyed by project hash, holding a `last_dream_at` timestamp.
const MIGRATION_005: &str = r#"
CREATE TABLE IF NOT EXISTS dream_state (
    project_hash TEXT PRIMARY KEY,
    last_dream_at TEXT NOT NULL,
    updated_at TEXT NOT NULL
);
"#;
112
/// Migration 6: adds a nullable `parent_id` column to `tasks` and an index on it.
const MIGRATION_006: &str = r#"
ALTER TABLE tasks ADD COLUMN parent_id TEXT;
CREATE INDEX IF NOT EXISTS idx_tasks_parent ON tasks(parent_id);
"#;
120
/// Migration 7: adds an `alternatives` column to `decisions` and empties `task_pack_cache`.
const MIGRATION_007: &str = r#"
ALTER TABLE decisions ADD COLUMN alternatives TEXT;
DELETE FROM task_pack_cache;
"#;
130
/// Migration 8: adds the `embeddings` table (one row per `event_id`, vector stored as a BLOB),
/// an index on `(project_hash, tier)`, and a `memory_tier` column on `events_index` that
/// defaults to `'episodic'`.
const MIGRATION_008: &str = r#"
CREATE TABLE IF NOT EXISTS embeddings (
    event_id TEXT PRIMARY KEY,
    task_id TEXT NOT NULL,
    project_hash TEXT NOT NULL,
    tier TEXT NOT NULL DEFAULT 'episodic',
    model TEXT NOT NULL,
    dim INTEGER NOT NULL,
    vec BLOB NOT NULL,
    created_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_emb_project_tier ON embeddings(project_hash, tier);
ALTER TABLE events_index ADD COLUMN memory_tier TEXT NOT NULL DEFAULT 'episodic';
"#;
152
/// Every known schema migration, listed in ascending version order.
///
/// `apply_migrations` walks this slice front to back and runs each entry whose version is not
/// yet recorded in `schema_migrations`.
const MIGRATIONS: &[Migration] = &[
    Migration {
        version: 1,
        sql: MIGRATION_001,
    },
    Migration {
        version: 2,
        sql: MIGRATION_002,
    },
    Migration {
        version: 3,
        sql: MIGRATION_003,
    },
    Migration {
        version: 4,
        sql: MIGRATION_004,
    },
    Migration {
        version: 5,
        sql: MIGRATION_005,
    },
    Migration {
        version: 6,
        sql: MIGRATION_006,
    },
    Migration {
        version: 7,
        sql: MIGRATION_007,
    },
    Migration {
        version: 8,
        sql: MIGRATION_008,
    },
];
189
190fn apply_migrations(conn: &Connection) -> anyhow::Result<()> {
191 conn.execute_batch(
192 "CREATE TABLE IF NOT EXISTS schema_migrations (
193 version INTEGER PRIMARY KEY,
194 applied_at TEXT NOT NULL
195 )",
196 )
197 .context("create schema_migrations table")?;
198
199 let applied: HashSet<i64> = {
200 let mut stmt = conn
201 .prepare("SELECT version FROM schema_migrations")
202 .context("select applied versions")?;
203 let rows = stmt
204 .query_map([], |r| r.get::<_, i64>(0))
205 .context("iterate schema_migrations")?;
206 rows.collect::<rusqlite::Result<HashSet<_>>>()
207 .context("collect applied versions")?
208 };
209
210 for migration in MIGRATIONS {
211 if applied.contains(&migration.version) {
212 continue;
213 }
214 conn.execute_batch(migration.sql)
215 .with_context(|| format!("apply schema migration v{:03}", migration.version))?;
216 conn.execute(
217 "INSERT INTO schema_migrations(version, applied_at) VALUES (?1, ?2)",
218 rusqlite::params![
219 migration.version,
220 chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
221 ],
222 )
223 .with_context(|| {
224 format!(
225 "record schema migration v{:03} as applied",
226 migration.version
227 )
228 })?;
229 }
230 Ok(())
231}
232
233use crate::event::{Event, EventType};
234
235pub fn upsert_task_from_event(
236 conn: &Connection,
237 event: &Event,
238 project_hash: &str,
239) -> anyhow::Result<()> {
240 match event.event_type {
241 EventType::Open => {
242 let title = event
243 .meta
244 .get("title")
245 .and_then(|v| v.as_str())
246 .unwrap_or(&event.text)
247 .to_string();
248 let parent_id = event
249 .meta
250 .get("parent_id")
251 .and_then(|v| v.as_str())
252 .map(str::to_string);
253 conn.execute(
256 "INSERT INTO tasks(task_id, title, status, project_hash, opened_at, last_event_at, parent_id)
257 VALUES (?1, ?2, 'open', ?3, ?4, ?4, ?5)
258 ON CONFLICT(task_id) DO UPDATE SET last_event_at = ?4",
259 rusqlite::params![event.task_id, title, project_hash, event.timestamp, parent_id],
260 )?;
261 }
262 EventType::Close => {
263 conn.execute(
264 "UPDATE tasks SET status='closed', closed_at=?2, last_event_at=?2 WHERE task_id=?1",
265 rusqlite::params![event.task_id, event.timestamp],
266 )?;
267 }
268 EventType::Reopen => {
269 conn.execute(
270 "UPDATE tasks SET status='open', closed_at=NULL, last_event_at=?2 WHERE task_id=?1",
271 rusqlite::params![event.task_id, event.timestamp],
272 )?;
273 }
274 _ => {
275 conn.execute(
276 "UPDATE tasks SET last_event_at=?2 WHERE task_id=?1",
277 rusqlite::params![event.task_id, event.timestamp],
278 )?;
279 }
280 }
281 Ok(())
282}
283
284use std::io::BufRead;
285
286pub fn list_all_projects(state_dir: impl AsRef<Path>) -> anyhow::Result<Vec<String>> {
287 let dir = state_dir.as_ref();
288 if !dir.exists() {
289 return Ok(vec![]);
290 }
291 let mut out = Vec::new();
292 for entry in std::fs::read_dir(dir)? {
293 let entry = entry?;
294 let path = entry.path();
295 if path.extension().and_then(|e| e.to_str()) == Some("sqlite") {
296 if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) {
297 out.push(stem.to_string());
298 }
299 }
300 }
301 Ok(out)
302}
303
304pub fn rebuild_state(
305 conn: &Connection,
306 jsonl_path: impl AsRef<Path>,
307 project_hash: &str,
308) -> anyhow::Result<usize> {
309 let f = std::fs::File::open(&jsonl_path)
310 .with_context(|| format!("open {:?}", jsonl_path.as_ref()))?;
311 let reader = std::io::BufReader::new(f);
312
313 let tx = conn.unchecked_transaction()?;
314 let mut count = 0;
315 let mut last_event_id: Option<String> = None;
316 for (i, line) in reader.lines().enumerate() {
317 let line = line.with_context(|| format!("read line {i}"))?;
318 if line.trim().is_empty() {
319 continue;
320 }
321 let event: Event = match serde_json::from_str(&line) {
325 Ok(e) => e,
326 Err(err) => {
327 tracing::warn!(
328 line_number = i + 1,
329 error = %err,
330 "skipping malformed JSONL line in rebuild_state"
331 );
332 continue;
333 }
334 };
335 upsert_task_from_event(&tx, &event, project_hash)?;
336 index_event(&tx, &event)?;
337 last_event_id = Some(event.event_id.clone());
338 count += 1;
339 }
340 if let Some(eid) = last_event_id.as_deref() {
341 record_last_indexed(&tx, project_hash, eid)?;
342 }
343 tx.commit()?;
344 Ok(count)
345}
346
347pub fn task_exists(conn: &Connection, task_id: &str) -> anyhow::Result<bool> {
352 let count: i64 = conn.query_row(
353 "SELECT COUNT(*) FROM tasks WHERE task_id = ?1",
354 rusqlite::params![task_id],
355 |r| r.get(0),
356 )?;
357 Ok(count > 0)
358}
359
360pub fn task_status(conn: &Connection, task_id: &str) -> anyhow::Result<Option<String>> {
364 let mut stmt = conn.prepare("SELECT status FROM tasks WHERE task_id = ?1")?;
365 let mut rows = stmt.query(rusqlite::params![task_id])?;
366 Ok(rows.next()?.map(|r| r.get::<_, String>(0)).transpose()?)
367}
368
369pub fn set_task_goal(conn: &Connection, task_id: &str, goal: &str) -> anyhow::Result<()> {
373 conn.execute(
374 "UPDATE tasks SET goal = ?1 WHERE task_id = ?2",
375 rusqlite::params![goal, task_id],
376 )
377 .with_context(|| format!("set goal for {task_id}"))?;
378 conn.execute(
381 "DELETE FROM task_pack_cache WHERE task_id = ?1",
382 rusqlite::params![task_id],
383 )?;
384 Ok(())
385}
386
387pub fn set_task_outcome(
391 conn: &Connection,
392 task_id: &str,
393 outcome: &str,
394 outcome_tag: Option<&str>,
395) -> anyhow::Result<()> {
396 conn.execute(
397 "UPDATE tasks SET outcome = ?1, outcome_tag = ?2 WHERE task_id = ?3",
398 rusqlite::params![outcome, outcome_tag, task_id],
399 )
400 .with_context(|| format!("set outcome for {task_id}"))?;
401 conn.execute(
402 "DELETE FROM task_pack_cache WHERE task_id = ?1",
403 rusqlite::params![task_id],
404 )?;
405 Ok(())
406}
407
408pub fn add_task_external(conn: &Connection, task_id: &str, reference: &str) -> anyhow::Result<()> {
413 let current: Option<String> = conn
414 .query_row(
415 "SELECT external FROM tasks WHERE task_id = ?1",
416 rusqlite::params![task_id],
417 |r| r.get::<_, Option<String>>(0),
418 )
419 .with_context(|| format!("read external for {task_id}"))?;
420 let next = match current {
421 Some(s) if !s.is_empty() => format!("{s},{reference}"),
422 _ => reference.to_string(),
423 };
424 conn.execute(
425 "UPDATE tasks SET external = ?1 WHERE task_id = ?2",
426 rusqlite::params![next, task_id],
427 )?;
428 conn.execute(
429 "DELETE FROM task_pack_cache WHERE task_id = ?1",
430 rusqlite::params![task_id],
431 )?;
432 Ok(())
433}
434
/// The optional descriptive columns of a `tasks` row, as returned by `task_metadata`.
#[derive(Debug, Clone, Default)]
pub struct TaskMetadata {
    /// Value of the `goal` column, if set.
    pub goal: Option<String>,
    /// Value of the `outcome` column, if set.
    pub outcome: Option<String>,
    /// Value of the `outcome_tag` column, if set.
    pub outcome_tag: Option<String>,
    /// Value of the `external` column (comma-separated references), if set.
    pub external: Option<String>,
}
444
445pub fn task_metadata(conn: &Connection, task_id: &str) -> anyhow::Result<Option<TaskMetadata>> {
446 let mut stmt =
447 conn.prepare("SELECT goal, outcome, outcome_tag, external FROM tasks WHERE task_id = ?1")?;
448 let mut rows = stmt.query(rusqlite::params![task_id])?;
449 Ok(match rows.next()? {
450 Some(r) => Some(TaskMetadata {
451 goal: r.get::<_, Option<String>>(0)?,
452 outcome: r.get::<_, Option<String>>(1)?,
453 outcome_tag: r.get::<_, Option<String>>(2)?,
454 external: r.get::<_, Option<String>>(3)?,
455 }),
456 None => None,
457 })
458}
459
/// An open task that has seen no events for a while, as reported by `stale_tasks`.
#[derive(Debug, Clone)]
pub struct StaleTask {
    /// Identifier of the task.
    pub task_id: String,
    /// Title of the task.
    pub title: String,
    /// The stored `last_event_at` text, unmodified.
    pub last_event_at: String,
    /// Whole days between `last_event_at` and the time of the query.
    pub days_idle: i64,
}
469
470pub fn stale_tasks(conn: &Connection, days: i64) -> anyhow::Result<Vec<StaleTask>> {
473 let cutoff = chrono::Utc::now() - chrono::Duration::days(days);
474 let cutoff_str = cutoff.to_rfc3339();
475 let mut stmt = conn.prepare(
476 "SELECT task_id, title, last_event_at FROM tasks
477 WHERE status = 'open' AND last_event_at < ?1
478 ORDER BY last_event_at ASC",
479 )?;
480 let rows = stmt.query_map(rusqlite::params![cutoff_str], |r| {
481 Ok((
482 r.get::<_, String>(0)?,
483 r.get::<_, String>(1)?,
484 r.get::<_, String>(2)?,
485 ))
486 })?;
487 let now = chrono::Utc::now();
488 let mut out = Vec::new();
489 for row in rows {
490 let (task_id, title, last_at) = row?;
491 let dt = chrono::DateTime::parse_from_rfc3339(&last_at)
492 .map(|d| d.with_timezone(&chrono::Utc))
493 .unwrap_or(now);
494 let days_idle = (now - dt).num_days();
495 out.push(StaleTask {
496 task_id,
497 title,
498 last_event_at: last_at,
499 days_idle,
500 });
501 }
502 Ok(out)
503}
504
/// A task that shares artifacts with a query, as returned by `find_related_tasks`.
#[derive(Debug, Clone)]
pub struct RelatedTask {
    /// Identifier of the related task.
    pub task_id: String,
    /// The task's `status` column.
    pub status: String,
    /// Sum of the weights of every artifact that matched one of the task's events.
    pub score: f64,
}
515
516pub fn find_related_tasks(
527 conn: &Connection,
528 arts: &crate::artifacts::Artifacts,
529) -> anyhow::Result<Vec<RelatedTask>> {
530 use std::collections::HashMap;
531 if arts.is_empty() {
532 return Ok(Vec::new());
533 }
534 let mut scores: HashMap<String, f64> = HashMap::new();
535 let mut last_seen: HashMap<String, String> = HashMap::new();
536
537 let needles: Vec<(String, f64)> = arts
538 .linked_issues
539 .iter()
540 .map(|s| (s.clone(), 1.0))
541 .chain(arts.commit_hashes.iter().map(|s| (s.clone(), 0.8)))
542 .chain(arts.files.iter().map(|s| (s.clone(), 0.3)))
543 .collect();
544
545 for (needle, weight) in needles {
546 let pattern = format!("%\"{}\"%", needle.replace('%', "\\%"));
547 let mut stmt = conn.prepare(
548 "SELECT DISTINCT task_id, MAX(timestamp) as ts FROM events_index
549 WHERE artifacts LIKE ?1
550 GROUP BY task_id
551 ORDER BY ts DESC",
552 )?;
553 let rows = stmt.query_map(rusqlite::params![pattern], |r| {
554 Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
555 })?;
556 for row in rows {
557 let (id, ts) = row?;
558 *scores.entry(id.clone()).or_insert(0.0) += weight;
559 last_seen.insert(id, ts);
560 }
561 }
562
563 let mut out: Vec<RelatedTask> = Vec::with_capacity(scores.len());
564 for (id, score) in scores {
565 let status: Option<String> = conn
566 .query_row(
567 "SELECT status FROM tasks WHERE task_id = ?1",
568 rusqlite::params![&id],
569 |r| r.get(0),
570 )
571 .ok();
572 if let Some(status) = status {
573 out.push(RelatedTask {
574 task_id: id,
575 status,
576 score,
577 });
578 }
579 }
580 out.sort_by(|a, b| {
581 b.score
582 .partial_cmp(&a.score)
583 .unwrap_or(std::cmp::Ordering::Equal)
584 .then_with(|| {
585 let ts_a = last_seen.get(&a.task_id).cloned().unwrap_or_default();
586 let ts_b = last_seen.get(&b.task_id).cloned().unwrap_or_default();
587 ts_b.cmp(&ts_a)
588 })
589 });
590 Ok(out)
591}
592
593pub fn find_tasks_by_linked_issues(
600 conn: &Connection,
601 issues: &[String],
602) -> anyhow::Result<Vec<(String, String)>> {
603 if issues.is_empty() {
604 return Ok(Vec::new());
605 }
606 let mut candidate_ids: Vec<String> = Vec::new();
612 for issue in issues {
613 let pattern = format!("%\"{}\"%", issue.replace('%', "\\%"));
614 let mut stmt = conn.prepare(
615 "SELECT DISTINCT task_id FROM events_index
616 WHERE artifacts LIKE ?1
617 ORDER BY timestamp DESC",
618 )?;
619 let rows = stmt.query_map(rusqlite::params![pattern], |r| r.get::<_, String>(0))?;
620 for r in rows {
621 let id = r?;
622 if !candidate_ids.contains(&id) {
623 candidate_ids.push(id);
624 }
625 }
626 }
627 let mut out = Vec::with_capacity(candidate_ids.len());
629 for id in candidate_ids {
630 let status: Option<String> = conn
631 .query_row(
632 "SELECT status FROM tasks WHERE task_id = ?1",
633 rusqlite::params![&id],
634 |r| r.get(0),
635 )
636 .ok();
637 if let Some(s) = status {
638 out.push((id, s));
639 }
640 }
641 Ok(out)
642}
643
644pub fn reclassify_task_artifacts(conn: &Connection, task_id: &str) -> anyhow::Result<usize> {
650 let mut stmt = conn.prepare(
651 "SELECT ei.event_id, COALESCE(sf.text, '') FROM events_index ei
652 LEFT JOIN search_fts sf ON sf.event_id = ei.event_id
653 WHERE ei.task_id = ?1",
654 )?;
655 let rows: Vec<(String, String)> = stmt
656 .query_map(rusqlite::params![task_id], |r| {
657 Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
658 })?
659 .collect::<Result<_, _>>()?;
660 let count = rows.len();
661 for (event_id, text) in rows {
662 let arts = crate::artifacts::extract(&text);
663 let json = if arts.is_empty() {
664 None
665 } else {
666 Some(serde_json::to_string(&arts)?)
667 };
668 conn.execute(
669 "UPDATE events_index SET artifacts = ?1 WHERE event_id = ?2",
670 rusqlite::params![json, event_id],
671 )?;
672 }
673 invalidate_pack_cascade(conn, task_id)?;
674 Ok(count)
675}
676
677pub fn task_artifacts(
683 conn: &Connection,
684 task_id: &str,
685) -> anyhow::Result<crate::artifacts::Artifacts> {
686 let mut stmt = conn.prepare(
687 "SELECT artifacts FROM events_index
688 WHERE task_id = ?1 AND artifacts IS NOT NULL
689 ORDER BY timestamp ASC",
690 )?;
691 let rows = stmt.query_map(rusqlite::params![task_id], |r| r.get::<_, String>(0))?;
692 let mut acc = crate::artifacts::Artifacts::default();
693 for row in rows {
694 let json = row?;
695 if let Ok(parsed) = serde_json::from_str::<crate::artifacts::Artifacts>(&json) {
696 acc.merge(parsed);
697 }
698 }
699 Ok(acc)
700}
701
702fn last_indexed_event_id(conn: &Connection, project_hash: &str) -> anyhow::Result<Option<String>> {
706 let mut stmt =
707 conn.prepare("SELECT last_indexed_event_id FROM index_state WHERE project_hash = ?1")?;
708 let mut rows = stmt.query(rusqlite::params![project_hash])?;
709 if let Some(row) = rows.next()? {
710 Ok(Some(row.get::<_, String>(0)?))
711 } else {
712 Ok(None)
713 }
714}
715
716fn record_last_indexed(
717 conn: &Connection,
718 project_hash: &str,
719 event_id: &str,
720) -> anyhow::Result<()> {
721 conn.execute(
722 "INSERT INTO index_state(project_hash, last_indexed_event_id, updated_at)
723 VALUES (?1, ?2, ?3)
724 ON CONFLICT(project_hash) DO UPDATE SET
725 last_indexed_event_id = excluded.last_indexed_event_id,
726 updated_at = excluded.updated_at",
727 rusqlite::params![
728 project_hash,
729 event_id,
730 chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
731 ],
732 )?;
733 Ok(())
734}
735
736pub fn ingest_new_events(
746 conn: &Connection,
747 jsonl_path: impl AsRef<Path>,
748 project_hash: &str,
749) -> anyhow::Result<usize> {
750 let marker = match last_indexed_event_id(conn, project_hash)? {
751 Some(id) => id,
752 None => return rebuild_state(conn, jsonl_path, project_hash),
753 };
754
755 let f = std::fs::File::open(&jsonl_path)
756 .with_context(|| format!("open {:?}", jsonl_path.as_ref()))?;
757 let reader = std::io::BufReader::new(f);
758
759 let tx = conn.unchecked_transaction()?;
763 let mut found_marker = false;
764 let mut count = 0;
765 let mut last_event_id: Option<String> = None;
766 for (i, line) in reader.lines().enumerate() {
767 let line = line.with_context(|| format!("read line {i}"))?;
768 if line.trim().is_empty() {
769 continue;
770 }
771 let event: Event = match serde_json::from_str(&line) {
772 Ok(e) => e,
773 Err(err) => {
774 tracing::warn!(
775 line_number = i + 1,
776 error = %err,
777 "skipping malformed JSONL line in ingest_new_events"
778 );
779 continue;
780 }
781 };
782 if !found_marker {
783 if event.event_id == marker {
784 found_marker = true;
785 }
786 continue;
787 }
788 upsert_task_from_event(&tx, &event, project_hash)?;
789 index_event(&tx, &event)?;
790 last_event_id = Some(event.event_id.clone());
791 count += 1;
792 }
793
794 if !found_marker {
795 drop(tx);
797 tracing::warn!(
798 project_hash = project_hash,
799 marker = marker.as_str(),
800 "last_indexed_event_id not found in JSONL — falling back to full rebuild"
801 );
802 return rebuild_state(conn, jsonl_path, project_hash);
803 }
804
805 if let Some(eid) = last_event_id.as_deref() {
806 record_last_indexed(&tx, project_hash, eid)?;
807 }
808 tx.commit()?;
809 Ok(count)
810}
811
812pub fn index_event(conn: &Connection, event: &Event) -> anyhow::Result<()> {
813 let type_str = serde_json::to_value(event.event_type)?
814 .as_str()
815 .unwrap()
816 .to_string();
817 let status_str = serde_json::to_value(event.status)?
818 .as_str()
819 .unwrap()
820 .to_string();
821 let artifacts = crate::artifacts::extract(&event.text);
826 let artifacts_json = if artifacts.is_empty() {
827 None
828 } else {
829 Some(serde_json::to_string(&artifacts)?)
830 };
831 conn.execute(
832 "INSERT OR REPLACE INTO events_index(event_id, task_id, type, timestamp, confidence, status, artifacts)
833 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
834 rusqlite::params![
835 event.event_id, event.task_id, type_str,
836 event.timestamp, event.confidence, status_str, artifacts_json
837 ],
838 )?;
839 conn.execute(
841 "DELETE FROM search_fts WHERE event_id=?1",
842 rusqlite::params![event.event_id],
843 )?;
844 conn.execute(
845 "INSERT INTO search_fts(task_id, event_id, text, type) VALUES (?1, ?2, ?3, ?4)",
846 rusqlite::params![event.task_id, event.event_id, event.text, type_str],
847 )?;
848
849 if event.event_type == EventType::Decision {
850 let alternatives_json = match event.meta.get("alternatives") {
854 Some(v) if !v.is_null() => Some(serde_json::to_string(v)?),
855 _ => None,
856 };
857 conn.execute(
858 "INSERT OR REPLACE INTO decisions(decision_id, task_id, text, status, alternatives)
859 VALUES (?1, ?2, ?3, 'active', ?4)",
860 rusqlite::params![event.event_id, event.task_id, event.text, alternatives_json],
861 )?;
862 }
863
864 if event.event_type == EventType::Supersede {
865 if let Some(target) = &event.supersedes {
866 conn.execute(
867 "UPDATE decisions SET status='superseded', superseded_by=?1 WHERE decision_id=?2",
868 rusqlite::params![event.event_id, target],
869 )?;
870 }
871 }
872
873 if event.event_type == EventType::Evidence {
874 let strength_str = event
875 .evidence_strength
876 .map(|s| {
877 serde_json::to_value(s)
878 .unwrap()
879 .as_str()
880 .unwrap()
881 .to_string()
882 })
883 .unwrap_or_else(|| "medium".into());
884 conn.execute(
885 "INSERT OR REPLACE INTO evidence(evidence_id, task_id, text, strength)
886 VALUES (?1, ?2, ?3, ?4)",
887 rusqlite::params![event.event_id, event.task_id, event.text, strength_str],
888 )?;
889 }
890
891 invalidate_pack_cascade(conn, &event.task_id)?;
894
895 Ok(())
896}
897
898pub fn open(path: impl AsRef<Path>) -> anyhow::Result<Connection> {
899 if let Some(parent) = path.as_ref().parent() {
900 std::fs::create_dir_all(parent).with_context(|| format!("create dir {parent:?}"))?;
901 }
902 let conn =
903 Connection::open(&path).with_context(|| format!("open SQLite at {:?}", path.as_ref()))?;
904 conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;")?;
905 apply_migrations(&conn).context("apply schema migrations")?;
906 Ok(conn)
907}
908
/// Summary of one task as returned by the task listing queries.
#[derive(Debug, Clone)]
pub struct TaskRow {
    /// Identifier of the task.
    pub task_id: String,
    /// Title of the task.
    pub title: String,
    /// The task's `status` column.
    pub status: String,
    /// The stored `last_event_at` text.
    pub last_event_at: String,
    /// Number of `events_index` rows belonging to the task (0 when there are none).
    pub event_count: usize,
}
920
921pub fn list_tasks_by_project(
925 conn: &Connection,
926 project_hash: &str,
927) -> anyhow::Result<Vec<TaskRow>> {
928 let mut stmt = conn.prepare(
929 "SELECT t.task_id, t.title, t.status, t.last_event_at,
930 COALESCE(c.cnt, 0) AS event_count
931 FROM tasks t
932 LEFT JOIN (
933 SELECT task_id, COUNT(*) AS cnt FROM events_index GROUP BY task_id
934 ) c ON c.task_id = t.task_id
935 WHERE t.project_hash = ?1
936 ORDER BY (t.status = 'open') DESC, t.last_event_at DESC",
937 )?;
938 let rows = stmt
939 .query_map(rusqlite::params![project_hash], |r| {
940 Ok(TaskRow {
941 task_id: r.get::<_, String>(0)?,
942 title: r.get::<_, String>(1)?,
943 status: r.get::<_, String>(2)?,
944 last_event_at: r.get::<_, String>(3)?,
945 event_count: r.get::<_, i64>(4)? as usize,
946 })
947 })?
948 .collect::<Result<Vec<_>, _>>()?;
949 Ok(rows)
950}
951
952pub fn top_level_tasks(conn: &Connection, project_hash: &str) -> anyhow::Result<Vec<TaskRow>> {
956 let mut stmt = conn.prepare(
957 "SELECT t.task_id, t.title, t.status, t.last_event_at,
958 COALESCE(c.cnt, 0) AS event_count
959 FROM tasks t
960 LEFT JOIN (
961 SELECT task_id, COUNT(*) AS cnt FROM events_index GROUP BY task_id
962 ) c ON c.task_id = t.task_id
963 WHERE t.project_hash = ?1 AND t.parent_id IS NULL
964 ORDER BY (t.status = 'open') DESC, t.last_event_at DESC",
965 )?;
966 let rows = stmt
967 .query_map(rusqlite::params![project_hash], |r| {
968 Ok(TaskRow {
969 task_id: r.get::<_, String>(0)?,
970 title: r.get::<_, String>(1)?,
971 status: r.get::<_, String>(2)?,
972 last_event_at: r.get::<_, String>(3)?,
973 event_count: r.get::<_, i64>(4)? as usize,
974 })
975 })?
976 .collect::<Result<Vec<_>, _>>()?;
977 Ok(rows)
978}
979
980pub fn children_of(conn: &Connection, task_id: &str) -> anyhow::Result<Vec<TaskRow>> {
982 let mut stmt = conn.prepare(
983 "SELECT t.task_id, t.title, t.status, t.last_event_at,
984 COALESCE(c.cnt, 0) AS event_count
985 FROM tasks t
986 LEFT JOIN (
987 SELECT task_id, COUNT(*) AS cnt FROM events_index GROUP BY task_id
988 ) c ON c.task_id = t.task_id
989 WHERE t.parent_id = ?1
990 ORDER BY (t.status = 'open') DESC, t.last_event_at DESC",
991 )?;
992 let rows = stmt
993 .query_map(rusqlite::params![task_id], |r| {
994 Ok(TaskRow {
995 task_id: r.get::<_, String>(0)?,
996 title: r.get::<_, String>(1)?,
997 status: r.get::<_, String>(2)?,
998 last_event_at: r.get::<_, String>(3)?,
999 event_count: r.get::<_, i64>(4)? as usize,
1000 })
1001 })?
1002 .collect::<Result<Vec<_>, _>>()?;
1003 Ok(rows)
1004}
1005
1006pub fn parent_of(conn: &Connection, task_id: &str) -> anyhow::Result<Option<String>> {
1008 let mut stmt = conn.prepare("SELECT parent_id FROM tasks WHERE task_id = ?1")?;
1009 let mut rows = stmt.query(rusqlite::params![task_id])?;
1010 Ok(match rows.next()? {
1011 Some(r) => r.get::<_, Option<String>>(0)?,
1012 None => None,
1013 })
1014}
1015
1016pub fn would_create_cycle(
1021 conn: &Connection,
1022 task_id: &str,
1023 new_parent: &str,
1024) -> anyhow::Result<bool> {
1025 if task_id == new_parent {
1026 return Ok(true);
1027 }
1028 let mut cursor = Some(new_parent.to_string());
1029 for _ in 0..64 {
1030 let Some(cur) = cursor else {
1031 return Ok(false);
1032 };
1033 if cur == task_id {
1034 return Ok(true);
1035 }
1036 cursor = parent_of(conn, &cur)?;
1037 }
1038 Ok(true)
1040}
1041
1042pub fn count_open_children(conn: &Connection, task_id: &str) -> anyhow::Result<usize> {
1044 let n: i64 = conn.query_row(
1045 "SELECT COUNT(*) FROM tasks WHERE parent_id = ?1 AND status = 'open'",
1046 rusqlite::params![task_id],
1047 |r| r.get(0),
1048 )?;
1049 Ok(n as usize)
1050}
1051
1052pub fn invalidate_pack_cascade(conn: &Connection, task_id: &str) -> anyhow::Result<()> {
1054 conn.execute(
1055 "DELETE FROM task_pack_cache WHERE task_id = ?1",
1056 rusqlite::params![task_id],
1057 )?;
1058 if let Some(parent) = parent_of(conn, task_id)? {
1059 conn.execute(
1060 "DELETE FROM task_pack_cache WHERE task_id = ?1",
1061 rusqlite::params![parent],
1062 )?;
1063 }
1064 Ok(())
1065}
1066
/// An indexed event that has no embedding yet for a given model, as returned by
/// `events_needing_embedding`.
pub struct PendingEmbed {
    /// Identifier of the event.
    pub event_id: String,
    /// Identifier of the task the event belongs to.
    pub task_id: String,
    /// The event text stored in `search_fts`; this is what gets embedded.
    pub text: String,
}
1077
1078pub fn events_needing_embedding(
1082 conn: &Connection,
1083 model: &str,
1084 limit: usize,
1085) -> anyhow::Result<Vec<PendingEmbed>> {
1086 let mut stmt = conn.prepare(
1087 "SELECT f.event_id, f.task_id, f.text
1088 FROM search_fts f
1089 LEFT JOIN embeddings e ON e.event_id = f.event_id AND e.model = ?1
1090 WHERE e.event_id IS NULL
1091 LIMIT ?2",
1092 )?;
1093 let rows = stmt.query_map(rusqlite::params![model, limit as i64], |r| {
1094 Ok(PendingEmbed {
1095 event_id: r.get(0)?,
1096 task_id: r.get(1)?,
1097 text: r.get(2)?,
1098 })
1099 })?;
1100 let mut out = Vec::new();
1101 for r in rows {
1102 out.push(r?);
1103 }
1104 Ok(out)
1105}
1106
1107#[allow(clippy::too_many_arguments)]
1110pub fn upsert_embedding(
1111 conn: &Connection,
1112 event_id: &str,
1113 task_id: &str,
1114 project_hash: &str,
1115 tier: &str,
1116 model: &str,
1117 dim: usize,
1118 vec: &[f32],
1119 created_at: &str,
1120) -> anyhow::Result<()> {
1121 conn.execute(
1122 "INSERT OR REPLACE INTO embeddings(event_id, task_id, project_hash, tier, model, dim, vec, created_at)
1123 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
1124 rusqlite::params![
1125 event_id,
1126 task_id,
1127 project_hash,
1128 tier,
1129 model,
1130 dim as i64,
1131 crate::embed::to_blob(vec),
1132 created_at
1133 ],
1134 )?;
1135 Ok(())
1136}
1137
1138pub fn count_embeddings(conn: &Connection, project_hash: &str) -> anyhow::Result<usize> {
1140 let n: i64 = conn.query_row(
1141 "SELECT COUNT(*) FROM embeddings WHERE project_hash = ?1",
1142 rusqlite::params![project_hash],
1143 |r| r.get(0),
1144 )?;
1145 Ok(n as usize)
1146}
1147
1148pub fn embed_pending(
1156 conn: &Connection,
1157 project_hash: &str,
1158 embedder: &dyn crate::embed::Embedder,
1159 created_at: &str,
1160 limit: usize,
1161) -> anyhow::Result<usize> {
1162 let pending = events_needing_embedding(conn, embedder.model_id(), limit)?;
1163 if pending.is_empty() {
1164 return Ok(0);
1165 }
1166 let texts: Vec<&str> = pending.iter().map(|p| p.text.as_str()).collect();
1167 let vecs = embedder.embed(&texts)?;
1168 let mut done = 0usize;
1169 for (p, v) in pending.iter().zip(vecs.iter()) {
1170 upsert_embedding(
1171 conn,
1172 &p.event_id,
1173 &p.task_id,
1174 project_hash,
1175 "episodic",
1176 embedder.model_id(),
1177 embedder.dim(),
1178 v,
1179 created_at,
1180 )?;
1181 done += 1;
1182 }
1183 Ok(done)
1184}
1185
/// One result of `semantic_search`: an embedded event together with its similarity score.
pub struct ScoredHit {
    /// Identifier of the matching event.
    pub event_id: String,
    /// Identifier of the task the event belongs to.
    pub task_id: String,
    /// Title of that task, or an empty string when the task row is missing.
    pub task_title: String,
    /// The event's type as stored in `search_fts.type`.
    pub event_type: String,
    /// The `tier` column of the embedding row.
    pub tier: String,
    /// The event text as stored in `search_fts`.
    pub text: String,
    /// Result of `crate::embed::cosine` between the query vector and the stored vector.
    pub score: f32,
}
1196
1197pub fn semantic_search(
1203 conn: &Connection,
1204 project_hash: &str,
1205 query_vec: &[f32],
1206 model: &str,
1207 k: usize,
1208) -> anyhow::Result<Vec<ScoredHit>> {
1209 let mut stmt = conn.prepare(
1210 "SELECT e.event_id, e.task_id, e.tier, e.vec, f.text, f.type,
1211 COALESCE(t.title, '')
1212 FROM embeddings e
1213 JOIN search_fts f ON f.event_id = e.event_id
1214 LEFT JOIN tasks t ON t.task_id = e.task_id
1215 WHERE e.project_hash = ?1 AND e.model = ?2",
1216 )?;
1217 let rows = stmt.query_map(rusqlite::params![project_hash, model], |r| {
1218 let blob: Vec<u8> = r.get(3)?;
1219 Ok((
1220 r.get::<_, String>(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?, blob,
1224 r.get::<_, String>(4)?, r.get::<_, String>(5)?, r.get::<_, String>(6)?, ))
1228 })?;
1229
1230 let mut hits: Vec<ScoredHit> = Vec::new();
1231 for row in rows {
1232 let (event_id, task_id, tier, blob, text, event_type, task_title) = row?;
1233 let score = crate::embed::cosine(query_vec, &crate::embed::from_blob(&blob));
1234 hits.push(ScoredHit {
1235 event_id,
1236 task_id,
1237 task_title,
1238 event_type,
1239 tier,
1240 text,
1241 score,
1242 });
1243 }
1244 hits.sort_by(|a, b| {
1245 b.score
1246 .partial_cmp(&a.score)
1247 .unwrap_or(std::cmp::Ordering::Equal)
1248 });
1249 hits.truncate(k);
1250 Ok(hits)
1251}
1252
1253#[cfg(test)]
1254mod tests {
1255 use super::*;
1256 use crate::embed::Embedder;
1257 use tempfile::TempDir;
1258
1259 #[test]
1260 fn task_exists_returns_true_for_known_id_false_otherwise() {
1261 let d = TempDir::new().unwrap();
1262 let conn = open(d.path().join("s.sqlite")).unwrap();
1263
1264 assert!(!task_exists(&conn, "tj-nope").unwrap());
1265
1266 let e = make_open_event("tj-yes", "Hello");
1267 upsert_task_from_event(&conn, &e, "feedfacefeedface").unwrap();
1268 index_event(&conn, &e).unwrap();
1269
1270 assert!(task_exists(&conn, "tj-yes").unwrap());
1271 assert!(!task_exists(&conn, "tj-nope").unwrap());
1272 }
1273
1274 #[test]
1275 fn fresh_db_runs_all_migrations() {
1276 let d = TempDir::new().unwrap();
1277 let p = d.path().join("state.sqlite");
1278 let conn = open(&p).unwrap();
1279
1280 let applied: Vec<i64> = conn
1281 .prepare("SELECT version FROM schema_migrations ORDER BY version")
1282 .unwrap()
1283 .query_map([], |r| r.get::<_, i64>(0))
1284 .unwrap()
1285 .collect::<Result<_, _>>()
1286 .unwrap();
1287 assert_eq!(
1288 applied,
1289 (1..=MIGRATIONS.len() as i64).collect::<Vec<_>>(),
1290 "every declared migration must be recorded"
1291 );
1292 }
1293
/// Re-opening the same database file several times leaves exactly one
/// `schema_migrations` row per declared migration.
#[test]
fn apply_migrations_is_idempotent_across_reopens() {
    let tmp = TempDir::new().unwrap();
    let db_path = tmp.path().join("state.sqlite");

    // Two throw-away opens; each connection is dropped right away.
    for _ in 0..2 {
        drop(open(&db_path).unwrap());
    }

    let conn = open(&db_path).unwrap();
    let recorded: i64 = conn
        .query_row("SELECT COUNT(*) FROM schema_migrations", [], |row| row.get(0))
        .unwrap();
    let declared = MIGRATIONS.len() as i64;
    assert_eq!(
        recorded, declared,
        "schema_migrations must contain exactly one row per declared migration after repeated opens"
    );
}
1311
1312 fn make_text_event(text: &str) -> crate::event::Event {
1313 crate::event::Event::new(
1314 "tj-x",
1315 crate::event::EventType::Finding,
1316 crate::event::Author::User,
1317 crate::event::Source::Cli,
1318 text.into(),
1319 )
1320 }
1321
/// `embed_pending` embeds every indexed event on the first call and reports
/// zero on an immediate second call; `events_needing_embedding` queried with
/// `"other-model"` still lists both events.
#[test]
fn embed_pending_embeds_all_then_is_idempotent() {
    let tmp = TempDir::new().unwrap();
    let conn = open(tmp.path().join("s.sqlite")).unwrap();
    let project = "feedfacefeedface";

    let texts = [
        "implement payment refund deduplication",
        "add validation for negative order amounts",
    ];
    for text in texts {
        let event = make_text_event(text);
        index_event(&conn, &event).unwrap();
    }

    let embedder = crate::embed::HashEmbedder::new(64);
    let stamp = "2026-06-12T00:00:00Z";

    let first_pass = embed_pending(&conn, project, &embedder, stamp, 100).unwrap();
    assert_eq!(first_pass, 2, "both events embedded on first pass");
    let stored = count_embeddings(&conn, project).unwrap();
    assert_eq!(stored, 2);

    // Nothing is left to embed for the same embedder.
    let second_pass = embed_pending(&conn, project, &embedder, stamp, 100).unwrap();
    assert_eq!(second_pass, 0);

    let pending_elsewhere = events_needing_embedding(&conn, "other-model", 100).unwrap();
    assert_eq!(pending_elsewhere.len(), 2);
}
1354
/// Semantic search over three unrelated events must put the refund-related
/// event first and return all hits ordered by descending score.
///
/// The ordering check covers every adjacent pair of hits, not only the first
/// two, so the assertion matches its own failure message.
#[test]
fn semantic_search_ranks_relevant_event_first() {
    let d = TempDir::new().unwrap();
    let conn = open(d.path().join("s.sqlite")).unwrap();
    let ph = "feedfacefeedface";

    for text in [
        "fix duplicate payment refund write on partial refund",
        "update the frontend button hover color",
        "add a database index for faster user lookup",
    ] {
        index_event(&conn, &make_text_event(text)).unwrap();
    }
    let emb = crate::embed::HashEmbedder::new(256);
    embed_pending(&conn, ph, &emb, "t", 100).unwrap();

    let q = emb.embed_one("payment refund duplicated").unwrap();
    let hits = semantic_search(&conn, ph, &q, emb.model_id(), 3).unwrap();

    assert_eq!(hits.len(), 3);
    assert!(
        hits[0].text.contains("refund"),
        "the refund event must rank first, got: {}",
        hits[0].text
    );
    // Verify the whole result list is non-increasing in score.
    assert!(
        hits.windows(2).all(|pair| pair[0].score >= pair[1].score),
        "hits must be sorted by score desc"
    );
}
1385
/// After `open`, every table this test requires is present in the schema.
///
/// The query filters on `type='table'` only: SQLite records virtual tables
/// (such as the FTS5 `search_fts`) with type `'table'`, so the previous
/// `OR type='virtual table'` predicate could never match anything.
#[test]
fn open_creates_all_tables() {
    let d = TempDir::new().unwrap();
    let p = d.path().join("state.sqlite");
    let conn = open(&p).unwrap();

    let mut stmt = conn
        .prepare("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
        .unwrap();
    let names: Vec<String> = stmt
        .query_map([], |r| r.get::<_, String>(0))
        .unwrap()
        .collect::<Result<_, _>>()
        .unwrap();

    for required in [
        "decisions",
        "events_index",
        "evidence",
        "task_pack_cache",
        "tasks",
        "search_fts",
    ] {
        assert!(
            names.iter().any(|n| n == required),
            "missing table {required}, have {names:?}"
        );
    }
}
1414
/// Opening the same database path twice in a row succeeds both times.
#[test]
fn open_is_idempotent() {
    let tmp = TempDir::new().unwrap();
    let db_path = tmp.path().join("state.sqlite");
    for _ in 0..2 {
        // Each connection is dropped before the next open.
        drop(open(&db_path).unwrap());
    }
}
1422
/// Indexing an `Evidence` event writes a row into the `evidence` table
/// carrying the event text and its strength as the string `strong`.
#[test]
fn index_event_projects_evidence() {
    use crate::event::{Author, Event, EventType, EvidenceStrength, Source};

    let tmp = TempDir::new().unwrap();
    let conn = open(tmp.path().join("s.sqlite")).unwrap();
    let project = "feedface";

    // Open the task first so the evidence event has a task row to attach to.
    let opened = make_open_event("tj-e", "T");
    upsert_task_from_event(&conn, &opened, project).unwrap();
    index_event(&conn, &opened).unwrap();

    let mut evidence = Event::new(
        "tj-e",
        EventType::Evidence,
        Author::Agent,
        Source::Chat,
        "Hook startup measured at 12ms".into(),
    );
    evidence.evidence_strength = Some(EvidenceStrength::Strong);
    upsert_task_from_event(&conn, &evidence, project).unwrap();
    index_event(&conn, &evidence).unwrap();

    let row: (String, String) = conn
        .query_row(
            "SELECT text, strength FROM evidence WHERE task_id=?1",
            rusqlite::params!["tj-e"],
            |r| Ok((r.get(0)?, r.get(1)?)),
        )
        .unwrap();
    let (stored_text, stored_strength) = row;
    assert!(stored_text.contains("12ms"));
    assert_eq!(stored_strength, "strong");
}
1459
/// Indexing a `Supersede` event whose `supersedes` field names a decision
/// sets that decision's status to `superseded` and stores the superseding
/// event id in `superseded_by`.
#[test]
fn supersede_event_marks_decision_superseded() {
    use crate::event::{Author, Event, EventType, Source};

    let tmp = TempDir::new().unwrap();
    let conn = open(tmp.path().join("s.sqlite")).unwrap();
    let project = "feedface";

    let opened = make_open_event("tj-s", "T");
    upsert_task_from_event(&conn, &opened, project).unwrap();
    index_event(&conn, &opened).unwrap();

    let decision = Event::new(
        "tj-s",
        EventType::Decision,
        Author::Agent,
        Source::Chat,
        "Use TS".into(),
    );
    upsert_task_from_event(&conn, &decision, project).unwrap();
    index_event(&conn, &decision).unwrap();

    let mut superseder = Event::new(
        "tj-s",
        EventType::Supersede,
        Author::Agent,
        Source::Chat,
        "Replaced by Rust decision".into(),
    );
    superseder.supersedes = Some(decision.event_id.clone());
    upsert_task_from_event(&conn, &superseder, project).unwrap();
    index_event(&conn, &superseder).unwrap();

    let row: (String, Option<String>) = conn
        .query_row(
            "SELECT status, superseded_by FROM decisions WHERE decision_id=?1",
            rusqlite::params![decision.event_id],
            |r| Ok((r.get(0)?, r.get(1)?)),
        )
        .unwrap();
    let (status, superseded_by) = row;
    assert_eq!(status, "superseded");
    assert_eq!(superseded_by.as_deref(), Some(superseder.event_id.as_str()));
}
1506
/// Indexing a `Decision` event creates a `decisions` row keyed by the event
/// id, holding the event text and the status `active`.
#[test]
fn index_event_projects_decision_to_decisions_table() {
    use crate::event::{Author, Event, EventType, Source};

    let tmp = TempDir::new().unwrap();
    let conn = open(tmp.path().join("s.sqlite")).unwrap();
    let project = "feedface";

    let opened = make_open_event("tj-d", "T");
    upsert_task_from_event(&conn, &opened, project).unwrap();
    index_event(&conn, &opened).unwrap();

    let decision = Event::new(
        "tj-d",
        EventType::Decision,
        Author::Agent,
        Source::Chat,
        "Adopt Rust".into(),
    );
    upsert_task_from_event(&conn, &decision, project).unwrap();
    index_event(&conn, &decision).unwrap();

    let row: (String, String, String) = conn
        .query_row(
            "SELECT decision_id, text, status FROM decisions WHERE task_id=?1",
            rusqlite::params!["tj-d"],
            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
        )
        .unwrap();
    let (stored_id, stored_text, stored_status) = row;
    assert_eq!(stored_id, decision.event_id);
    assert_eq!(stored_text, "Adopt Rust");
    assert_eq!(stored_status, "active");
}
1544
/// A decision whose `meta.alternatives` is a two-element array ends up with
/// that array stored as JSON text in `decisions.alternatives`.
#[test]
fn index_event_projects_decision_alternatives_into_column() {
    use crate::event::{Author, Event, EventType, Source};

    let tmp = TempDir::new().unwrap();
    let conn = open(tmp.path().join("s.sqlite")).unwrap();

    let mut decision = Event::new(
        "tj-alt",
        EventType::Decision,
        Author::Agent,
        Source::Chat,
        "Use SQLite".into(),
    );
    decision.meta = serde_json::json!({
        "alternatives": [
            {"option": "SQLite", "chosen": true, "rationale": "embedded, zero-ops"},
            {"option": "Postgres", "chosen": false, "rationale": "too heavy for local tool"}
        ]
    });
    upsert_task_from_event(&conn, &decision, "feedface").unwrap();
    index_event(&conn, &decision).unwrap();

    let stored: Option<String> = conn
        .query_row(
            "SELECT alternatives FROM decisions WHERE decision_id=?1",
            rusqlite::params![decision.event_id],
            |r| r.get(0),
        )
        .unwrap();
    let raw = stored.expect("alternatives column should be populated");

    // The column holds JSON text; parse it back to inspect the entries.
    let parsed: serde_json::Value = serde_json::from_str(&raw).unwrap();
    let entries = parsed.as_array().unwrap();
    assert_eq!(entries.len(), 2);
    assert_eq!(parsed[0]["option"], "SQLite");
    assert_eq!(parsed[0]["chosen"], true);
}
1579
/// A decision event with no `meta` set by the test leaves
/// `decisions.alternatives` NULL.
#[test]
fn index_event_decision_without_alternatives_leaves_column_null() {
    use crate::event::{Author, Event, EventType, Source};

    let tmp = TempDir::new().unwrap();
    let conn = open(tmp.path().join("s.sqlite")).unwrap();

    let decision = Event::new(
        "tj-noalt",
        EventType::Decision,
        Author::Agent,
        Source::Chat,
        "Plain decision".into(),
    );
    upsert_task_from_event(&conn, &decision, "feedface").unwrap();
    index_event(&conn, &decision).unwrap();

    let stored: Option<String> = conn
        .query_row(
            "SELECT alternatives FROM decisions WHERE decision_id=?1",
            rusqlite::params![decision.event_id],
            |r| r.get(0),
        )
        .unwrap();
    assert!(stored.is_none());
}
1604
/// Indexing the same event three times leaves a single `search_fts` row for
/// its event id.
#[test]
fn index_event_is_idempotent_no_search_fts_duplicates() {
    let tmp = TempDir::new().unwrap();
    let conn = open(tmp.path().join("s.sqlite")).unwrap();

    let opened = make_open_event("tj-id", "Idempotent");
    upsert_task_from_event(&conn, &opened, "feedface").unwrap();

    // Repeat the exact same index call; later calls must not add rows.
    for _ in 0..3 {
        index_event(&conn, &opened).unwrap();
    }

    let fts_rows: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM search_fts WHERE event_id=?1",
            rusqlite::params![opened.event_id],
            |r| r.get(0),
        )
        .unwrap();
    assert_eq!(fts_rows, 1, "search_fts must hold exactly one row per event_id");
}
1633
/// `list_all_projects` returns the stems of the two `.sqlite` files in the
/// state directory and omits the `.txt` file.
#[test]
fn list_all_projects_returns_hashes_from_state_dir() {
    let tmp = TempDir::new().unwrap();
    let state_dir = tmp.path().join("state");
    std::fs::create_dir_all(&state_dir).unwrap();

    // Two project databases plus one unrelated file.
    for file_name in [
        "aaaa1111aaaa1111.sqlite",
        "bbbb2222bbbb2222.sqlite",
        "not-a-project.txt",
    ] {
        std::fs::File::create(state_dir.join(file_name)).unwrap();
    }

    let mut hashes = list_all_projects(&state_dir).unwrap();
    // Sort before comparing so the listing order does not matter.
    hashes.sort();
    assert_eq!(hashes, vec!["aaaa1111aaaa1111", "bbbb2222bbbb2222"]);
}
1648
1649 fn write_event_line(f: &mut std::fs::File, e: &crate::event::Event) {
1650 use std::io::Write;
1651 writeln!(f, "{}", serde_json::to_string(e).unwrap()).unwrap();
1652 }
1653
1654 fn make_open_event(task_id: &str, title: &str) -> crate::event::Event {
1655 let mut e = crate::event::Event::new(
1656 task_id,
1657 crate::event::EventType::Open,
1658 crate::event::Author::User,
1659 crate::event::Source::Cli,
1660 "x".into(),
1661 );
1662 e.meta = serde_json::json!({"title": title});
1663 e
1664 }
1665
/// After an initial ingest of three events, appending two more lines makes
/// the next `ingest_new_events` call report exactly two, leaves five rows in
/// `events_index`, and sets the `index_state` marker to the last appended
/// event id.
#[test]
fn ingest_new_events_picks_up_only_new_lines() {
    let tmp = TempDir::new().unwrap();
    let jsonl = tmp.path().join("events.jsonl");
    let db = tmp.path().join("s.sqlite");
    let project = "deadbeefdeadbeef";

    let initial = [
        make_open_event("tj-i1", "first"),
        make_open_event("tj-i2", "second"),
        make_open_event("tj-i3", "third"),
    ];
    {
        let mut log = std::fs::File::create(&jsonl).unwrap();
        for event in &initial {
            write_event_line(&mut log, event);
        }
        // `log` is closed at the end of this scope, before ingesting.
    }

    let conn = open(&db).unwrap();
    let n_first = ingest_new_events(&conn, &jsonl, project).unwrap();
    assert_eq!(n_first, 3);

    let appended = [
        make_open_event("tj-i4", "fourth"),
        make_open_event("tj-i5", "fifth"),
    ];
    {
        let mut log = std::fs::OpenOptions::new()
            .append(true)
            .open(&jsonl)
            .unwrap();
        for event in &appended {
            write_event_line(&mut log, event);
        }
    }

    let n_second = ingest_new_events(&conn, &jsonl, project).unwrap();
    assert_eq!(n_second, 2, "incremental ingest must read only the tail");

    let total: i64 = conn
        .query_row("SELECT COUNT(*) FROM events_index", [], |r| r.get(0))
        .unwrap();
    assert_eq!(total, 5);

    let marker: String = conn
        .query_row(
            "SELECT last_indexed_event_id FROM index_state WHERE project_hash=?1",
            rusqlite::params![project],
            |r| r.get(0),
        )
        .unwrap();
    assert_eq!(marker, appended[1].event_id);
}
1717
/// When the JSONL file is rewritten so that the previously ingested event is
/// no longer in it, the next `ingest_new_events` call reports both events of
/// the new file.
#[test]
fn ingest_new_events_falls_back_to_full_rebuild_when_marker_vanishes() {
    let tmp = TempDir::new().unwrap();
    let jsonl = tmp.path().join("events.jsonl");
    let db = tmp.path().join("s.sqlite");
    let project = "feedfacefeedface";

    let original = make_open_event("tj-r1", "first");
    {
        let mut log = std::fs::File::create(&jsonl).unwrap();
        write_event_line(&mut log, &original);
    }

    let conn = open(&db).unwrap();
    ingest_new_events(&conn, &jsonl, project).unwrap();

    let replacements = [
        make_open_event("tj-r2", "after-corruption"),
        make_open_event("tj-r3", "after-corruption-2"),
    ];
    {
        // `File::create` truncates, so the first event disappears from the log.
        let mut log = std::fs::File::create(&jsonl).unwrap();
        for event in &replacements {
            write_event_line(&mut log, event);
        }
    }

    let reingested = ingest_new_events(&conn, &jsonl, project).unwrap();
    assert_eq!(reingested, 2, "missing marker must trigger full rebuild");
}
1745
/// Feeding the same five events through `rebuild_state` and through
/// `ingest_new_events` (each into its own empty database) yields the same
/// processed count and the same row counts in `tasks` and `events_index`.
#[test]
fn rebuild_state_and_ingest_new_events_produce_same_state() {
    let tmp = TempDir::new().unwrap();
    let root = tmp.path();
    let jsonl_a = root.join("a.jsonl");
    let jsonl_b = root.join("b.jsonl");
    let db_a = root.join("a.sqlite");
    let db_b = root.join("b.sqlite");

    let mut events = Vec::with_capacity(5);
    for i in 0..5 {
        events.push(make_open_event(&format!("tj-eq{i}"), &format!("title {i}")));
    }

    // Write identical logs for both code paths.
    for path in [&jsonl_a, &jsonl_b] {
        let mut log = std::fs::File::create(path).unwrap();
        events.iter().for_each(|e| write_event_line(&mut log, e));
    }

    let conn_a = open(&db_a).unwrap();
    let n_a = rebuild_state(&conn_a, &jsonl_a, "abcd1234abcd1234").unwrap();

    let conn_b = open(&db_b).unwrap();
    let n_b = ingest_new_events(&conn_b, &jsonl_b, "abcd1234abcd1234").unwrap();

    assert_eq!(n_a, n_b);
    assert_eq!(n_a, 5);

    for table in ["tasks", "events_index"] {
        let sql = format!("SELECT COUNT(*) FROM {table}");
        let [rows_a, rows_b] = [&conn_a, &conn_b]
            .map(|conn| conn.query_row(&sql, [], |r| r.get::<_, i64>(0)).unwrap());
        assert_eq!(rows_a, rows_b, "row count mismatch in {table}");
    }
}
1780
/// `rebuild_state` succeeds on a log that mixes two valid events with a
/// non-JSON line and a JSON object that is not an event, and indexes only the
/// two valid events.
#[test]
fn rebuild_state_skips_malformed_jsonl_lines() {
    use crate::event::{Author, Event, EventType, Source};
    use std::io::Write;

    let tmp = TempDir::new().unwrap();
    let events_path = tmp.path().join("events.jsonl");
    let db_path = tmp.path().join("s.sqlite");

    {
        let mut log = std::fs::File::create(&events_path).unwrap();

        let opened = make_open_event("tj-skip", "Skip test");
        write_event_line(&mut log, &opened);

        // Line that is not JSON at all.
        writeln!(log, "this is not a json event line").unwrap();

        // Valid JSON, but not shaped like an event.
        writeln!(log, "{{\"foo\": 1}}").unwrap();

        let decision = Event::new(
            "tj-skip",
            EventType::Decision,
            Author::Agent,
            Source::Chat,
            "Adopt Rust".into(),
        );
        write_event_line(&mut log, &decision);
    }

    let conn = open(&db_path).unwrap();
    let processed = rebuild_state(&conn, &events_path, "deadbeefdeadbeef")
        .expect("rebuild_state must succeed despite malformed lines");
    assert_eq!(
        processed, 2,
        "expected 2 valid events indexed (2 malformed skipped)"
    );

    let indexed: i64 = conn
        .query_row("SELECT COUNT(*) FROM events_index", [], |r| r.get(0))
        .unwrap();
    assert_eq!(indexed, 2);
}
1829
/// `rebuild_state` over a log with one open event and one decision for the
/// same task reports two events, and leaves one `tasks` row and two
/// `events_index` rows.
#[test]
fn rebuild_state_reads_jsonl_and_populates_db() {
    use crate::event::{Author, Event, EventType, Source};

    let tmp = TempDir::new().unwrap();
    let events_path = tmp.path().join("events.jsonl");
    let db_path = tmp.path().join("s.sqlite");

    {
        let mut log = std::fs::File::create(&events_path).unwrap();
        let opened = make_open_event("tj-9", "Nine");
        let decision = Event::new(
            "tj-9",
            EventType::Decision,
            Author::Agent,
            Source::Chat,
            "Adopt Rust".into(),
        );
        for event in [&opened, &decision] {
            write_event_line(&mut log, event);
        }
    }

    let conn = open(&db_path).unwrap();
    let processed = rebuild_state(&conn, &events_path, "deadbeefdeadbeef").unwrap();
    assert_eq!(processed, 2);

    let task_rows: i64 = conn
        .query_row("SELECT COUNT(*) FROM tasks", [], |r| r.get(0))
        .unwrap();
    assert_eq!(task_rows, 1);

    let event_rows: i64 = conn
        .query_row("SELECT COUNT(*) FROM events_index", [], |r| r.get(0))
        .unwrap();
    assert_eq!(event_rows, 2);
}
1870
/// Indexing an open event and a decision for one task yields two
/// `events_index` rows, and an FTS `MATCH` for `Rust` returns only the
/// decision's event id.
#[test]
fn index_event_writes_index_and_fts() {
    use crate::event::{Author, Event, EventType, Source};

    let tmp = TempDir::new().unwrap();
    let conn = open(tmp.path().join("s.sqlite")).unwrap();
    let project = "deadbeefdeadbeef";

    let mut opened = Event::new(
        "tj-1",
        EventType::Open,
        Author::User,
        Source::Cli,
        "Title".into(),
    );
    opened.meta = serde_json::json!({"title": "Title"});
    upsert_task_from_event(&conn, &opened, project).unwrap();
    index_event(&conn, &opened).unwrap();

    let mut decision = Event::new(
        "tj-1",
        EventType::Decision,
        Author::Agent,
        Source::Chat,
        "Adopt Rust".into(),
    );
    decision.confidence = Some(0.92);
    upsert_task_from_event(&conn, &decision, project).unwrap();
    index_event(&conn, &decision).unwrap();

    let indexed: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM events_index WHERE task_id=?1",
            rusqlite::params!["tj-1"],
            |r| r.get(0),
        )
        .unwrap();
    assert_eq!(indexed, 2);

    let mut fts = conn
        .prepare("SELECT event_id FROM search_fts WHERE search_fts MATCH ?1")
        .unwrap();
    let matches = fts
        .query_map(rusqlite::params!["Rust"], |r| r.get::<_, String>(0))
        .unwrap();
    let hits: Vec<String> = matches.collect::<Result<Vec<_>, _>>().unwrap();
    assert_eq!(hits.len(), 1);
    assert_eq!(hits[0], decision.event_id);
}
1920
/// Upserting from an open event creates a `tasks` row whose title comes from
/// `meta.title` (not the event text) and whose status is `open`.
#[test]
fn upsert_task_from_open_event_inserts_row() {
    use crate::event::{Author, Event, EventType, Source};

    let tmp = TempDir::new().unwrap();
    let conn = open(tmp.path().join("s.sqlite")).unwrap();

    let mut opened = Event::new(
        "tj-7f3a",
        EventType::Open,
        Author::User,
        Source::Cli,
        "Add OAuth".into(),
    );
    opened.meta = serde_json::json!({ "title": "Add OAuth login" });

    upsert_task_from_event(&conn, &opened, "abcd1234abcd1234").unwrap();

    let row: (String, String, String) = conn
        .query_row(
            "SELECT task_id, title, status FROM tasks WHERE task_id = ?1",
            ["tj-7f3a"],
            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
        )
        .unwrap();
    let (stored_id, stored_title, stored_status) = row;

    assert_eq!(stored_id, "tj-7f3a");
    assert_eq!(stored_title, "Add OAuth login");
    assert_eq!(stored_status, "open");
}
1949
/// The `tasks.parent_id` column exists after `open` and is NULL for a task
/// whose open event carries no `parent_id` in its meta.
#[test]
fn migration_adds_parent_id_column_nullable() {
    let tmp = TempDir::new().unwrap();
    let conn = open(tmp.path().join("s.sqlite")).unwrap();

    let top_level = make_open_event("tj-a", "Top");
    upsert_task_from_event(&conn, &top_level, "ph").unwrap();

    let stored_parent: Option<String> = conn
        .query_row(
            "SELECT parent_id FROM tasks WHERE task_id = ?1",
            rusqlite::params!["tj-a"],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(stored_parent, None);
}
1968
/// An open event whose meta contains `parent_id` stores that id in
/// `tasks.parent_id` for the child task.
#[test]
fn open_event_meta_parent_id_is_persisted() {
    let tmp = TempDir::new().unwrap();
    let conn = open(tmp.path().join("s.sqlite")).unwrap();

    let parent_event = make_open_event("tj-parent", "Parent");
    upsert_task_from_event(&conn, &parent_event, "ph").unwrap();

    let mut child_event = make_open_event("tj-child", "Child");
    // Replace the helper's meta with one that also names the parent.
    child_event.meta = serde_json::json!({"title": "Child", "parent_id": "tj-parent"});
    upsert_task_from_event(&conn, &child_event, "ph").unwrap();

    let stored_parent: Option<String> = conn
        .query_row(
            "SELECT parent_id FROM tasks WHERE task_id = ?1",
            rusqlite::params!["tj-child"],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(stored_parent.as_deref(), Some("tj-parent"));
}
1991
/// `children_of` returns both tasks whose meta names `p` as parent, and
/// `parent_of` returns `p` for a child and `None` for `p` itself.
#[test]
fn children_of_and_parent_of_work() {
    let tmp = TempDir::new().unwrap();
    let conn = open(tmp.path().join("s.sqlite")).unwrap();
    upsert_task_from_event(&conn, &make_open_event("p", "Parent"), "ph").unwrap();

    for (child_id, child_title) in [("c1", "Child1"), ("c2", "Child2")] {
        let mut child = make_open_event(child_id, child_title);
        child.meta = serde_json::json!({"title": child_title, "parent_id": "p"});
        upsert_task_from_event(&conn, &child, "ph").unwrap();
    }

    let kids = children_of(&conn, "p").unwrap();
    let child_ids: Vec<&str> = kids.iter().map(|task| task.task_id.as_str()).collect();
    let has_both = ["c1", "c2"].iter().all(|id| child_ids.contains(id));
    assert!(has_both);
    assert_eq!(kids.len(), 2);

    let parent_of_child = parent_of(&conn, "c1").unwrap();
    assert_eq!(parent_of_child.as_deref(), Some("p"));
    let parent_of_root = parent_of(&conn, "p").unwrap();
    assert_eq!(parent_of_root, None);
}
2013
/// With `b` stored as a child of `a`, `would_create_cycle` is true for the
/// pairs `("a", "b")` and `("a", "a")`, and false for `("x", "a")` where `x`
/// is an unrelated task.
// NOTE(review): argument order looks like (task, proposed parent) — confirm
// against `would_create_cycle`'s definition.
#[test]
fn cycle_guard_rejects_self_and_ancestor() {
    let tmp = TempDir::new().unwrap();
    let conn = open(tmp.path().join("s.sqlite")).unwrap();

    let root = make_open_event("a", "A");
    upsert_task_from_event(&conn, &root, "ph").unwrap();

    let mut child = make_open_event("b", "B");
    child.meta = serde_json::json!({"title": "B", "parent_id": "a"});
    upsert_task_from_event(&conn, &child, "ph").unwrap();

    let via_descendant = would_create_cycle(&conn, "a", "b").unwrap();
    assert!(via_descendant);
    let via_self = would_create_cycle(&conn, "a", "a").unwrap();
    assert!(via_self);

    let unrelated = make_open_event("x", "X");
    upsert_task_from_event(&conn, &unrelated, "ph").unwrap();
    let via_unrelated = would_create_cycle(&conn, "x", "a").unwrap();
    assert!(!via_unrelated);
}
2031
/// With a cached pack stored for both a parent and its child,
/// `invalidate_pack_cascade` on the child leaves `task_pack_cache` empty.
#[test]
fn invalidate_cascade_clears_parent_pack() {
    let tmp = TempDir::new().unwrap();
    let conn = open(tmp.path().join("s.sqlite")).unwrap();

    let parent = make_open_event("p", "P");
    upsert_task_from_event(&conn, &parent, "ph").unwrap();
    let mut child = make_open_event("c", "C");
    child.meta = serde_json::json!({"title": "C", "parent_id": "p"});
    upsert_task_from_event(&conn, &child, "ph").unwrap();

    // Seed one cache row per task, written directly with SQL.
    let seed_sql =
        "INSERT INTO task_pack_cache(task_id, mode, text, generated_at, source_event_count)
         VALUES (?1, 'compact', 'x', '2026-01-01T00:00:00Z', 1)";
    for task_id in ["p", "c"] {
        conn.execute(seed_sql, rusqlite::params![task_id]).unwrap();
    }

    invalidate_pack_cascade(&conn, "c").unwrap();

    let remaining: i64 = conn
        .query_row("SELECT COUNT(*) FROM task_pack_cache", [], |row| row.get(0))
        .unwrap();
    assert_eq!(remaining, 0, "both child and parent pack caches cleared");
}
2058
/// With two children under `p`, one of which has received a `Close` event,
/// `count_open_children` reports one.
#[test]
fn count_open_children_counts_only_open() {
    use crate::event::{Author, Event, EventType, Source};

    let tmp = TempDir::new().unwrap();
    let conn = open(tmp.path().join("s.sqlite")).unwrap();
    upsert_task_from_event(&conn, &make_open_event("p", "P"), "ph").unwrap();

    let mut first_child = make_open_event("c1", "C1");
    first_child.meta = serde_json::json!({"title": "C1", "parent_id": "p"});
    upsert_task_from_event(&conn, &first_child, "ph").unwrap();

    // Close the first child with an explicit timestamp.
    let mut closing = Event::new(
        "c1",
        EventType::Close,
        Author::User,
        Source::Cli,
        "done".into(),
    );
    closing.timestamp = "2026-01-02T00:00:00Z".into();
    upsert_task_from_event(&conn, &closing, "ph").unwrap();

    let mut second_child = make_open_event("c2", "C2");
    second_child.meta = serde_json::json!({"title": "C2", "parent_id": "p"});
    upsert_task_from_event(&conn, &second_child, "ph").unwrap();

    let still_open = count_open_children(&conn, "p").unwrap();
    assert_eq!(still_open, 1);
}
2083}