1use edda_core::types::{Digest, Event, Provenance, Refs};
7use rusqlite::{params, Connection, OptionalExtension};
8use std::path::Path;
9
10const SCHEMA_SQL: &str = "
11PRAGMA journal_mode = WAL;
12PRAGMA foreign_keys = ON;
13
14CREATE TABLE IF NOT EXISTS events (
15 rowid INTEGER PRIMARY KEY,
16 event_id TEXT UNIQUE NOT NULL,
17 ts TEXT NOT NULL,
18 event_type TEXT NOT NULL,
19 branch TEXT NOT NULL,
20 parent_hash TEXT,
21 hash TEXT NOT NULL,
22 payload TEXT NOT NULL,
23 refs_blobs TEXT NOT NULL DEFAULT '[]',
24 refs_events TEXT NOT NULL DEFAULT '[]',
25 refs_provenance TEXT NOT NULL DEFAULT '[]',
26 schema_version INTEGER NOT NULL DEFAULT 0,
27 digests TEXT NOT NULL DEFAULT '[]',
28 event_family TEXT,
29 event_level TEXT
30);
31
32CREATE INDEX IF NOT EXISTS idx_events_branch ON events(branch);
33CREATE INDEX IF NOT EXISTS idx_events_type ON events(event_type);
34CREATE INDEX IF NOT EXISTS idx_events_branch_type ON events(branch, event_type);
35CREATE INDEX IF NOT EXISTS idx_events_ts ON events(ts);
36CREATE INDEX IF NOT EXISTS idx_events_branch_ts ON events(branch, ts DESC);
37
38CREATE TABLE IF NOT EXISTS refs (
39 key TEXT PRIMARY KEY,
40 value TEXT NOT NULL
41);
42
43CREATE TABLE IF NOT EXISTS schema_meta (
44 key TEXT PRIMARY KEY,
45 value TEXT NOT NULL
46);
47";
48
49const SCHEMA_V2_SQL: &str = "
50CREATE TABLE IF NOT EXISTS decisions (
51 event_id TEXT PRIMARY KEY REFERENCES events(event_id),
52 key TEXT NOT NULL,
53 value TEXT NOT NULL,
54 reason TEXT NOT NULL DEFAULT '',
55 domain TEXT NOT NULL DEFAULT '',
56 branch TEXT NOT NULL,
57 supersedes_id TEXT,
58 is_active BOOLEAN NOT NULL DEFAULT TRUE
59);
60CREATE INDEX IF NOT EXISTS idx_decisions_key ON decisions(key);
61CREATE INDEX IF NOT EXISTS idx_decisions_domain ON decisions(domain);
62CREATE INDEX IF NOT EXISTS idx_decisions_active ON decisions(is_active) WHERE is_active = TRUE;
63CREATE INDEX IF NOT EXISTS idx_decisions_branch_key ON decisions(branch, key);
64";
65
66#[derive(Debug, Clone)]
68pub struct DecisionRow {
69 pub event_id: String,
70 pub key: String,
71 pub value: String,
72 pub reason: String,
73 pub domain: String,
74 pub branch: String,
75 pub supersedes_id: Option<String>,
76 pub is_active: bool,
77 pub ts: Option<String>,
78}
79
80pub struct SqliteStore {
82 conn: Connection,
83}
84
85impl SqliteStore {
86 pub fn open(db_path: &Path) -> anyhow::Result<Self> {
88 let conn = Connection::open(db_path)?;
89 let store = Self { conn };
90 store.apply_pragmas()?;
91 Ok(store)
92 }
93
94 pub fn open_or_create(db_path: &Path) -> anyhow::Result<Self> {
96 if let Some(parent) = db_path.parent() {
97 std::fs::create_dir_all(parent)?;
98 }
99 let conn = Connection::open(db_path)?;
100 let store = Self { conn };
101 store.apply_pragmas()?;
102 store.apply_schema()?;
103 Ok(store)
104 }
105
106 fn apply_pragmas(&self) -> anyhow::Result<()> {
107 self.conn.execute_batch(
108 "PRAGMA journal_mode = WAL;
109 PRAGMA foreign_keys = ON;
110 PRAGMA busy_timeout = 5000;",
111 )?;
112 Ok(())
113 }
114
115 fn apply_schema(&self) -> anyhow::Result<()> {
116 self.conn.execute_batch(SCHEMA_SQL)?;
118
119 self.conn.execute(
121 "INSERT OR IGNORE INTO schema_meta (key, value) VALUES ('version', '1')",
122 [],
123 )?;
124
125 let current = self.schema_version()?;
127 if current < 2 {
128 self.migrate_v1_to_v2()?;
129 }
130
131 Ok(())
132 }
133
134 fn schema_version(&self) -> anyhow::Result<u32> {
135 let version_str: String = self
136 .conn
137 .query_row(
138 "SELECT value FROM schema_meta WHERE key = 'version'",
139 [],
140 |row| row.get(0),
141 )
142 .unwrap_or_else(|_| "1".to_string());
143 Ok(version_str.parse().unwrap_or(1))
144 }
145
146 fn set_schema_version(&self, version: u32) -> anyhow::Result<()> {
147 self.conn.execute(
148 "INSERT OR REPLACE INTO schema_meta (key, value) VALUES ('version', ?1)",
149 params![version.to_string()],
150 )?;
151 Ok(())
152 }
153
154 fn migrate_v1_to_v2(&self) -> anyhow::Result<()> {
155 self.conn.execute_batch(SCHEMA_V2_SQL)?;
157
158 let mut stmt = self.conn.prepare(
160 "SELECT event_id, ts, branch, payload, refs_provenance FROM events
161 WHERE event_type = 'note' ORDER BY rowid",
162 )?;
163 let rows: Vec<(String, String, String, String, String)> = stmt
164 .query_map([], |row| {
165 Ok((
166 row.get(0)?,
167 row.get(1)?,
168 row.get(2)?,
169 row.get(3)?,
170 row.get(4)?,
171 ))
172 })?
173 .collect::<Result<Vec<_>, _>>()?;
174
175 for (event_id, _ts, branch, payload_str, prov_str) in &rows {
176 let payload: serde_json::Value = match serde_json::from_str(payload_str) {
177 Ok(v) => v,
178 Err(_) => continue,
179 };
180
181 if !is_decision_payload(&payload) {
182 continue;
183 }
184
185 let (key, value, reason) = extract_decision_from_payload(&payload);
186 if key.is_empty() && value.is_empty() {
187 continue;
188 }
189 let domain = extract_domain(&key);
190
191 let provenance: Vec<Provenance> = serde_json::from_str(prov_str).unwrap_or_default();
192 let supersedes_id = provenance
193 .iter()
194 .find(|p| p.rel == "supersedes")
195 .map(|p| p.target.as_str());
196
197 self.conn.execute(
198 "INSERT OR IGNORE INTO decisions
199 (event_id, key, value, reason, domain, branch, supersedes_id, is_active)
200 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, TRUE)",
201 params![event_id, key, value, reason, domain, branch, supersedes_id],
202 )?;
203 }
204
205 self.conn.execute(
207 "UPDATE decisions SET is_active = FALSE
208 WHERE event_id IN (
209 SELECT d_old.event_id FROM decisions d_old
210 JOIN decisions d_new ON d_new.supersedes_id = d_old.event_id
211 )",
212 [],
213 )?;
214
215 self.conn.execute_batch(
218 "UPDATE decisions SET is_active = FALSE
219 WHERE rowid NOT IN (
220 SELECT MAX(d.rowid) FROM decisions d
221 GROUP BY d.key, d.branch
222 ) AND is_active = TRUE",
223 )?;
224
225 self.set_schema_version(2)?;
226 Ok(())
227 }
228
229 pub fn append_event(&self, event: &Event) -> anyhow::Result<()> {
236 let payload = serde_json::to_string(&event.payload)?;
237 let refs_blobs = serde_json::to_string(&event.refs.blobs)?;
238 let refs_events = serde_json::to_string(&event.refs.events)?;
239 let refs_provenance = serde_json::to_string(&event.refs.provenance)?;
240 let digests = serde_json::to_string(&event.digests)?;
241
242 let tx = self.conn.unchecked_transaction()?;
243
244 tx.execute(
245 "INSERT INTO events (
246 event_id, ts, event_type, branch, parent_hash, hash,
247 payload, refs_blobs, refs_events, refs_provenance,
248 schema_version, digests, event_family, event_level
249 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)",
250 params![
251 event.event_id,
252 event.ts,
253 event.event_type,
254 event.branch,
255 event.parent_hash,
256 event.hash,
257 payload,
258 refs_blobs,
259 refs_events,
260 refs_provenance,
261 event.schema_version,
262 digests,
263 event.event_family,
264 event.event_level,
265 ],
266 )?;
267
268 if is_decision_event(event) {
270 let (key, value, reason) = extract_decision_from_payload(&event.payload);
271 if !key.is_empty() || !value.is_empty() {
272 let domain = extract_domain(&key);
273 let supersedes_id = event
274 .refs
275 .provenance
276 .iter()
277 .find(|p| p.rel == "supersedes")
278 .map(|p| p.target.as_str());
279
280 tx.execute(
282 "UPDATE decisions SET is_active = FALSE
283 WHERE key = ?1 AND branch = ?2 AND is_active = TRUE",
284 params![key, event.branch],
285 )?;
286
287 tx.execute(
288 "INSERT INTO decisions
289 (event_id, key, value, reason, domain, branch, supersedes_id, is_active)
290 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, TRUE)",
291 params![
292 event.event_id,
293 key,
294 value,
295 reason,
296 domain,
297 event.branch,
298 supersedes_id
299 ],
300 )?;
301 }
302 }
303
304 tx.commit()?;
305 Ok(())
306 }
307
308 pub fn iter_events(&self) -> anyhow::Result<Vec<Event>> {
310 let mut stmt = self.conn.prepare(
311 "SELECT event_id, ts, event_type, branch, parent_hash, hash,
312 payload, refs_blobs, refs_events, refs_provenance,
313 schema_version, digests, event_family, event_level
314 FROM events ORDER BY rowid",
315 )?;
316
317 let events = stmt
318 .query_map([], |row| {
319 let payload_str: String = row.get(6)?;
320 let refs_blobs_str: String = row.get(7)?;
321 let refs_events_str: String = row.get(8)?;
322 let refs_prov_str: String = row.get(9)?;
323 let digests_str: String = row.get(11)?;
324
325 Ok(EventRow {
326 event_id: row.get(0)?,
327 ts: row.get(1)?,
328 event_type: row.get(2)?,
329 branch: row.get(3)?,
330 parent_hash: row.get(4)?,
331 hash: row.get(5)?,
332 payload_str,
333 refs_blobs_str,
334 refs_events_str,
335 refs_prov_str,
336 schema_version: row.get(10)?,
337 digests_str,
338 event_family: row.get(12)?,
339 event_level: row.get(13)?,
340 })
341 })?
342 .collect::<Result<Vec<_>, _>>()?;
343
344 events.into_iter().map(row_to_event).collect()
345 }
346
347 pub fn last_event_hash(&self) -> anyhow::Result<Option<String>> {
349 let result: Option<String> = self
350 .conn
351 .query_row(
352 "SELECT hash FROM events ORDER BY rowid DESC LIMIT 1",
353 [],
354 |row| row.get(0),
355 )
356 .optional()?;
357 Ok(result)
358 }
359
360 pub fn head_branch(&self) -> anyhow::Result<String> {
364 let value: String = self
365 .conn
366 .query_row("SELECT value FROM refs WHERE key = 'HEAD'", [], |row| {
367 row.get(0)
368 })
369 .map_err(|_| anyhow::anyhow!("HEAD not set in refs table"))?;
370 Ok(value)
371 }
372
373 pub fn set_head_branch(&self, name: &str) -> anyhow::Result<()> {
375 self.conn.execute(
376 "INSERT OR REPLACE INTO refs (key, value) VALUES ('HEAD', ?1)",
377 params![name],
378 )?;
379 Ok(())
380 }
381
382 pub fn branches_json(&self) -> anyhow::Result<serde_json::Value> {
384 let value: String = self
385 .conn
386 .query_row("SELECT value FROM refs WHERE key = 'branches'", [], |row| {
387 row.get(0)
388 })
389 .map_err(|_| anyhow::anyhow!("branches not set in refs table"))?;
390 let json: serde_json::Value = serde_json::from_str(&value)?;
391 Ok(json)
392 }
393
394 pub fn set_branches_json(&self, value: &serde_json::Value) -> anyhow::Result<()> {
396 let json_str = serde_json::to_string(value)?;
397 self.conn.execute(
398 "INSERT OR REPLACE INTO refs (key, value) VALUES ('branches', ?1)",
399 params![json_str],
400 )?;
401 Ok(())
402 }
403
404 pub fn active_decisions(
408 &self,
409 domain: Option<&str>,
410 key_pattern: Option<&str>,
411 ) -> anyhow::Result<Vec<DecisionRow>> {
412 let sql = match (domain, key_pattern) {
413 (Some(_), _) => {
414 "SELECT d.event_id, d.key, d.value, d.reason, d.domain, d.branch,
415 d.supersedes_id, d.is_active, e.ts
416 FROM decisions d JOIN events e ON d.event_id = e.event_id
417 WHERE d.is_active = TRUE AND d.domain = ?1
418 ORDER BY d.domain, d.key"
419 }
420 (_, Some(_)) => {
421 "SELECT d.event_id, d.key, d.value, d.reason, d.domain, d.branch,
422 d.supersedes_id, d.is_active, e.ts
423 FROM decisions d JOIN events e ON d.event_id = e.event_id
424 WHERE d.is_active = TRUE AND (d.key LIKE ?1 OR d.value LIKE ?1)
425 ORDER BY d.domain, d.key"
426 }
427 (None, None) => {
428 "SELECT d.event_id, d.key, d.value, d.reason, d.domain, d.branch,
429 d.supersedes_id, d.is_active, e.ts
430 FROM decisions d JOIN events e ON d.event_id = e.event_id
431 WHERE d.is_active = TRUE
432 ORDER BY d.domain, d.key"
433 }
434 };
435
436 let param: String = match (domain, key_pattern) {
437 (Some(d), _) => d.to_string(),
438 (_, Some(k)) => format!("%{k}%"),
439 _ => String::new(),
440 };
441
442 let mut stmt = self.conn.prepare(sql)?;
443 let rows = if domain.is_some() || key_pattern.is_some() {
444 stmt.query_map(params![param], map_decision_row)?
445 } else {
446 stmt.query_map([], map_decision_row)?
447 };
448
449 rows.collect::<Result<Vec<_>, _>>()
450 .map_err(|e| anyhow::anyhow!("decision query failed: {e}"))
451 }
452
453 pub fn decision_timeline(&self, key: &str) -> anyhow::Result<Vec<DecisionRow>> {
455 let mut stmt = self.conn.prepare(
456 "SELECT d.event_id, d.key, d.value, d.reason, d.domain, d.branch,
457 d.supersedes_id, d.is_active, e.ts
458 FROM decisions d JOIN events e ON d.event_id = e.event_id
459 WHERE d.key = ?1
460 ORDER BY e.ts",
461 )?;
462 let rows = stmt.query_map(params![key], map_decision_row)?;
463 rows.collect::<Result<Vec<_>, _>>()
464 .map_err(|e| anyhow::anyhow!("decision timeline query failed: {e}"))
465 }
466
467 pub fn domain_timeline(&self, domain: &str) -> anyhow::Result<Vec<DecisionRow>> {
469 let mut stmt = self.conn.prepare(
470 "SELECT d.event_id, d.key, d.value, d.reason, d.domain, d.branch,
471 d.supersedes_id, d.is_active, e.ts
472 FROM decisions d JOIN events e ON d.event_id = e.event_id
473 WHERE d.domain = ?1
474 ORDER BY e.ts",
475 )?;
476 let rows = stmt.query_map(params![domain], map_decision_row)?;
477 rows.collect::<Result<Vec<_>, _>>()
478 .map_err(|e| anyhow::anyhow!("domain timeline query failed: {e}"))
479 }
480
481 pub fn list_domains(&self) -> anyhow::Result<Vec<String>> {
483 let mut stmt = self.conn.prepare(
484 "SELECT DISTINCT domain FROM decisions WHERE is_active = TRUE ORDER BY domain",
485 )?;
486 let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
487 rows.collect::<Result<Vec<_>, _>>()
488 .map_err(|e| anyhow::anyhow!("list domains query failed: {e}"))
489 }
490
491 pub fn find_active_decision(
493 &self,
494 branch: &str,
495 key: &str,
496 ) -> anyhow::Result<Option<DecisionRow>> {
497 let mut stmt = self.conn.prepare(
498 "SELECT d.event_id, d.key, d.value, d.reason, d.domain, d.branch,
499 d.supersedes_id, d.is_active, e.ts
500 FROM decisions d JOIN events e ON d.event_id = e.event_id
501 WHERE d.key = ?1 AND d.branch = ?2 AND d.is_active = TRUE
502 LIMIT 1",
503 )?;
504 let result = stmt
505 .query_map(params![key, branch], map_decision_row)?
506 .next();
507 match result {
508 Some(Ok(row)) => Ok(Some(row)),
509 Some(Err(e)) => Err(anyhow::anyhow!("decision query failed: {e}")),
510 None => Ok(None),
511 }
512 }
513}
514
515impl Drop for SqliteStore {
516 fn drop(&mut self) {
517 let _ = self.conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
519 }
520}
521
522fn is_decision_event(event: &Event) -> bool {
526 event.event_type == "note"
527 && event
528 .payload
529 .get("tags")
530 .and_then(|v| v.as_array())
531 .map(|arr| arr.iter().any(|t| t.as_str() == Some("decision")))
532 .unwrap_or(false)
533}
534
535fn is_decision_payload(payload: &serde_json::Value) -> bool {
537 payload
538 .get("tags")
539 .and_then(|v| v.as_array())
540 .map(|arr| arr.iter().any(|t| t.as_str() == Some("decision")))
541 .unwrap_or(false)
542}
543
544fn extract_decision_from_payload(payload: &serde_json::Value) -> (String, String, String) {
547 if let Some(d) = payload.get("decision") {
548 let key = d
549 .get("key")
550 .and_then(|v| v.as_str())
551 .unwrap_or("")
552 .to_string();
553 let value = d
554 .get("value")
555 .and_then(|v| v.as_str())
556 .unwrap_or("")
557 .to_string();
558 let reason = d
559 .get("reason")
560 .and_then(|v| v.as_str())
561 .unwrap_or("")
562 .to_string();
563 return (key, value, reason);
564 }
565 let text = payload.get("text").and_then(|v| v.as_str()).unwrap_or("");
567 let (key, rest) = match text.split_once(": ") {
568 Some((k, r)) => (k.to_string(), r),
569 None => return (String::new(), text.to_string(), String::new()),
570 };
571 let (value, reason) = match rest.split_once(" — ") {
572 Some((v, r)) => (v.to_string(), r.to_string()),
573 None => (rest.to_string(), String::new()),
574 };
575 (key, value, reason)
576}
577
578fn extract_domain(key: &str) -> String {
580 key.split('.').next().unwrap_or(key).to_string()
581}
582
583fn map_decision_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<DecisionRow> {
584 Ok(DecisionRow {
585 event_id: row.get(0)?,
586 key: row.get(1)?,
587 value: row.get(2)?,
588 reason: row.get(3)?,
589 domain: row.get(4)?,
590 branch: row.get(5)?,
591 supersedes_id: row.get(6)?,
592 is_active: row.get(7)?,
593 ts: row.get(8)?,
594 })
595}
596
597struct EventRow {
601 event_id: String,
602 ts: String,
603 event_type: String,
604 branch: String,
605 parent_hash: Option<String>,
606 hash: String,
607 payload_str: String,
608 refs_blobs_str: String,
609 refs_events_str: String,
610 refs_prov_str: String,
611 schema_version: u32,
612 digests_str: String,
613 event_family: Option<String>,
614 event_level: Option<String>,
615}
616
617fn row_to_event(row: EventRow) -> anyhow::Result<Event> {
618 let payload: serde_json::Value = serde_json::from_str(&row.payload_str)?;
619 let blobs: Vec<String> = serde_json::from_str(&row.refs_blobs_str)?;
620 let events: Vec<String> = serde_json::from_str(&row.refs_events_str)?;
621 let provenance: Vec<Provenance> = serde_json::from_str(&row.refs_prov_str)?;
622 let digests: Vec<Digest> = serde_json::from_str(&row.digests_str)?;
623
624 Ok(Event {
625 event_id: row.event_id,
626 ts: row.ts,
627 event_type: row.event_type,
628 branch: row.branch,
629 parent_hash: row.parent_hash,
630 hash: row.hash,
631 payload,
632 refs: Refs {
633 blobs,
634 events,
635 provenance,
636 },
637 schema_version: row.schema_version,
638 digests,
639 event_family: row.event_family,
640 event_level: row.event_level,
641 })
642}
643
644#[cfg(test)]
645mod tests {
646 use super::*;
647 use edda_core::event::new_note_event;
648 use std::sync::atomic::{AtomicU64, Ordering};
649
650 static COUNTER: AtomicU64 = AtomicU64::new(0);
651
652 fn tmp_db() -> (std::path::PathBuf, SqliteStore) {
653 let n = COUNTER.fetch_add(1, Ordering::SeqCst);
654 let dir = std::env::temp_dir().join(format!("edda_sqlite_test_{}_{n}", std::process::id()));
655 let _ = std::fs::remove_dir_all(&dir);
656 std::fs::create_dir_all(&dir).unwrap();
657 let db_path = dir.join("ledger.db");
658 let store = SqliteStore::open_or_create(&db_path).unwrap();
659 (dir, store)
660 }
661
662 #[test]
663 fn schema_creation() {
664 let (dir, store) = tmp_db();
665 let tables: Vec<String> = store
667 .conn
668 .prepare("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
669 .unwrap()
670 .query_map([], |row| row.get(0))
671 .unwrap()
672 .collect::<Result<Vec<_>, _>>()
673 .unwrap();
674 assert!(tables.contains(&"events".to_string()));
675 assert!(tables.contains(&"refs".to_string()));
676 assert!(tables.contains(&"schema_meta".to_string()));
677 drop(store);
678 let _ = std::fs::remove_dir_all(&dir);
679 }
680
681 #[test]
682 fn event_round_trip() {
683 let (dir, store) = tmp_db();
684 let e1 = new_note_event("main", None, "system", "first note", &["test".into()]).unwrap();
685 store.append_event(&e1).unwrap();
686
687 let e2 = new_note_event(
688 "main",
689 Some(&e1.hash),
690 "user",
691 "second note",
692 &["test".into()],
693 )
694 .unwrap();
695 store.append_event(&e2).unwrap();
696
697 let events = store.iter_events().unwrap();
698 assert_eq!(events.len(), 2);
699 assert_eq!(events[0].event_id, e1.event_id);
700 assert_eq!(events[0].hash, e1.hash);
701 assert_eq!(events[0].event_type, "note");
702 assert_eq!(events[0].branch, "main");
703 assert_eq!(events[1].event_id, e2.event_id);
704 assert_eq!(events[1].parent_hash, Some(e1.hash.clone()));
705
706 assert_eq!(events[0].payload["text"], "first note");
708 assert_eq!(events[1].payload["text"], "second note");
709
710 drop(store);
711 let _ = std::fs::remove_dir_all(&dir);
712 }
713
714 #[test]
715 fn last_event_hash_empty() {
716 let (dir, store) = tmp_db();
717 assert_eq!(store.last_event_hash().unwrap(), None);
718 drop(store);
719 let _ = std::fs::remove_dir_all(&dir);
720 }
721
722 #[test]
723 fn last_event_hash_returns_latest() {
724 let (dir, store) = tmp_db();
725 let e1 = new_note_event("main", None, "system", "init", &[]).unwrap();
726 store.append_event(&e1).unwrap();
727 assert_eq!(store.last_event_hash().unwrap(), Some(e1.hash.clone()));
728
729 let e2 = new_note_event("main", Some(&e1.hash), "user", "hello", &[]).unwrap();
730 store.append_event(&e2).unwrap();
731 assert_eq!(store.last_event_hash().unwrap(), Some(e2.hash.clone()));
732
733 drop(store);
734 let _ = std::fs::remove_dir_all(&dir);
735 }
736
737 #[test]
738 fn refs_head_branch() {
739 let (dir, store) = tmp_db();
740 assert!(store.head_branch().is_err());
742
743 store.set_head_branch("main").unwrap();
744 assert_eq!(store.head_branch().unwrap(), "main");
745
746 store.set_head_branch("feat/x").unwrap();
747 assert_eq!(store.head_branch().unwrap(), "feat/x");
748
749 drop(store);
750 let _ = std::fs::remove_dir_all(&dir);
751 }
752
753 #[test]
754 fn refs_branches_json() {
755 let (dir, store) = tmp_db();
756 let json = serde_json::json!({
757 "branches": {
758 "main": { "created_at": "2026-01-01T00:00:00Z" }
759 }
760 });
761 store.set_branches_json(&json).unwrap();
762 let loaded = store.branches_json().unwrap();
763 assert_eq!(loaded, json);
764
765 drop(store);
766 let _ = std::fs::remove_dir_all(&dir);
767 }
768
769 #[test]
770 fn event_with_refs() {
771 let (dir, store) = tmp_db();
772 let mut e =
773 new_note_event("main", None, "system", "with refs", &["decision".into()]).unwrap();
774 e.refs.blobs = vec!["blob:sha256:abc123".to_string()];
775 e.refs.events = vec!["evt_prior".to_string()];
776 e.refs.provenance = vec![Provenance {
777 target: "evt_old".to_string(),
778 rel: "supersedes".to_string(),
779 note: Some("re-decided".to_string()),
780 }];
781
782 store.append_event(&e).unwrap();
783 let events = store.iter_events().unwrap();
784 assert_eq!(events.len(), 1);
785 assert_eq!(events[0].refs.blobs, vec!["blob:sha256:abc123"]);
786 assert_eq!(events[0].refs.events, vec!["evt_prior"]);
787 assert_eq!(events[0].refs.provenance.len(), 1);
788 assert_eq!(events[0].refs.provenance[0].rel, "supersedes");
789 assert_eq!(
790 events[0].refs.provenance[0].note.as_deref(),
791 Some("re-decided")
792 );
793
794 drop(store);
795 let _ = std::fs::remove_dir_all(&dir);
796 }
797
798 #[test]
799 fn wal_checkpoint_on_drop() {
800 let n = COUNTER.fetch_add(1, Ordering::SeqCst);
801 let dir = std::env::temp_dir().join(format!("edda_sqlite_wal_{}_{n}", std::process::id()));
802 let _ = std::fs::remove_dir_all(&dir);
803 std::fs::create_dir_all(&dir).unwrap();
804 let db_path = dir.join("ledger.db");
805
806 {
807 let store = SqliteStore::open_or_create(&db_path).unwrap();
808 let e = new_note_event("main", None, "system", "wal test", &[]).unwrap();
809 store.append_event(&e).unwrap();
810 }
812
813 assert!(db_path.exists());
815 let wal_path = dir.join("ledger.db-wal");
816 if wal_path.exists() {
817 let size = std::fs::metadata(&wal_path).unwrap().len();
819 assert_eq!(size, 0, "WAL file should be empty after checkpoint");
820 }
821
822 let _ = std::fs::remove_dir_all(&dir);
823 }
824
825 #[test]
826 fn event_ordering_preserved() {
827 let (dir, store) = tmp_db();
828 let mut prev_hash: Option<String> = None;
829 for i in 0..10 {
830 let e = new_note_event(
831 "main",
832 prev_hash.as_deref(),
833 "system",
834 &format!("event {i}"),
835 &[],
836 )
837 .unwrap();
838 prev_hash = Some(e.hash.clone());
839 store.append_event(&e).unwrap();
840 }
841
842 let events = store.iter_events().unwrap();
843 assert_eq!(events.len(), 10);
844 for (i, event) in events.iter().enumerate() {
845 assert_eq!(event.payload["text"], format!("event {i}"));
846 }
847 for i in 1..10 {
849 assert_eq!(
850 events[i].parent_hash.as_deref(),
851 Some(events[i - 1].hash.as_str())
852 );
853 }
854
855 drop(store);
856 let _ = std::fs::remove_dir_all(&dir);
857 }
858
859 #[test]
860 fn duplicate_event_id_errors() {
861 let (dir, store) = tmp_db();
862 let e = new_note_event("main", None, "system", "first", &[]).unwrap();
863 store.append_event(&e).unwrap();
864 assert!(store.append_event(&e).is_err());
866 drop(store);
867 let _ = std::fs::remove_dir_all(&dir);
868 }
869
870 #[test]
871 fn idempotent_schema_apply() {
872 let n = COUNTER.fetch_add(1, Ordering::SeqCst);
873 let dir = std::env::temp_dir().join(format!("edda_sqlite_idem_{}_{n}", std::process::id()));
874 let _ = std::fs::remove_dir_all(&dir);
875 std::fs::create_dir_all(&dir).unwrap();
876 let db_path = dir.join("ledger.db");
877
878 let store1 = SqliteStore::open_or_create(&db_path).unwrap();
880 store1.set_head_branch("main").unwrap();
881 drop(store1);
882
883 let store2 = SqliteStore::open_or_create(&db_path).unwrap();
884 assert_eq!(store2.head_branch().unwrap(), "main");
885 drop(store2);
886
887 let _ = std::fs::remove_dir_all(&dir);
888 }
889
890 fn make_decision_event(
893 branch: &str,
894 key: &str,
895 value: &str,
896 reason: Option<&str>,
897 supersedes: Option<&str>,
898 ) -> Event {
899 use edda_core::event::finalize_event;
900
901 let text = match reason {
902 Some(r) => format!("{key}: {value} — {r}"),
903 None => format!("{key}: {value}"),
904 };
905 let tags = vec!["decision".to_string()];
906 let mut event = new_note_event(branch, None, "system", &text, &tags).unwrap();
907
908 let decision_obj = match reason {
910 Some(r) => serde_json::json!({"key": key, "value": value, "reason": r}),
911 None => serde_json::json!({"key": key, "value": value}),
912 };
913 event.payload["decision"] = decision_obj;
914
915 if let Some(target) = supersedes {
916 event.refs.provenance.push(Provenance {
917 target: target.to_string(),
918 rel: "supersedes".to_string(),
919 note: Some(format!("key '{key}' re-decided")),
920 });
921 }
922
923 finalize_event(&mut event);
924 event
925 }
926
927 #[test]
928 fn decision_materialized_on_append() {
929 let (dir, store) = tmp_db();
930 let e = make_decision_event("main", "db.engine", "postgres", Some("JSONB support"), None);
931 store.append_event(&e).unwrap();
932
933 let active = store.active_decisions(None, None).unwrap();
934 assert_eq!(active.len(), 1);
935 assert_eq!(active[0].key, "db.engine");
936 assert_eq!(active[0].value, "postgres");
937 assert_eq!(active[0].reason, "JSONB support");
938 assert_eq!(active[0].domain, "db");
939 assert_eq!(active[0].branch, "main");
940 assert!(active[0].is_active);
941 assert!(active[0].supersedes_id.is_none());
942
943 drop(store);
944 let _ = std::fs::remove_dir_all(&dir);
945 }
946
947 #[test]
948 fn supersede_deactivates_prior() {
949 let (dir, store) = tmp_db();
950 let d1 = make_decision_event("main", "db.engine", "mysql", None, None);
951 let d1_id = d1.event_id.clone();
952 store.append_event(&d1).unwrap();
953
954 let d2 = make_decision_event("main", "db.engine", "postgres", Some("JSONB"), Some(&d1_id));
955 store.append_event(&d2).unwrap();
956
957 let active = store.active_decisions(None, None).unwrap();
958 assert_eq!(active.len(), 1);
959 assert_eq!(active[0].value, "postgres");
960 assert_eq!(active[0].supersedes_id.as_deref(), Some(d1_id.as_str()));
961
962 let timeline = store.decision_timeline("db.engine").unwrap();
964 assert_eq!(timeline.len(), 2);
965 assert!(!timeline[0].is_active); assert!(timeline[1].is_active); drop(store);
969 let _ = std::fs::remove_dir_all(&dir);
970 }
971
972 #[test]
973 fn domain_auto_extracted() {
974 let (dir, store) = tmp_db();
975 store
976 .append_event(&make_decision_event(
977 "main",
978 "db.engine",
979 "postgres",
980 None,
981 None,
982 ))
983 .unwrap();
984 store
985 .append_event(&make_decision_event(
986 "main",
987 "db.pool_size",
988 "10",
989 None,
990 None,
991 ))
992 .unwrap();
993 store
994 .append_event(&make_decision_event(
995 "main",
996 "auth.method",
997 "JWT",
998 None,
999 None,
1000 ))
1001 .unwrap();
1002
1003 let db_decisions = store.active_decisions(Some("db"), None).unwrap();
1004 assert_eq!(db_decisions.len(), 2);
1005
1006 let auth_decisions = store.active_decisions(Some("auth"), None).unwrap();
1007 assert_eq!(auth_decisions.len(), 1);
1008 assert_eq!(auth_decisions[0].key, "auth.method");
1009
1010 drop(store);
1011 let _ = std::fs::remove_dir_all(&dir);
1012 }
1013
1014 #[test]
1015 fn legacy_text_only_decision() {
1016 let (dir, store) = tmp_db();
1017 use edda_core::event::finalize_event;
1019
1020 let tags = vec!["decision".to_string()];
1021 let mut event = new_note_event(
1022 "main",
1023 None,
1024 "system",
1025 "orm: sqlx — compile-time checks",
1026 &tags,
1027 )
1028 .unwrap();
1029 event.payload.as_object_mut().unwrap().remove("decision");
1032 finalize_event(&mut event);
1033 store.append_event(&event).unwrap();
1034
1035 let active = store.active_decisions(None, None).unwrap();
1036 assert_eq!(active.len(), 1);
1037 assert_eq!(active[0].key, "orm");
1038 assert_eq!(active[0].value, "sqlx");
1039 assert_eq!(active[0].reason, "compile-time checks");
1040
1041 drop(store);
1042 let _ = std::fs::remove_dir_all(&dir);
1043 }
1044
1045 #[test]
1046 fn active_decisions_key_pattern_search() {
1047 let (dir, store) = tmp_db();
1048 store
1049 .append_event(&make_decision_event(
1050 "main",
1051 "db.engine",
1052 "postgres",
1053 None,
1054 None,
1055 ))
1056 .unwrap();
1057 store
1058 .append_event(&make_decision_event(
1059 "main",
1060 "auth.method",
1061 "JWT",
1062 None,
1063 None,
1064 ))
1065 .unwrap();
1066 store
1067 .append_event(&make_decision_event(
1068 "main",
1069 "cache.driver",
1070 "redis",
1071 None,
1072 None,
1073 ))
1074 .unwrap();
1075
1076 let results = store.active_decisions(None, Some("postgres")).unwrap();
1078 assert_eq!(results.len(), 1);
1079 assert_eq!(results[0].key, "db.engine");
1080
1081 let results = store.active_decisions(None, Some("auth")).unwrap();
1082 assert_eq!(results.len(), 1);
1083 assert_eq!(results[0].key, "auth.method");
1084
1085 drop(store);
1086 let _ = std::fs::remove_dir_all(&dir);
1087 }
1088
1089 #[test]
1090 fn find_active_decision_by_branch_key() {
1091 let (dir, store) = tmp_db();
1092 store
1093 .append_event(&make_decision_event(
1094 "main",
1095 "db.engine",
1096 "postgres",
1097 None,
1098 None,
1099 ))
1100 .unwrap();
1101
1102 let found = store.find_active_decision("main", "db.engine").unwrap();
1103 assert!(found.is_some());
1104 assert_eq!(found.unwrap().value, "postgres");
1105
1106 let not_found = store.find_active_decision("main", "db.pool_size").unwrap();
1107 assert!(not_found.is_none());
1108
1109 let wrong_branch = store.find_active_decision("dev", "db.engine").unwrap();
1110 assert!(wrong_branch.is_none());
1111
1112 drop(store);
1113 let _ = std::fs::remove_dir_all(&dir);
1114 }
1115
1116 #[test]
1117 fn branch_scoped_supersession() {
1118 let (dir, store) = tmp_db();
1119 store
1121 .append_event(&make_decision_event(
1122 "main",
1123 "db.engine",
1124 "postgres",
1125 None,
1126 None,
1127 ))
1128 .unwrap();
1129 store
1130 .append_event(&make_decision_event(
1131 "dev",
1132 "db.engine",
1133 "sqlite",
1134 None,
1135 None,
1136 ))
1137 .unwrap();
1138
1139 let all = store.active_decisions(None, None).unwrap();
1140 assert_eq!(all.len(), 2);
1141
1142 let main = store
1143 .find_active_decision("main", "db.engine")
1144 .unwrap()
1145 .unwrap();
1146 assert_eq!(main.value, "postgres");
1147
1148 let dev = store
1149 .find_active_decision("dev", "db.engine")
1150 .unwrap()
1151 .unwrap();
1152 assert_eq!(dev.value, "sqlite");
1153
1154 drop(store);
1155 let _ = std::fs::remove_dir_all(&dir);
1156 }
1157
1158 #[test]
1159 fn schema_migration_v1_to_v2() {
1160 let n = COUNTER.fetch_add(1, Ordering::SeqCst);
1161 let dir =
1162 std::env::temp_dir().join(format!("edda_sqlite_migrate_{}_{n}", std::process::id()));
1163 let _ = std::fs::remove_dir_all(&dir);
1164 std::fs::create_dir_all(&dir).unwrap();
1165 let db_path = dir.join("ledger.db");
1166
1167 {
1169 let conn = rusqlite::Connection::open(&db_path).unwrap();
1170 conn.execute_batch(SCHEMA_SQL).unwrap();
1171 conn.execute(
1172 "INSERT INTO schema_meta (key, value) VALUES ('version', '1')",
1173 [],
1174 )
1175 .unwrap();
1176
1177 conn.execute(
1179 "INSERT INTO events (event_id, ts, event_type, branch, hash, payload, schema_version)
1180 VALUES ('evt_v1', '2026-01-01T00:00:00Z', 'note', 'main', 'abc', ?1, 1)",
1181 params![serde_json::to_string(&serde_json::json!({
1182 "role": "system",
1183 "text": "db.engine: postgres — need JSONB",
1184 "tags": ["decision"],
1185 "decision": {"key": "db.engine", "value": "postgres", "reason": "need JSONB"}
1186 })).unwrap()],
1187 ).unwrap();
1188 }
1189
1190 let store = SqliteStore::open_or_create(&db_path).unwrap();
1192 assert_eq!(store.schema_version().unwrap(), 2);
1193
1194 let active = store.active_decisions(None, None).unwrap();
1196 assert_eq!(active.len(), 1);
1197 assert_eq!(active[0].key, "db.engine");
1198 assert_eq!(active[0].value, "postgres");
1199 assert_eq!(active[0].domain, "db");
1200
1201 drop(store);
1202 let _ = std::fs::remove_dir_all(&dir);
1203 }
1204
1205 #[test]
1206 fn non_decision_event_not_materialized() {
1207 let (dir, store) = tmp_db();
1208 let e = new_note_event("main", None, "system", "just a note", &["todo".into()]).unwrap();
1210 store.append_event(&e).unwrap();
1211
1212 let active = store.active_decisions(None, None).unwrap();
1213 assert!(active.is_empty());
1214
1215 drop(store);
1216 let _ = std::fs::remove_dir_all(&dir);
1217 }
1218
1219 #[test]
1220 fn domain_timeline_returns_active_and_superseded() {
1221 let (dir, store) = tmp_db();
1222 let d1 = make_decision_event("main", "db.engine", "sqlite", Some("MVP"), None);
1223 let d1_id = d1.event_id.clone();
1224 store.append_event(&d1).unwrap();
1225
1226 let d2 = make_decision_event("main", "db.engine", "postgres", Some("JSONB"), Some(&d1_id));
1227 store.append_event(&d2).unwrap();
1228
1229 store
1231 .append_event(&make_decision_event(
1232 "main",
1233 "auth.method",
1234 "JWT",
1235 None,
1236 None,
1237 ))
1238 .unwrap();
1239
1240 let timeline = store.domain_timeline("db").unwrap();
1241 assert_eq!(timeline.len(), 2);
1242 assert_eq!(timeline[0].value, "sqlite");
1243 assert!(!timeline[0].is_active);
1244 assert_eq!(timeline[1].value, "postgres");
1245 assert!(timeline[1].is_active);
1246
1247 drop(store);
1248 let _ = std::fs::remove_dir_all(&dir);
1249 }
1250
1251 #[test]
1252 fn domain_timeline_empty_for_unknown_domain() {
1253 let (dir, store) = tmp_db();
1254 store
1255 .append_event(&make_decision_event(
1256 "main",
1257 "db.engine",
1258 "postgres",
1259 None,
1260 None,
1261 ))
1262 .unwrap();
1263
1264 let timeline = store.domain_timeline("nonexistent").unwrap();
1265 assert!(timeline.is_empty());
1266
1267 drop(store);
1268 let _ = std::fs::remove_dir_all(&dir);
1269 }
1270
1271 #[test]
1272 fn list_domains_returns_sorted_unique() {
1273 let (dir, store) = tmp_db();
1274 store
1275 .append_event(&make_decision_event(
1276 "main",
1277 "db.engine",
1278 "postgres",
1279 None,
1280 None,
1281 ))
1282 .unwrap();
1283 store
1284 .append_event(&make_decision_event(
1285 "main",
1286 "db.pool_size",
1287 "10",
1288 None,
1289 None,
1290 ))
1291 .unwrap();
1292 store
1293 .append_event(&make_decision_event(
1294 "main",
1295 "auth.method",
1296 "JWT",
1297 None,
1298 None,
1299 ))
1300 .unwrap();
1301
1302 let domains = store.list_domains().unwrap();
1303 assert_eq!(domains, vec!["auth", "db"]);
1304
1305 drop(store);
1306 let _ = std::fs::remove_dir_all(&dir);
1307 }
1308
1309 #[test]
1310 fn list_domains_excludes_superseded_only() {
1311 let (dir, store) = tmp_db();
1312 let d1 = make_decision_event("main", "cache.strategy", "redis", None, None);
1313 let d1_id = d1.event_id.clone();
1314 store.append_event(&d1).unwrap();
1315
1316 let d2 = make_decision_event("main", "cache.strategy", "memcached", None, Some(&d1_id));
1318 store.append_event(&d2).unwrap();
1319
1320 let domains = store.list_domains().unwrap();
1322 assert!(domains.contains(&"cache".to_string()));
1323
1324 let d3 = make_decision_event("main", "temp.flag", "on", None, None);
1329 let d3_id = d3.event_id.clone();
1330 store.append_event(&d3).unwrap();
1331 assert!(store.list_domains().unwrap().contains(&"temp".to_string()));
1332
1333 let d4 = make_decision_event("main", "temp.flag", "off", None, Some(&d3_id));
1334 store.append_event(&d4).unwrap();
1335 assert!(store.list_domains().unwrap().contains(&"temp".to_string()));
1337
1338 drop(store);
1339 let _ = std::fs::remove_dir_all(&dir);
1340 }
1341
1342 #[test]
1343 fn decisions_table_in_schema() {
1344 let (dir, store) = tmp_db();
1345 let tables: Vec<String> = store
1346 .conn
1347 .prepare("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
1348 .unwrap()
1349 .query_map([], |row| row.get(0))
1350 .unwrap()
1351 .collect::<Result<Vec<_>, _>>()
1352 .unwrap();
1353 assert!(tables.contains(&"decisions".to_string()));
1354 drop(store);
1355 let _ = std::fs::remove_dir_all(&dir);
1356 }
1357}