Skip to main content

edda_ledger/
sqlite_store.rs

1//! SQLite-backed storage for the edda ledger.
2//!
3//! Replaces the file-based storage (events.jsonl, refs/HEAD, refs/branches.json)
4//! with a single `ledger.db` SQLite file using WAL mode.
5
6use edda_core::types::{Digest, Event, Provenance, Refs};
7use rusqlite::{params, Connection, OptionalExtension};
8use std::path::Path;
9
10const SCHEMA_SQL: &str = "
11PRAGMA journal_mode = WAL;
12PRAGMA foreign_keys = ON;
13
14CREATE TABLE IF NOT EXISTS events (
15    rowid INTEGER PRIMARY KEY,
16    event_id TEXT UNIQUE NOT NULL,
17    ts TEXT NOT NULL,
18    event_type TEXT NOT NULL,
19    branch TEXT NOT NULL,
20    parent_hash TEXT,
21    hash TEXT NOT NULL,
22    payload TEXT NOT NULL,
23    refs_blobs TEXT NOT NULL DEFAULT '[]',
24    refs_events TEXT NOT NULL DEFAULT '[]',
25    refs_provenance TEXT NOT NULL DEFAULT '[]',
26    schema_version INTEGER NOT NULL DEFAULT 0,
27    digests TEXT NOT NULL DEFAULT '[]',
28    event_family TEXT,
29    event_level TEXT
30);
31
32CREATE INDEX IF NOT EXISTS idx_events_branch ON events(branch);
33CREATE INDEX IF NOT EXISTS idx_events_type ON events(event_type);
34CREATE INDEX IF NOT EXISTS idx_events_branch_type ON events(branch, event_type);
35CREATE INDEX IF NOT EXISTS idx_events_ts ON events(ts);
36CREATE INDEX IF NOT EXISTS idx_events_branch_ts ON events(branch, ts DESC);
37
38CREATE TABLE IF NOT EXISTS refs (
39    key TEXT PRIMARY KEY,
40    value TEXT NOT NULL
41);
42
43CREATE TABLE IF NOT EXISTS schema_meta (
44    key TEXT PRIMARY KEY,
45    value TEXT NOT NULL
46);
47";
48
49const SCHEMA_V2_SQL: &str = "
50CREATE TABLE IF NOT EXISTS decisions (
51    event_id TEXT PRIMARY KEY REFERENCES events(event_id),
52    key TEXT NOT NULL,
53    value TEXT NOT NULL,
54    reason TEXT NOT NULL DEFAULT '',
55    domain TEXT NOT NULL DEFAULT '',
56    branch TEXT NOT NULL,
57    supersedes_id TEXT,
58    is_active BOOLEAN NOT NULL DEFAULT TRUE
59);
60CREATE INDEX IF NOT EXISTS idx_decisions_key ON decisions(key);
61CREATE INDEX IF NOT EXISTS idx_decisions_domain ON decisions(domain);
62CREATE INDEX IF NOT EXISTS idx_decisions_active ON decisions(is_active) WHERE is_active = TRUE;
63CREATE INDEX IF NOT EXISTS idx_decisions_branch_key ON decisions(branch, key);
64";
65
66/// A row from the `decisions` table.
67#[derive(Debug, Clone)]
68pub struct DecisionRow {
69    pub event_id: String,
70    pub key: String,
71    pub value: String,
72    pub reason: String,
73    pub domain: String,
74    pub branch: String,
75    pub supersedes_id: Option<String>,
76    pub is_active: bool,
77    pub ts: Option<String>,
78}
79
80/// SQLite-backed storage engine.
81pub struct SqliteStore {
82    conn: Connection,
83}
84
85impl SqliteStore {
86    /// Open an existing ledger.db.
87    pub fn open(db_path: &Path) -> anyhow::Result<Self> {
88        let conn = Connection::open(db_path)?;
89        let store = Self { conn };
90        store.apply_pragmas()?;
91        Ok(store)
92    }
93
94    /// Open or create ledger.db with full schema.
95    pub fn open_or_create(db_path: &Path) -> anyhow::Result<Self> {
96        if let Some(parent) = db_path.parent() {
97            std::fs::create_dir_all(parent)?;
98        }
99        let conn = Connection::open(db_path)?;
100        let store = Self { conn };
101        store.apply_pragmas()?;
102        store.apply_schema()?;
103        Ok(store)
104    }
105
106    fn apply_pragmas(&self) -> anyhow::Result<()> {
107        self.conn.execute_batch(
108            "PRAGMA journal_mode = WAL;
109             PRAGMA foreign_keys = ON;
110             PRAGMA busy_timeout = 5000;",
111        )?;
112        Ok(())
113    }
114
115    fn apply_schema(&self) -> anyhow::Result<()> {
116        // Always apply v1 base schema (idempotent via IF NOT EXISTS)
117        self.conn.execute_batch(SCHEMA_SQL)?;
118
119        // Bootstrap version if not set
120        self.conn.execute(
121            "INSERT OR IGNORE INTO schema_meta (key, value) VALUES ('version', '1')",
122            [],
123        )?;
124
125        // Migrate to v2 if needed
126        let current = self.schema_version()?;
127        if current < 2 {
128            self.migrate_v1_to_v2()?;
129        }
130
131        Ok(())
132    }
133
134    fn schema_version(&self) -> anyhow::Result<u32> {
135        let version_str: String = self
136            .conn
137            .query_row(
138                "SELECT value FROM schema_meta WHERE key = 'version'",
139                [],
140                |row| row.get(0),
141            )
142            .unwrap_or_else(|_| "1".to_string());
143        Ok(version_str.parse().unwrap_or(1))
144    }
145
146    fn set_schema_version(&self, version: u32) -> anyhow::Result<()> {
147        self.conn.execute(
148            "INSERT OR REPLACE INTO schema_meta (key, value) VALUES ('version', ?1)",
149            params![version.to_string()],
150        )?;
151        Ok(())
152    }
153
154    fn migrate_v1_to_v2(&self) -> anyhow::Result<()> {
155        // Create decisions table + indexes
156        self.conn.execute_batch(SCHEMA_V2_SQL)?;
157
158        // Backfill: scan existing events for decisions
159        let mut stmt = self.conn.prepare(
160            "SELECT event_id, ts, branch, payload, refs_provenance FROM events
161             WHERE event_type = 'note' ORDER BY rowid",
162        )?;
163        let rows: Vec<(String, String, String, String, String)> = stmt
164            .query_map([], |row| {
165                Ok((
166                    row.get(0)?,
167                    row.get(1)?,
168                    row.get(2)?,
169                    row.get(3)?,
170                    row.get(4)?,
171                ))
172            })?
173            .collect::<Result<Vec<_>, _>>()?;
174
175        for (event_id, _ts, branch, payload_str, prov_str) in &rows {
176            let payload: serde_json::Value = match serde_json::from_str(payload_str) {
177                Ok(v) => v,
178                Err(_) => continue,
179            };
180
181            if !is_decision_payload(&payload) {
182                continue;
183            }
184
185            let (key, value, reason) = extract_decision_from_payload(&payload);
186            if key.is_empty() && value.is_empty() {
187                continue;
188            }
189            let domain = extract_domain(&key);
190
191            let provenance: Vec<Provenance> = serde_json::from_str(prov_str).unwrap_or_default();
192            let supersedes_id = provenance
193                .iter()
194                .find(|p| p.rel == "supersedes")
195                .map(|p| p.target.as_str());
196
197            self.conn.execute(
198                "INSERT OR IGNORE INTO decisions
199                 (event_id, key, value, reason, domain, branch, supersedes_id, is_active)
200                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, TRUE)",
201                params![event_id, key, value, reason, domain, branch, supersedes_id],
202            )?;
203        }
204
205        // Fix is_active: deactivate decisions that have been superseded
206        self.conn.execute(
207            "UPDATE decisions SET is_active = FALSE
208             WHERE event_id IN (
209                 SELECT d_old.event_id FROM decisions d_old
210                 JOIN decisions d_new ON d_new.supersedes_id = d_old.event_id
211             )",
212            [],
213        )?;
214
215        // Also deactivate by key+branch: for each (key, branch), only the latest is active
216        // This handles cases where supersedes_id wasn't set (legacy events)
217        self.conn.execute_batch(
218            "UPDATE decisions SET is_active = FALSE
219             WHERE rowid NOT IN (
220                 SELECT MAX(d.rowid) FROM decisions d
221                 GROUP BY d.key, d.branch
222             ) AND is_active = TRUE",
223        )?;
224
225        self.set_schema_version(2)?;
226        Ok(())
227    }
228
229    // ── Events ──────────────────────────────────────────────────────
230
231    /// Append an event. Append-only (CONTRACT LEDGER-02).
232    ///
233    /// If the event is a decision (note with `"decision"` tag), the `decisions`
234    /// table is also updated atomically within the same transaction.
235    pub fn append_event(&self, event: &Event) -> anyhow::Result<()> {
236        let payload = serde_json::to_string(&event.payload)?;
237        let refs_blobs = serde_json::to_string(&event.refs.blobs)?;
238        let refs_events = serde_json::to_string(&event.refs.events)?;
239        let refs_provenance = serde_json::to_string(&event.refs.provenance)?;
240        let digests = serde_json::to_string(&event.digests)?;
241
242        let tx = self.conn.unchecked_transaction()?;
243
244        tx.execute(
245            "INSERT INTO events (
246                event_id, ts, event_type, branch, parent_hash, hash,
247                payload, refs_blobs, refs_events, refs_provenance,
248                schema_version, digests, event_family, event_level
249            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)",
250            params![
251                event.event_id,
252                event.ts,
253                event.event_type,
254                event.branch,
255                event.parent_hash,
256                event.hash,
257                payload,
258                refs_blobs,
259                refs_events,
260                refs_provenance,
261                event.schema_version,
262                digests,
263                event.event_family,
264                event.event_level,
265            ],
266        )?;
267
268        // Materialize decision if applicable
269        if is_decision_event(event) {
270            let (key, value, reason) = extract_decision_from_payload(&event.payload);
271            if !key.is_empty() || !value.is_empty() {
272                let domain = extract_domain(&key);
273                let supersedes_id = event
274                    .refs
275                    .provenance
276                    .iter()
277                    .find(|p| p.rel == "supersedes")
278                    .map(|p| p.target.as_str());
279
280                // Deactivate prior decision with same key on same branch
281                tx.execute(
282                    "UPDATE decisions SET is_active = FALSE
283                     WHERE key = ?1 AND branch = ?2 AND is_active = TRUE",
284                    params![key, event.branch],
285                )?;
286
287                tx.execute(
288                    "INSERT INTO decisions
289                     (event_id, key, value, reason, domain, branch, supersedes_id, is_active)
290                     VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, TRUE)",
291                    params![
292                        event.event_id,
293                        key,
294                        value,
295                        reason,
296                        domain,
297                        event.branch,
298                        supersedes_id
299                    ],
300                )?;
301            }
302        }
303
304        tx.commit()?;
305        Ok(())
306    }
307
308    /// Read all events in insertion order.
309    pub fn iter_events(&self) -> anyhow::Result<Vec<Event>> {
310        let mut stmt = self.conn.prepare(
311            "SELECT event_id, ts, event_type, branch, parent_hash, hash,
312                    payload, refs_blobs, refs_events, refs_provenance,
313                    schema_version, digests, event_family, event_level
314             FROM events ORDER BY rowid",
315        )?;
316
317        let events = stmt
318            .query_map([], |row| {
319                let payload_str: String = row.get(6)?;
320                let refs_blobs_str: String = row.get(7)?;
321                let refs_events_str: String = row.get(8)?;
322                let refs_prov_str: String = row.get(9)?;
323                let digests_str: String = row.get(11)?;
324
325                Ok(EventRow {
326                    event_id: row.get(0)?,
327                    ts: row.get(1)?,
328                    event_type: row.get(2)?,
329                    branch: row.get(3)?,
330                    parent_hash: row.get(4)?,
331                    hash: row.get(5)?,
332                    payload_str,
333                    refs_blobs_str,
334                    refs_events_str,
335                    refs_prov_str,
336                    schema_version: row.get(10)?,
337                    digests_str,
338                    event_family: row.get(12)?,
339                    event_level: row.get(13)?,
340                })
341            })?
342            .collect::<Result<Vec<_>, _>>()?;
343
344        events.into_iter().map(row_to_event).collect()
345    }
346
347    /// Get the hash of the last event.
348    pub fn last_event_hash(&self) -> anyhow::Result<Option<String>> {
349        let result: Option<String> = self
350            .conn
351            .query_row(
352                "SELECT hash FROM events ORDER BY rowid DESC LIMIT 1",
353                [],
354                |row| row.get(0),
355            )
356            .optional()?;
357        Ok(result)
358    }
359
360    // ── Refs ────────────────────────────────────────────────────────
361
362    /// Read the current HEAD branch name.
363    pub fn head_branch(&self) -> anyhow::Result<String> {
364        let value: String = self
365            .conn
366            .query_row("SELECT value FROM refs WHERE key = 'HEAD'", [], |row| {
367                row.get(0)
368            })
369            .map_err(|_| anyhow::anyhow!("HEAD not set in refs table"))?;
370        Ok(value)
371    }
372
373    /// Write the HEAD branch name.
374    pub fn set_head_branch(&self, name: &str) -> anyhow::Result<()> {
375        self.conn.execute(
376            "INSERT OR REPLACE INTO refs (key, value) VALUES ('HEAD', ?1)",
377            params![name],
378        )?;
379        Ok(())
380    }
381
382    /// Read branches.json equivalent from refs table.
383    pub fn branches_json(&self) -> anyhow::Result<serde_json::Value> {
384        let value: String = self
385            .conn
386            .query_row("SELECT value FROM refs WHERE key = 'branches'", [], |row| {
387                row.get(0)
388            })
389            .map_err(|_| anyhow::anyhow!("branches not set in refs table"))?;
390        let json: serde_json::Value = serde_json::from_str(&value)?;
391        Ok(json)
392    }
393
394    /// Write branches.json equivalent to refs table.
395    pub fn set_branches_json(&self, value: &serde_json::Value) -> anyhow::Result<()> {
396        let json_str = serde_json::to_string(value)?;
397        self.conn.execute(
398            "INSERT OR REPLACE INTO refs (key, value) VALUES ('branches', ?1)",
399            params![json_str],
400        )?;
401        Ok(())
402    }
403
404    // ── Decisions ───────────────────────────────────────────────────
405
406    /// Query active decisions, optionally filtered by domain or key prefix.
407    pub fn active_decisions(
408        &self,
409        domain: Option<&str>,
410        key_pattern: Option<&str>,
411    ) -> anyhow::Result<Vec<DecisionRow>> {
412        let sql = match (domain, key_pattern) {
413            (Some(_), _) => {
414                "SELECT d.event_id, d.key, d.value, d.reason, d.domain, d.branch,
415                        d.supersedes_id, d.is_active, e.ts
416                 FROM decisions d JOIN events e ON d.event_id = e.event_id
417                 WHERE d.is_active = TRUE AND d.domain = ?1
418                 ORDER BY d.domain, d.key"
419            }
420            (_, Some(_)) => {
421                "SELECT d.event_id, d.key, d.value, d.reason, d.domain, d.branch,
422                        d.supersedes_id, d.is_active, e.ts
423                 FROM decisions d JOIN events e ON d.event_id = e.event_id
424                 WHERE d.is_active = TRUE AND (d.key LIKE ?1 OR d.value LIKE ?1)
425                 ORDER BY d.domain, d.key"
426            }
427            (None, None) => {
428                "SELECT d.event_id, d.key, d.value, d.reason, d.domain, d.branch,
429                        d.supersedes_id, d.is_active, e.ts
430                 FROM decisions d JOIN events e ON d.event_id = e.event_id
431                 WHERE d.is_active = TRUE
432                 ORDER BY d.domain, d.key"
433            }
434        };
435
436        let param: String = match (domain, key_pattern) {
437            (Some(d), _) => d.to_string(),
438            (_, Some(k)) => format!("%{k}%"),
439            _ => String::new(),
440        };
441
442        let mut stmt = self.conn.prepare(sql)?;
443        let rows = if domain.is_some() || key_pattern.is_some() {
444            stmt.query_map(params![param], map_decision_row)?
445        } else {
446            stmt.query_map([], map_decision_row)?
447        };
448
449        rows.collect::<Result<Vec<_>, _>>()
450            .map_err(|e| anyhow::anyhow!("decision query failed: {e}"))
451    }
452
453    /// All decisions for a key (active + superseded), ordered by time.
454    pub fn decision_timeline(&self, key: &str) -> anyhow::Result<Vec<DecisionRow>> {
455        let mut stmt = self.conn.prepare(
456            "SELECT d.event_id, d.key, d.value, d.reason, d.domain, d.branch,
457                    d.supersedes_id, d.is_active, e.ts
458             FROM decisions d JOIN events e ON d.event_id = e.event_id
459             WHERE d.key = ?1
460             ORDER BY e.ts",
461        )?;
462        let rows = stmt.query_map(params![key], map_decision_row)?;
463        rows.collect::<Result<Vec<_>, _>>()
464            .map_err(|e| anyhow::anyhow!("decision timeline query failed: {e}"))
465    }
466
467    /// All decisions for a domain (active + superseded), ordered by time.
468    pub fn domain_timeline(&self, domain: &str) -> anyhow::Result<Vec<DecisionRow>> {
469        let mut stmt = self.conn.prepare(
470            "SELECT d.event_id, d.key, d.value, d.reason, d.domain, d.branch,
471                    d.supersedes_id, d.is_active, e.ts
472             FROM decisions d JOIN events e ON d.event_id = e.event_id
473             WHERE d.domain = ?1
474             ORDER BY e.ts",
475        )?;
476        let rows = stmt.query_map(params![domain], map_decision_row)?;
477        rows.collect::<Result<Vec<_>, _>>()
478            .map_err(|e| anyhow::anyhow!("domain timeline query failed: {e}"))
479    }
480
481    /// Distinct domain values from active decisions.
482    pub fn list_domains(&self) -> anyhow::Result<Vec<String>> {
483        let mut stmt = self.conn.prepare(
484            "SELECT DISTINCT domain FROM decisions WHERE is_active = TRUE ORDER BY domain",
485        )?;
486        let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
487        rows.collect::<Result<Vec<_>, _>>()
488            .map_err(|e| anyhow::anyhow!("list domains query failed: {e}"))
489    }
490
491    /// Find the active decision for a specific key on a branch.
492    pub fn find_active_decision(
493        &self,
494        branch: &str,
495        key: &str,
496    ) -> anyhow::Result<Option<DecisionRow>> {
497        let mut stmt = self.conn.prepare(
498            "SELECT d.event_id, d.key, d.value, d.reason, d.domain, d.branch,
499                    d.supersedes_id, d.is_active, e.ts
500             FROM decisions d JOIN events e ON d.event_id = e.event_id
501             WHERE d.key = ?1 AND d.branch = ?2 AND d.is_active = TRUE
502             LIMIT 1",
503        )?;
504        let result = stmt
505            .query_map(params![key, branch], map_decision_row)?
506            .next();
507        match result {
508            Some(Ok(row)) => Ok(Some(row)),
509            Some(Err(e)) => Err(anyhow::anyhow!("decision query failed: {e}")),
510            None => Ok(None),
511        }
512    }
513}
514
515impl Drop for SqliteStore {
516    fn drop(&mut self) {
517        // Merge WAL back into main DB so users see a single file when idle.
518        let _ = self.conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
519    }
520}
521
522// ── Decision helpers ────────────────────────────────────────────────
523
524/// Check if an event is a decision (note with "decision" tag).
525fn is_decision_event(event: &Event) -> bool {
526    event.event_type == "note"
527        && event
528            .payload
529            .get("tags")
530            .and_then(|v| v.as_array())
531            .map(|arr| arr.iter().any(|t| t.as_str() == Some("decision")))
532            .unwrap_or(false)
533}
534
535/// Check if a payload JSON contains a "decision" tag.
536fn is_decision_payload(payload: &serde_json::Value) -> bool {
537    payload
538        .get("tags")
539        .and_then(|v| v.as_array())
540        .map(|arr| arr.iter().any(|t| t.as_str() == Some("decision")))
541        .unwrap_or(false)
542}
543
544/// Extract (key, value, reason) from a payload.
545/// Prefers structured `payload.decision`, falls back to text parse.
546fn extract_decision_from_payload(payload: &serde_json::Value) -> (String, String, String) {
547    if let Some(d) = payload.get("decision") {
548        let key = d
549            .get("key")
550            .and_then(|v| v.as_str())
551            .unwrap_or("")
552            .to_string();
553        let value = d
554            .get("value")
555            .and_then(|v| v.as_str())
556            .unwrap_or("")
557            .to_string();
558        let reason = d
559            .get("reason")
560            .and_then(|v| v.as_str())
561            .unwrap_or("")
562            .to_string();
563        return (key, value, reason);
564    }
565    // Fallback: parse text "key: value — reason"
566    let text = payload.get("text").and_then(|v| v.as_str()).unwrap_or("");
567    let (key, rest) = match text.split_once(": ") {
568        Some((k, r)) => (k.to_string(), r),
569        None => return (String::new(), text.to_string(), String::new()),
570    };
571    let (value, reason) = match rest.split_once(" — ") {
572        Some((v, r)) => (v.to_string(), r.to_string()),
573        None => (rest.to_string(), String::new()),
574    };
575    (key, value, reason)
576}
577
578/// Extract domain from a decision key: "db.engine" → "db".
579fn extract_domain(key: &str) -> String {
580    key.split('.').next().unwrap_or(key).to_string()
581}
582
583fn map_decision_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<DecisionRow> {
584    Ok(DecisionRow {
585        event_id: row.get(0)?,
586        key: row.get(1)?,
587        value: row.get(2)?,
588        reason: row.get(3)?,
589        domain: row.get(4)?,
590        branch: row.get(5)?,
591        supersedes_id: row.get(6)?,
592        is_active: row.get(7)?,
593        ts: row.get(8)?,
594    })
595}
596
597// ── Internal helpers ────────────────────────────────────────────────
598
599/// Intermediate row struct for deserialization.
600struct EventRow {
601    event_id: String,
602    ts: String,
603    event_type: String,
604    branch: String,
605    parent_hash: Option<String>,
606    hash: String,
607    payload_str: String,
608    refs_blobs_str: String,
609    refs_events_str: String,
610    refs_prov_str: String,
611    schema_version: u32,
612    digests_str: String,
613    event_family: Option<String>,
614    event_level: Option<String>,
615}
616
617fn row_to_event(row: EventRow) -> anyhow::Result<Event> {
618    let payload: serde_json::Value = serde_json::from_str(&row.payload_str)?;
619    let blobs: Vec<String> = serde_json::from_str(&row.refs_blobs_str)?;
620    let events: Vec<String> = serde_json::from_str(&row.refs_events_str)?;
621    let provenance: Vec<Provenance> = serde_json::from_str(&row.refs_prov_str)?;
622    let digests: Vec<Digest> = serde_json::from_str(&row.digests_str)?;
623
624    Ok(Event {
625        event_id: row.event_id,
626        ts: row.ts,
627        event_type: row.event_type,
628        branch: row.branch,
629        parent_hash: row.parent_hash,
630        hash: row.hash,
631        payload,
632        refs: Refs {
633            blobs,
634            events,
635            provenance,
636        },
637        schema_version: row.schema_version,
638        digests,
639        event_family: row.event_family,
640        event_level: row.event_level,
641    })
642}
643
644#[cfg(test)]
645mod tests {
646    use super::*;
647    use edda_core::event::new_note_event;
648    use std::sync::atomic::{AtomicU64, Ordering};
649
650    static COUNTER: AtomicU64 = AtomicU64::new(0);
651
652    fn tmp_db() -> (std::path::PathBuf, SqliteStore) {
653        let n = COUNTER.fetch_add(1, Ordering::SeqCst);
654        let dir = std::env::temp_dir().join(format!("edda_sqlite_test_{}_{n}", std::process::id()));
655        let _ = std::fs::remove_dir_all(&dir);
656        std::fs::create_dir_all(&dir).unwrap();
657        let db_path = dir.join("ledger.db");
658        let store = SqliteStore::open_or_create(&db_path).unwrap();
659        (dir, store)
660    }
661
662    #[test]
663    fn schema_creation() {
664        let (dir, store) = tmp_db();
665        // Verify tables exist
666        let tables: Vec<String> = store
667            .conn
668            .prepare("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
669            .unwrap()
670            .query_map([], |row| row.get(0))
671            .unwrap()
672            .collect::<Result<Vec<_>, _>>()
673            .unwrap();
674        assert!(tables.contains(&"events".to_string()));
675        assert!(tables.contains(&"refs".to_string()));
676        assert!(tables.contains(&"schema_meta".to_string()));
677        drop(store);
678        let _ = std::fs::remove_dir_all(&dir);
679    }
680
681    #[test]
682    fn event_round_trip() {
683        let (dir, store) = tmp_db();
684        let e1 = new_note_event("main", None, "system", "first note", &["test".into()]).unwrap();
685        store.append_event(&e1).unwrap();
686
687        let e2 = new_note_event(
688            "main",
689            Some(&e1.hash),
690            "user",
691            "second note",
692            &["test".into()],
693        )
694        .unwrap();
695        store.append_event(&e2).unwrap();
696
697        let events = store.iter_events().unwrap();
698        assert_eq!(events.len(), 2);
699        assert_eq!(events[0].event_id, e1.event_id);
700        assert_eq!(events[0].hash, e1.hash);
701        assert_eq!(events[0].event_type, "note");
702        assert_eq!(events[0].branch, "main");
703        assert_eq!(events[1].event_id, e2.event_id);
704        assert_eq!(events[1].parent_hash, Some(e1.hash.clone()));
705
706        // Payload preserved
707        assert_eq!(events[0].payload["text"], "first note");
708        assert_eq!(events[1].payload["text"], "second note");
709
710        drop(store);
711        let _ = std::fs::remove_dir_all(&dir);
712    }
713
714    #[test]
715    fn last_event_hash_empty() {
716        let (dir, store) = tmp_db();
717        assert_eq!(store.last_event_hash().unwrap(), None);
718        drop(store);
719        let _ = std::fs::remove_dir_all(&dir);
720    }
721
722    #[test]
723    fn last_event_hash_returns_latest() {
724        let (dir, store) = tmp_db();
725        let e1 = new_note_event("main", None, "system", "init", &[]).unwrap();
726        store.append_event(&e1).unwrap();
727        assert_eq!(store.last_event_hash().unwrap(), Some(e1.hash.clone()));
728
729        let e2 = new_note_event("main", Some(&e1.hash), "user", "hello", &[]).unwrap();
730        store.append_event(&e2).unwrap();
731        assert_eq!(store.last_event_hash().unwrap(), Some(e2.hash.clone()));
732
733        drop(store);
734        let _ = std::fs::remove_dir_all(&dir);
735    }
736
737    #[test]
738    fn refs_head_branch() {
739        let (dir, store) = tmp_db();
740        // HEAD not set yet
741        assert!(store.head_branch().is_err());
742
743        store.set_head_branch("main").unwrap();
744        assert_eq!(store.head_branch().unwrap(), "main");
745
746        store.set_head_branch("feat/x").unwrap();
747        assert_eq!(store.head_branch().unwrap(), "feat/x");
748
749        drop(store);
750        let _ = std::fs::remove_dir_all(&dir);
751    }
752
753    #[test]
754    fn refs_branches_json() {
755        let (dir, store) = tmp_db();
756        let json = serde_json::json!({
757            "branches": {
758                "main": { "created_at": "2026-01-01T00:00:00Z" }
759            }
760        });
761        store.set_branches_json(&json).unwrap();
762        let loaded = store.branches_json().unwrap();
763        assert_eq!(loaded, json);
764
765        drop(store);
766        let _ = std::fs::remove_dir_all(&dir);
767    }
768
769    #[test]
770    fn event_with_refs() {
771        let (dir, store) = tmp_db();
772        let mut e =
773            new_note_event("main", None, "system", "with refs", &["decision".into()]).unwrap();
774        e.refs.blobs = vec!["blob:sha256:abc123".to_string()];
775        e.refs.events = vec!["evt_prior".to_string()];
776        e.refs.provenance = vec![Provenance {
777            target: "evt_old".to_string(),
778            rel: "supersedes".to_string(),
779            note: Some("re-decided".to_string()),
780        }];
781
782        store.append_event(&e).unwrap();
783        let events = store.iter_events().unwrap();
784        assert_eq!(events.len(), 1);
785        assert_eq!(events[0].refs.blobs, vec!["blob:sha256:abc123"]);
786        assert_eq!(events[0].refs.events, vec!["evt_prior"]);
787        assert_eq!(events[0].refs.provenance.len(), 1);
788        assert_eq!(events[0].refs.provenance[0].rel, "supersedes");
789        assert_eq!(
790            events[0].refs.provenance[0].note.as_deref(),
791            Some("re-decided")
792        );
793
794        drop(store);
795        let _ = std::fs::remove_dir_all(&dir);
796    }
797
798    #[test]
799    fn wal_checkpoint_on_drop() {
800        let n = COUNTER.fetch_add(1, Ordering::SeqCst);
801        let dir = std::env::temp_dir().join(format!("edda_sqlite_wal_{}_{n}", std::process::id()));
802        let _ = std::fs::remove_dir_all(&dir);
803        std::fs::create_dir_all(&dir).unwrap();
804        let db_path = dir.join("ledger.db");
805
806        {
807            let store = SqliteStore::open_or_create(&db_path).unwrap();
808            let e = new_note_event("main", None, "system", "wal test", &[]).unwrap();
809            store.append_event(&e).unwrap();
810            // Drop triggers checkpoint
811        }
812
813        // After drop, WAL should be checkpointed (file may still exist but be empty)
814        assert!(db_path.exists());
815        let wal_path = dir.join("ledger.db-wal");
816        if wal_path.exists() {
817            // WAL file exists but should be 0 bytes after TRUNCATE checkpoint
818            let size = std::fs::metadata(&wal_path).unwrap().len();
819            assert_eq!(size, 0, "WAL file should be empty after checkpoint");
820        }
821
822        let _ = std::fs::remove_dir_all(&dir);
823    }
824
825    #[test]
826    fn event_ordering_preserved() {
827        let (dir, store) = tmp_db();
828        let mut prev_hash: Option<String> = None;
829        for i in 0..10 {
830            let e = new_note_event(
831                "main",
832                prev_hash.as_deref(),
833                "system",
834                &format!("event {i}"),
835                &[],
836            )
837            .unwrap();
838            prev_hash = Some(e.hash.clone());
839            store.append_event(&e).unwrap();
840        }
841
842        let events = store.iter_events().unwrap();
843        assert_eq!(events.len(), 10);
844        for (i, event) in events.iter().enumerate() {
845            assert_eq!(event.payload["text"], format!("event {i}"));
846        }
847        // Verify hash chain
848        for i in 1..10 {
849            assert_eq!(
850                events[i].parent_hash.as_deref(),
851                Some(events[i - 1].hash.as_str())
852            );
853        }
854
855        drop(store);
856        let _ = std::fs::remove_dir_all(&dir);
857    }
858
859    #[test]
860    fn duplicate_event_id_errors() {
861        let (dir, store) = tmp_db();
862        let e = new_note_event("main", None, "system", "first", &[]).unwrap();
863        store.append_event(&e).unwrap();
864        // Same event_id should fail (UNIQUE constraint)
865        assert!(store.append_event(&e).is_err());
866        drop(store);
867        let _ = std::fs::remove_dir_all(&dir);
868    }
869
870    #[test]
871    fn idempotent_schema_apply() {
872        let n = COUNTER.fetch_add(1, Ordering::SeqCst);
873        let dir = std::env::temp_dir().join(format!("edda_sqlite_idem_{}_{n}", std::process::id()));
874        let _ = std::fs::remove_dir_all(&dir);
875        std::fs::create_dir_all(&dir).unwrap();
876        let db_path = dir.join("ledger.db");
877
878        // Create twice — should not error
879        let store1 = SqliteStore::open_or_create(&db_path).unwrap();
880        store1.set_head_branch("main").unwrap();
881        drop(store1);
882
883        let store2 = SqliteStore::open_or_create(&db_path).unwrap();
884        assert_eq!(store2.head_branch().unwrap(), "main");
885        drop(store2);
886
887        let _ = std::fs::remove_dir_all(&dir);
888    }
889
890    // ── Decision tests ──────────────────────────────────────────────
891
892    fn make_decision_event(
893        branch: &str,
894        key: &str,
895        value: &str,
896        reason: Option<&str>,
897        supersedes: Option<&str>,
898    ) -> Event {
899        use edda_core::event::finalize_event;
900
901        let text = match reason {
902            Some(r) => format!("{key}: {value} — {r}"),
903            None => format!("{key}: {value}"),
904        };
905        let tags = vec!["decision".to_string()];
906        let mut event = new_note_event(branch, None, "system", &text, &tags).unwrap();
907
908        // Inject structured decision object
909        let decision_obj = match reason {
910            Some(r) => serde_json::json!({"key": key, "value": value, "reason": r}),
911            None => serde_json::json!({"key": key, "value": value}),
912        };
913        event.payload["decision"] = decision_obj;
914
915        if let Some(target) = supersedes {
916            event.refs.provenance.push(Provenance {
917                target: target.to_string(),
918                rel: "supersedes".to_string(),
919                note: Some(format!("key '{key}' re-decided")),
920            });
921        }
922
923        finalize_event(&mut event);
924        event
925    }
926
927    #[test]
928    fn decision_materialized_on_append() {
929        let (dir, store) = tmp_db();
930        let e = make_decision_event("main", "db.engine", "postgres", Some("JSONB support"), None);
931        store.append_event(&e).unwrap();
932
933        let active = store.active_decisions(None, None).unwrap();
934        assert_eq!(active.len(), 1);
935        assert_eq!(active[0].key, "db.engine");
936        assert_eq!(active[0].value, "postgres");
937        assert_eq!(active[0].reason, "JSONB support");
938        assert_eq!(active[0].domain, "db");
939        assert_eq!(active[0].branch, "main");
940        assert!(active[0].is_active);
941        assert!(active[0].supersedes_id.is_none());
942
943        drop(store);
944        let _ = std::fs::remove_dir_all(&dir);
945    }
946
947    #[test]
948    fn supersede_deactivates_prior() {
949        let (dir, store) = tmp_db();
950        let d1 = make_decision_event("main", "db.engine", "mysql", None, None);
951        let d1_id = d1.event_id.clone();
952        store.append_event(&d1).unwrap();
953
954        let d2 = make_decision_event("main", "db.engine", "postgres", Some("JSONB"), Some(&d1_id));
955        store.append_event(&d2).unwrap();
956
957        let active = store.active_decisions(None, None).unwrap();
958        assert_eq!(active.len(), 1);
959        assert_eq!(active[0].value, "postgres");
960        assert_eq!(active[0].supersedes_id.as_deref(), Some(d1_id.as_str()));
961
962        // Timeline should show both
963        let timeline = store.decision_timeline("db.engine").unwrap();
964        assert_eq!(timeline.len(), 2);
965        assert!(!timeline[0].is_active); // mysql deactivated
966        assert!(timeline[1].is_active); // postgres active
967
968        drop(store);
969        let _ = std::fs::remove_dir_all(&dir);
970    }
971
972    #[test]
973    fn domain_auto_extracted() {
974        let (dir, store) = tmp_db();
975        store
976            .append_event(&make_decision_event(
977                "main",
978                "db.engine",
979                "postgres",
980                None,
981                None,
982            ))
983            .unwrap();
984        store
985            .append_event(&make_decision_event(
986                "main",
987                "db.pool_size",
988                "10",
989                None,
990                None,
991            ))
992            .unwrap();
993        store
994            .append_event(&make_decision_event(
995                "main",
996                "auth.method",
997                "JWT",
998                None,
999                None,
1000            ))
1001            .unwrap();
1002
1003        let db_decisions = store.active_decisions(Some("db"), None).unwrap();
1004        assert_eq!(db_decisions.len(), 2);
1005
1006        let auth_decisions = store.active_decisions(Some("auth"), None).unwrap();
1007        assert_eq!(auth_decisions.len(), 1);
1008        assert_eq!(auth_decisions[0].key, "auth.method");
1009
1010        drop(store);
1011        let _ = std::fs::remove_dir_all(&dir);
1012    }
1013
1014    #[test]
1015    fn legacy_text_only_decision() {
1016        let (dir, store) = tmp_db();
1017        // Old-format event: no payload.decision field, only text
1018        use edda_core::event::finalize_event;
1019
1020        let tags = vec!["decision".to_string()];
1021        let mut event = new_note_event(
1022            "main",
1023            None,
1024            "system",
1025            "orm: sqlx — compile-time checks",
1026            &tags,
1027        )
1028        .unwrap();
1029        // Do NOT add payload.decision — simulate legacy format
1030        // Remove it if new_note_event somehow adds it (it doesn't)
1031        event.payload.as_object_mut().unwrap().remove("decision");
1032        finalize_event(&mut event);
1033        store.append_event(&event).unwrap();
1034
1035        let active = store.active_decisions(None, None).unwrap();
1036        assert_eq!(active.len(), 1);
1037        assert_eq!(active[0].key, "orm");
1038        assert_eq!(active[0].value, "sqlx");
1039        assert_eq!(active[0].reason, "compile-time checks");
1040
1041        drop(store);
1042        let _ = std::fs::remove_dir_all(&dir);
1043    }
1044
1045    #[test]
1046    fn active_decisions_key_pattern_search() {
1047        let (dir, store) = tmp_db();
1048        store
1049            .append_event(&make_decision_event(
1050                "main",
1051                "db.engine",
1052                "postgres",
1053                None,
1054                None,
1055            ))
1056            .unwrap();
1057        store
1058            .append_event(&make_decision_event(
1059                "main",
1060                "auth.method",
1061                "JWT",
1062                None,
1063                None,
1064            ))
1065            .unwrap();
1066        store
1067            .append_event(&make_decision_event(
1068                "main",
1069                "cache.driver",
1070                "redis",
1071                None,
1072                None,
1073            ))
1074            .unwrap();
1075
1076        // Search by key/value pattern
1077        let results = store.active_decisions(None, Some("postgres")).unwrap();
1078        assert_eq!(results.len(), 1);
1079        assert_eq!(results[0].key, "db.engine");
1080
1081        let results = store.active_decisions(None, Some("auth")).unwrap();
1082        assert_eq!(results.len(), 1);
1083        assert_eq!(results[0].key, "auth.method");
1084
1085        drop(store);
1086        let _ = std::fs::remove_dir_all(&dir);
1087    }
1088
1089    #[test]
1090    fn find_active_decision_by_branch_key() {
1091        let (dir, store) = tmp_db();
1092        store
1093            .append_event(&make_decision_event(
1094                "main",
1095                "db.engine",
1096                "postgres",
1097                None,
1098                None,
1099            ))
1100            .unwrap();
1101
1102        let found = store.find_active_decision("main", "db.engine").unwrap();
1103        assert!(found.is_some());
1104        assert_eq!(found.unwrap().value, "postgres");
1105
1106        let not_found = store.find_active_decision("main", "db.pool_size").unwrap();
1107        assert!(not_found.is_none());
1108
1109        let wrong_branch = store.find_active_decision("dev", "db.engine").unwrap();
1110        assert!(wrong_branch.is_none());
1111
1112        drop(store);
1113        let _ = std::fs::remove_dir_all(&dir);
1114    }
1115
1116    #[test]
1117    fn branch_scoped_supersession() {
1118        let (dir, store) = tmp_db();
1119        // Same key on different branches — both should stay active
1120        store
1121            .append_event(&make_decision_event(
1122                "main",
1123                "db.engine",
1124                "postgres",
1125                None,
1126                None,
1127            ))
1128            .unwrap();
1129        store
1130            .append_event(&make_decision_event(
1131                "dev",
1132                "db.engine",
1133                "sqlite",
1134                None,
1135                None,
1136            ))
1137            .unwrap();
1138
1139        let all = store.active_decisions(None, None).unwrap();
1140        assert_eq!(all.len(), 2);
1141
1142        let main = store
1143            .find_active_decision("main", "db.engine")
1144            .unwrap()
1145            .unwrap();
1146        assert_eq!(main.value, "postgres");
1147
1148        let dev = store
1149            .find_active_decision("dev", "db.engine")
1150            .unwrap()
1151            .unwrap();
1152        assert_eq!(dev.value, "sqlite");
1153
1154        drop(store);
1155        let _ = std::fs::remove_dir_all(&dir);
1156    }
1157
1158    #[test]
1159    fn schema_migration_v1_to_v2() {
1160        let n = COUNTER.fetch_add(1, Ordering::SeqCst);
1161        let dir =
1162            std::env::temp_dir().join(format!("edda_sqlite_migrate_{}_{n}", std::process::id()));
1163        let _ = std::fs::remove_dir_all(&dir);
1164        std::fs::create_dir_all(&dir).unwrap();
1165        let db_path = dir.join("ledger.db");
1166
1167        // Create a v1 database manually (only base schema)
1168        {
1169            let conn = rusqlite::Connection::open(&db_path).unwrap();
1170            conn.execute_batch(SCHEMA_SQL).unwrap();
1171            conn.execute(
1172                "INSERT INTO schema_meta (key, value) VALUES ('version', '1')",
1173                [],
1174            )
1175            .unwrap();
1176
1177            // Insert a decision event directly into v1 events table
1178            conn.execute(
1179                "INSERT INTO events (event_id, ts, event_type, branch, hash, payload, schema_version)
1180                 VALUES ('evt_v1', '2026-01-01T00:00:00Z', 'note', 'main', 'abc', ?1, 1)",
1181                params![serde_json::to_string(&serde_json::json!({
1182                    "role": "system",
1183                    "text": "db.engine: postgres — need JSONB",
1184                    "tags": ["decision"],
1185                    "decision": {"key": "db.engine", "value": "postgres", "reason": "need JSONB"}
1186                })).unwrap()],
1187            ).unwrap();
1188        }
1189
1190        // Open with open_or_create — should trigger v1→v2 migration
1191        let store = SqliteStore::open_or_create(&db_path).unwrap();
1192        assert_eq!(store.schema_version().unwrap(), 2);
1193
1194        // Verify decisions table was populated by backfill
1195        let active = store.active_decisions(None, None).unwrap();
1196        assert_eq!(active.len(), 1);
1197        assert_eq!(active[0].key, "db.engine");
1198        assert_eq!(active[0].value, "postgres");
1199        assert_eq!(active[0].domain, "db");
1200
1201        drop(store);
1202        let _ = std::fs::remove_dir_all(&dir);
1203    }
1204
1205    #[test]
1206    fn non_decision_event_not_materialized() {
1207        let (dir, store) = tmp_db();
1208        // Regular note (no decision tag)
1209        let e = new_note_event("main", None, "system", "just a note", &["todo".into()]).unwrap();
1210        store.append_event(&e).unwrap();
1211
1212        let active = store.active_decisions(None, None).unwrap();
1213        assert!(active.is_empty());
1214
1215        drop(store);
1216        let _ = std::fs::remove_dir_all(&dir);
1217    }
1218
1219    #[test]
1220    fn domain_timeline_returns_active_and_superseded() {
1221        let (dir, store) = tmp_db();
1222        let d1 = make_decision_event("main", "db.engine", "sqlite", Some("MVP"), None);
1223        let d1_id = d1.event_id.clone();
1224        store.append_event(&d1).unwrap();
1225
1226        let d2 = make_decision_event("main", "db.engine", "postgres", Some("JSONB"), Some(&d1_id));
1227        store.append_event(&d2).unwrap();
1228
1229        // Also add a decision in a different domain
1230        store
1231            .append_event(&make_decision_event(
1232                "main",
1233                "auth.method",
1234                "JWT",
1235                None,
1236                None,
1237            ))
1238            .unwrap();
1239
1240        let timeline = store.domain_timeline("db").unwrap();
1241        assert_eq!(timeline.len(), 2);
1242        assert_eq!(timeline[0].value, "sqlite");
1243        assert!(!timeline[0].is_active);
1244        assert_eq!(timeline[1].value, "postgres");
1245        assert!(timeline[1].is_active);
1246
1247        drop(store);
1248        let _ = std::fs::remove_dir_all(&dir);
1249    }
1250
1251    #[test]
1252    fn domain_timeline_empty_for_unknown_domain() {
1253        let (dir, store) = tmp_db();
1254        store
1255            .append_event(&make_decision_event(
1256                "main",
1257                "db.engine",
1258                "postgres",
1259                None,
1260                None,
1261            ))
1262            .unwrap();
1263
1264        let timeline = store.domain_timeline("nonexistent").unwrap();
1265        assert!(timeline.is_empty());
1266
1267        drop(store);
1268        let _ = std::fs::remove_dir_all(&dir);
1269    }
1270
1271    #[test]
1272    fn list_domains_returns_sorted_unique() {
1273        let (dir, store) = tmp_db();
1274        store
1275            .append_event(&make_decision_event(
1276                "main",
1277                "db.engine",
1278                "postgres",
1279                None,
1280                None,
1281            ))
1282            .unwrap();
1283        store
1284            .append_event(&make_decision_event(
1285                "main",
1286                "db.pool_size",
1287                "10",
1288                None,
1289                None,
1290            ))
1291            .unwrap();
1292        store
1293            .append_event(&make_decision_event(
1294                "main",
1295                "auth.method",
1296                "JWT",
1297                None,
1298                None,
1299            ))
1300            .unwrap();
1301
1302        let domains = store.list_domains().unwrap();
1303        assert_eq!(domains, vec!["auth", "db"]);
1304
1305        drop(store);
1306        let _ = std::fs::remove_dir_all(&dir);
1307    }
1308
1309    #[test]
1310    fn list_domains_excludes_superseded_only() {
1311        let (dir, store) = tmp_db();
1312        let d1 = make_decision_event("main", "cache.strategy", "redis", None, None);
1313        let d1_id = d1.event_id.clone();
1314        store.append_event(&d1).unwrap();
1315
1316        // Supersede with a different domain key
1317        let d2 = make_decision_event("main", "cache.strategy", "memcached", None, Some(&d1_id));
1318        store.append_event(&d2).unwrap();
1319
1320        // "cache" should still appear (d2 is active)
1321        let domains = store.list_domains().unwrap();
1322        assert!(domains.contains(&"cache".to_string()));
1323
1324        // Now supersede d2 but with NO replacement active
1325        // We can't easily do this with current API, so instead test
1326        // that a domain with only superseded decisions is excluded:
1327        // Create a new domain, supersede its only decision, check it disappears
1328        let d3 = make_decision_event("main", "temp.flag", "on", None, None);
1329        let d3_id = d3.event_id.clone();
1330        store.append_event(&d3).unwrap();
1331        assert!(store.list_domains().unwrap().contains(&"temp".to_string()));
1332
1333        let d4 = make_decision_event("main", "temp.flag", "off", None, Some(&d3_id));
1334        store.append_event(&d4).unwrap();
1335        // "temp" still has active decision (d4)
1336        assert!(store.list_domains().unwrap().contains(&"temp".to_string()));
1337
1338        drop(store);
1339        let _ = std::fs::remove_dir_all(&dir);
1340    }
1341
1342    #[test]
1343    fn decisions_table_in_schema() {
1344        let (dir, store) = tmp_db();
1345        let tables: Vec<String> = store
1346            .conn
1347            .prepare("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
1348            .unwrap()
1349            .query_map([], |row| row.get(0))
1350            .unwrap()
1351            .collect::<Result<Vec<_>, _>>()
1352            .unwrap();
1353        assert!(tables.contains(&"decisions".to_string()));
1354        drop(store);
1355        let _ = std::fs::remove_dir_all(&dir);
1356    }
1357}