// harn_vm/flow/store.rs
//! SQLite-backed Harn Flow atom DAG store.
//!
//! The store is intentionally narrow: atoms are append-only, parent edges are
//! indexed for DAG traversal, and state vectors track per-site clocks for
//! causal delta sync between replicas. It also implements [`VcsBackend`] so the
//! same flow shipping surface can use durable SQLite storage.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt;
use std::path::Path;
use std::sync::{Mutex, MutexGuard};

use rusqlite::{params, Connection, OptionalExtension, Transaction};
use serde::{Deserialize, Serialize};
use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;

use super::backend::{AtomRef, FlowSlice, GitExportReceipt, ShipReceipt, VcsBackend};
use super::{Atom, AtomId, Intent, IntentId, Slice as DerivedSlice, SliceId, VcsBackendError};
20
// Scheme prefixes used when minting ref names for atoms/slices stored in this
// backend, so refs are distinguishable from e.g. git-backed refs.
const SQLITE_ATOM_REF_PREFIX: &str = "sqlite://atoms";
const SQLITE_SLICE_REF_PREFIX: &str = "sqlite://slices";
23
24/// Per-site causal clock vector for one principal/persona stream.
25#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
26pub struct StateVector {
27    clocks: BTreeMap<String, u64>,
28}
29
30impl StateVector {
31    pub fn new() -> Self {
32        Self::default()
33    }
34
35    pub fn insert(&mut self, site_id: impl Into<String>, clock: u64) {
36        self.clocks.insert(site_id.into(), clock);
37    }
38
39    pub fn clock(&self, site_id: &str) -> u64 {
40        self.clocks.get(site_id).copied().unwrap_or(0)
41    }
42
43    pub fn iter(&self) -> impl Iterator<Item = (&str, u64)> {
44        self.clocks
45            .iter()
46            .map(|(site_id, clock)| (site_id.as_str(), *clock))
47    }
48}
49
/// Atom plus the site clock needed to apply it to another replica.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AtomDelta {
    /// The atom payload itself.
    pub atom: Atom,
    /// Site that originally assigned `clock` to this atom.
    pub site_id: String,
    /// Per-site clock value of the atom at `site_id`.
    pub clock: u64,
}
57
/// Persisted derived slice plus immutable store audit metadata.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StoredDerivedSlice {
    /// The decoded derived slice body.
    pub slice: DerivedSlice,
    /// Row creation timestamp as written by SQLite (`CURRENT_TIMESTAMP`).
    pub created_at: String,
}
64
/// SQLite-backed Flow store.
pub struct SqliteFlowStore {
    // Normalized site id used when assigning clocks to locally emitted atoms.
    site_id: String,
    // Single connection guarded by a mutex; all reads and writes serialize
    // through this lock.
    conn: Mutex<Connection>,
}
70
// Hand-written Debug: show only the site id and elide the connection field
// with `..` via `finish_non_exhaustive`.
impl fmt::Debug for SqliteFlowStore {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SqliteFlowStore")
            .field("site_id", &self.site_id)
            .finish_non_exhaustive()
    }
}
78
impl SqliteFlowStore {
    /// Open or create a store at `path` using `site_id` for locally emitted
    /// atoms.
    ///
    /// Schema creation is idempotent (`CREATE ... IF NOT EXISTS`), so opening
    /// an existing database is safe.
    pub fn open(
        path: impl AsRef<Path>,
        site_id: impl Into<String>,
    ) -> Result<Self, VcsBackendError> {
        let site_id = normalize_site_id(site_id.into())?;
        let conn = Connection::open(path)?;
        initialize_schema(&conn)?;
        Ok(Self {
            site_id,
            conn: Mutex::new(conn),
        })
    }

    /// Create an in-memory store for tests and ephemeral callers.
    pub fn in_memory(site_id: impl Into<String>) -> Result<Self, VcsBackendError> {
        let site_id = normalize_site_id(site_id.into())?;
        let conn = Connection::open_in_memory()?;
        initialize_schema(&conn)?;
        Ok(Self {
            site_id,
            conn: Mutex::new(conn),
        })
    }

    /// Normalized site id this store stamps on locally emitted atoms.
    pub fn site_id(&self) -> &str {
        &self.site_id
    }

    /// Persist multiple locally emitted atoms in one transaction.
    ///
    /// Each atom is signature-verified (`Atom::verify`) before insertion.
    pub fn emit_atoms(&self, atoms: &[Atom]) -> Result<Vec<AtomRef>, VcsBackendError> {
        self.emit_atoms_inner(atoms, true)
    }

    /// Persist new atoms that the caller has already verified.
    ///
    /// This is intended for sync and benchmark hot paths that validate a batch
    /// once at the boundary, then measure storage throughput independently from
    /// signature verification cost. The caller must guarantee these atoms are
    /// not already present in the store.
    pub fn emit_preverified_atoms(&self, atoms: &[Atom]) -> Result<Vec<AtomRef>, VcsBackendError> {
        let mut conn = self.lock_conn()?;
        let tx = conn.transaction()?;
        // Per-(principal, persona) next-clock cache: the persisted state
        // vector is read once per stream, then advanced in memory for each
        // atom, and flushed back after the loop.
        let mut clocks: HashMap<(String, String), u64> = HashMap::new();
        let mut refs = Vec::with_capacity(atoms.len());

        // Inner scope so the cached statements (which borrow `tx`) are dropped
        // before the state-vector flush and commit below.
        {
            let mut insert_atom = tx.prepare_cached(
                "INSERT INTO atoms (
                     id, principal, persona, timestamp_ns, timestamp_rfc3339,
                     site_id, site_clock, inverse_of, body_binary
                 )
                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
            )?;
            let mut insert_parent = tx.prepare_cached(
                "INSERT INTO atom_parents (child_id, parent_id, ordinal)
                 VALUES (?1, ?2, ?3)",
            )?;

            for atom in atoms {
                let key = (
                    atom.provenance.principal.clone(),
                    atom.provenance.persona.clone(),
                );
                if !clocks.contains_key(&key) {
                    // First atom of this stream in the batch: seed from the
                    // persisted state vector.
                    let current = state_vector_clock_tx(
                        &tx,
                        &atom.provenance.principal,
                        &atom.provenance.persona,
                        &self.site_id,
                    )?;
                    clocks.insert(key.clone(), current);
                }
                let clock = clocks
                    .get_mut(&key)
                    .expect("clock was inserted before increment");
                *clock = clock
                    .checked_add(1)
                    .ok_or_else(|| VcsBackendError::Invalid("site clock overflow".to_string()))?;

                let body = atom.to_binary()?;
                let timestamp_ns = atom_timestamp_ns(atom)?;
                let timestamp_rfc3339 = atom_timestamp_rfc3339(atom)?;
                let inverse_of = atom.inverse_of.map(|id| id.0.to_vec());
                insert_atom.execute(params![
                    atom.id.0.as_slice(),
                    atom.provenance.principal,
                    atom.provenance.persona,
                    timestamp_ns,
                    timestamp_rfc3339,
                    self.site_id.as_str(),
                    i64_from_u64(*clock, "atom site clock")?,
                    inverse_of.as_deref(),
                    body.as_slice(),
                ])?;

                // Parent edges keep their original ordering via `ordinal`.
                for (ordinal, parent) in atom.parents.iter().enumerate() {
                    insert_parent.execute(params![
                        atom.id.0.as_slice(),
                        parent.0.as_slice(),
                        i64_from_usize(ordinal, "atom parent ordinal")?
                    ])?;
                }
                refs.push(sqlite_atom_ref(atom.id, &self.site_id, *clock));
            }
        }

        // Persist the final clock for every stream touched by this batch.
        for ((principal, persona), clock) in clocks {
            advance_state_vector_tx(&tx, &principal, &persona, &self.site_id, clock)?;
        }
        tx.commit()?;
        Ok(refs)
    }

    // Shared emit path: inserts each atom via `insert_atom_tx` (idempotent on
    // atom id), optionally verifying signatures first.
    fn emit_atoms_inner(
        &self,
        atoms: &[Atom],
        verify: bool,
    ) -> Result<Vec<AtomRef>, VcsBackendError> {
        let mut conn = self.lock_conn()?;
        let tx = conn.transaction()?;
        let mut refs = Vec::with_capacity(atoms.len());
        for atom in atoms {
            if verify {
                atom.verify()?;
            }
            // `None` clock: reserve the next local clock for this store's
            // site id.
            refs.push(insert_atom_tx(&tx, atom, &self.site_id, None)?);
        }
        tx.commit()?;
        Ok(refs)
    }

    /// Persist a remote atom at its original site clock.
    ///
    /// Clock 0 is rejected up front; it would also violate the schema's
    /// `CHECK(site_clock > 0)` constraint.
    pub fn insert_remote_atom(
        &self,
        atom: &Atom,
        site_id: &str,
        clock: u64,
    ) -> Result<AtomRef, VcsBackendError> {
        atom.verify()?;
        if clock == 0 {
            return Err(VcsBackendError::Invalid(
                "remote atom clock must be greater than zero".to_string(),
            ));
        }
        let site_id = normalize_site_id(site_id.to_string())?;
        let mut conn = self.lock_conn()?;
        let tx = conn.transaction()?;
        let atom_ref = insert_atom_tx(&tx, atom, &site_id, Some(clock))?;
        tx.commit()?;
        Ok(atom_ref)
    }

    /// Load one atom by id.
    pub fn get_atom(&self, atom_id: AtomId) -> Result<Atom, VcsBackendError> {
        let conn = self.lock_conn()?;
        load_atom(&conn, atom_id)
    }

    /// Find an atom by its content hash. For Flow atoms the content hash is the
    /// atom id, so this uses the primary-key index directly.
    pub fn atom_by_content_hash(
        &self,
        content_hash: AtomId,
    ) -> Result<Option<Atom>, VcsBackendError> {
        let conn = self.lock_conn()?;
        conn.query_row(
            "SELECT body_binary FROM atoms WHERE id = ?1",
            params![content_hash.0.as_slice()],
            |row| row.get::<_, Vec<u8>>(0),
        )
        .optional()?
        // Missing row -> Ok(None); present row -> decode, propagating decode
        // errors via `transpose`.
        .map(|body| Atom::from_binary_slice(&body).map_err(Into::into))
        .transpose()
    }

    /// Load atoms for a principal/persona ordered by timestamp and atom id.
    pub fn atoms_for_principal_persona(
        &self,
        principal: &str,
        persona: &str,
    ) -> Result<Vec<Atom>, VcsBackendError> {
        let conn = self.lock_conn()?;
        let mut stmt = conn.prepare(
            "SELECT id FROM atoms
             WHERE principal = ?1 AND persona = ?2
             ORDER BY timestamp_ns, id",
        )?;
        let rows = stmt.query_map(params![principal, persona], |row| row.get::<_, Vec<u8>>(0))?;
        let mut atoms = Vec::new();
        for row in rows {
            // Each id fetch is followed by a body load; one query per atom.
            atoms.push(load_atom(&conn, atom_id_from_blob(row?)?)?);
        }
        Ok(atoms)
    }

    /// Count atoms for a principal/persona using the timestamp index.
    pub fn atom_count_for_principal_persona(
        &self,
        principal: &str,
        persona: &str,
    ) -> Result<u64, VcsBackendError> {
        let conn = self.lock_conn()?;
        let count = conn.query_row(
            "SELECT COUNT(*) FROM atoms WHERE principal = ?1 AND persona = ?2",
            params![principal, persona],
            |row| row.get::<_, i64>(0),
        )?;
        u64_from_i64(count, "atom count")
    }

    /// Load all child atoms that list `parent` as a parent edge.
    ///
    /// Results are ordered by child atom id (not timestamp).
    pub fn atoms_with_parent(&self, parent: AtomId) -> Result<Vec<Atom>, VcsBackendError> {
        let conn = self.lock_conn()?;
        let mut stmt = conn.prepare(
            "SELECT child_id FROM atom_parents
             WHERE parent_id = ?1
             ORDER BY child_id",
        )?;
        let rows = stmt.query_map(params![parent.0.as_slice()], |row| row.get::<_, Vec<u8>>(0))?;
        let mut atoms = Vec::new();
        for row in rows {
            atoms.push(load_atom(&conn, atom_id_from_blob(row?)?)?);
        }
        Ok(atoms)
    }

    /// Current state vector for one principal/persona stream.
    pub fn state_vector(
        &self,
        principal: &str,
        persona: &str,
    ) -> Result<StateVector, VcsBackendError> {
        let conn = self.lock_conn()?;
        let mut stmt = conn.prepare(
            "SELECT site_id, clock FROM state_vectors
             WHERE principal = ?1 AND persona = ?2
             ORDER BY site_id",
        )?;
        let rows = stmt.query_map(params![principal, persona], |row| {
            Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
        })?;
        let mut vector = StateVector::new();
        for row in rows {
            let (site_id, clock) = row?;
            vector.insert(site_id, u64_from_i64(clock, "state vector clock")?);
        }
        Ok(vector)
    }

    /// Return atoms this store has that are newer than `remote`.
    ///
    /// NOTE(review): this scans every atom in the stream and filters against
    /// the remote vector in Rust; fine for modest streams, but worth
    /// revisiting if streams grow large.
    pub fn causal_delta(
        &self,
        principal: &str,
        persona: &str,
        remote: &StateVector,
    ) -> Result<Vec<AtomDelta>, VcsBackendError> {
        let conn = self.lock_conn()?;
        let mut stmt = conn.prepare(
            "SELECT id, site_id, site_clock FROM atoms
             WHERE principal = ?1 AND persona = ?2
             ORDER BY site_id, site_clock, id",
        )?;
        let rows = stmt.query_map(params![principal, persona], |row| {
            Ok((
                row.get::<_, Vec<u8>>(0)?,
                row.get::<_, String>(1)?,
                row.get::<_, i64>(2)?,
            ))
        })?;
        let mut delta = Vec::new();
        for row in rows {
            let (id_blob, site_id, clock_raw) = row?;
            let clock = u64_from_i64(clock_raw, "atom site clock")?;
            // An atom is "newer" when its clock exceeds the remote's recorded
            // clock for the same site (unknown sites read as 0).
            if clock > remote.clock(&site_id) {
                delta.push(AtomDelta {
                    atom: load_atom(&conn, atom_id_from_blob(id_blob)?)?,
                    site_id,
                    clock,
                });
            }
        }
        Ok(delta)
    }

    /// Persist an intent record and its atom edges.
    ///
    /// `INSERT OR IGNORE` makes re-persisting the same intent a no-op.
    pub fn put_intent(&self, intent: &Intent) -> Result<(), VcsBackendError> {
        let body = serde_json::to_vec(intent)?;
        let mut conn = self.lock_conn()?;
        let tx = conn.transaction()?;
        tx.execute(
            "INSERT OR IGNORE INTO intents (id, body_json, goal_description, confidence)
             VALUES (?1, ?2, ?3, ?4)",
            params![
                intent.id.0.as_slice(),
                body.as_slice(),
                intent.goal_description,
                f64::from(intent.confidence)
            ],
        )?;
        for (ordinal, atom_id) in intent.atoms.iter().enumerate() {
            tx.execute(
                "INSERT OR IGNORE INTO intent_atoms (intent_id, atom_id, ordinal)
                 VALUES (?1, ?2, ?3)",
                params![
                    intent.id.0.as_slice(),
                    atom_id.0.as_slice(),
                    i64_from_usize(ordinal, "intent atom ordinal")?
                ],
            )?;
        }
        tx.commit()?;
        Ok(())
    }

    /// Load one intent by id; `NotFound` if it was never persisted.
    pub fn get_intent(&self, intent_id: IntentId) -> Result<Intent, VcsBackendError> {
        let conn = self.lock_conn()?;
        let body = conn
            .query_row(
                "SELECT body_json FROM intents WHERE id = ?1",
                params![intent_id.0.as_slice()],
                |row| row.get::<_, Vec<u8>>(0),
            )
            .optional()?
            .ok_or_else(|| VcsBackendError::NotFound(format!("intent {intent_id} not found")))?;
        serde_json::from_slice(&body).map_err(Into::into)
    }

    /// Persist a derived Flow slice record.
    pub fn put_derived_slice(&self, slice: &DerivedSlice) -> Result<(), VcsBackendError> {
        let body = serde_json::to_vec(slice)?;
        self.insert_slice_record(slice.id, &slice.atoms, "derived", body, false)
    }

    /// Persist a derived Flow slice as shipped.
    ///
    /// This writes the immutable shipped record directly; callers should not
    /// persist an unshipped row first and later mutate it.
    pub fn put_shipped_derived_slice(&self, slice: &DerivedSlice) -> Result<(), VcsBackendError> {
        let body = serde_json::to_vec(slice)?;
        self.insert_slice_record(slice.id, &slice.atoms, "derived", body, true)
    }

    /// Load one derived slice by id; `NotFound` if absent or not `derived`.
    pub fn get_derived_slice(&self, slice_id: SliceId) -> Result<DerivedSlice, VcsBackendError> {
        let conn = self.lock_conn()?;
        let body = conn
            .query_row(
                "SELECT body_json FROM slices WHERE id = ?1 AND slice_kind = 'derived'",
                params![slice_id.0.as_slice()],
                |row| row.get::<_, Vec<u8>>(0),
            )
            .optional()?
            .ok_or_else(|| VcsBackendError::NotFound(format!("slice {slice_id} not found")))?;
        serde_json::from_slice(&body).map_err(Into::into)
    }

    /// List shipped derived slices, optionally filtering by store creation
    /// timestamp.
    pub fn shipped_derived_slices_since(
        &self,
        since: Option<OffsetDateTime>,
    ) -> Result<Vec<StoredDerivedSlice>, VcsBackendError> {
        let since = since
            .map(|value| value.format(&Rfc3339))
            .transpose()
            .map_err(|error| VcsBackendError::Invalid(format!("timestamp format: {error}")))?;
        let conn = self.lock_conn()?;
        // `?1 IS NULL OR ...` implements the optional filter in SQL so a
        // single prepared statement covers both cases; `datetime(?1)`
        // converts the RFC 3339 bound to SQLite's datetime text format for
        // comparison with `created_at`.
        let mut stmt = conn.prepare(
            "SELECT body_json, created_at FROM slices
             WHERE slice_kind = 'derived'
               AND shipped = 1
               AND (?1 IS NULL OR created_at >= datetime(?1))
             ORDER BY created_at, id",
        )?;
        let rows = stmt.query_map(params![since.as_deref()], |row| {
            Ok((row.get::<_, Vec<u8>>(0)?, row.get::<_, String>(1)?))
        })?;
        let mut slices = Vec::new();
        for row in rows {
            let (body, created_at) = row?;
            slices.push(StoredDerivedSlice {
                slice: serde_json::from_slice(&body)?,
                created_at,
            });
        }
        Ok(slices)
    }

    // Serialize a FlowSlice and store it under slice_kind = "flow".
    fn insert_flow_slice(&self, slice: &FlowSlice, shipped: bool) -> Result<(), VcsBackendError> {
        let body = serde_json::to_vec(slice)?;
        self.insert_slice_record(slice.id, &slice.atoms, "flow", body, shipped)
    }

    // Shared slice persistence: one transaction covering the slice row and
    // its ordered atom edges.
    fn insert_slice_record(
        &self,
        slice_id: SliceId,
        atoms: &[AtomId],
        kind: &str,
        body: Vec<u8>,
        shipped: bool,
    ) -> Result<(), VcsBackendError> {
        let mut conn = self.lock_conn()?;
        let tx = conn.transaction()?;
        insert_slice_record_tx(&tx, slice_id, atoms, kind, &body, shipped)?;
        tx.commit()?;
        Ok(())
    }

    // Ancestor closure of `roots` via iterative post-order DFS over parent
    // edges. Each stack entry carries an `emit` flag: the first pop expands
    // the node's parents and re-pushes it with `emit = true`; the second pop
    // appends it to the output. The result therefore lists parents before
    // children, with duplicates suppressed by `emitted`/`opened`.
    fn atom_closure(&self, roots: &[AtomId]) -> Result<Vec<AtomId>, VcsBackendError> {
        let mut opened = HashSet::new();
        let mut emitted = HashSet::new();
        let mut out = Vec::new();
        // Roots are reversed so the stack pops them in their original order.
        let mut stack: Vec<(AtomId, bool)> = roots
            .iter()
            .rev()
            .copied()
            .map(|atom_id| (atom_id, false))
            .collect();

        while let Some((atom_id, emit)) = stack.pop() {
            if emit {
                if emitted.insert(atom_id) {
                    out.push(atom_id);
                }
                continue;
            }
            if emitted.contains(&atom_id) || !opened.insert(atom_id) {
                continue;
            }

            let atom = self.get_atom(atom_id)?;
            stack.push((atom_id, true));
            for parent in atom.parents.iter().rev() {
                if !emitted.contains(parent) {
                    stack.push((*parent, false));
                }
            }
        }

        Ok(out)
    }

    // Acquire the connection mutex, mapping lock poisoning to an Io error
    // instead of panicking.
    fn lock_conn(&self) -> Result<MutexGuard<'_, Connection>, VcsBackendError> {
        self.conn
            .lock()
            .map_err(|_| VcsBackendError::Io("sqlite flow store lock poisoned".to_string()))
    }
}
529
impl VcsBackend for SqliteFlowStore {
    /// Emit a single atom by delegating to the batch path.
    fn emit_atom(&self, atom: &Atom) -> Result<AtomRef, VcsBackendError> {
        // `from_ref` yields a one-element slice, so `refs` always has exactly
        // one entry and `remove(0)` cannot panic.
        self.emit_atoms(std::slice::from_ref(atom))
            .map(|mut refs| refs.remove(0))
    }

    /// Build a slice covering `atoms` and their full ancestor closure.
    fn derive_slice(&self, atoms: &[AtomId]) -> Result<FlowSlice, VcsBackendError> {
        FlowSlice::new(self.atom_closure(atoms)?)
    }

    /// Persist the slice as shipped and return its durable ref.
    fn ship_slice(&self, slice: &FlowSlice) -> Result<ShipReceipt, VcsBackendError> {
        self.insert_flow_slice(slice, true)?;
        Ok(ShipReceipt {
            slice_id: slice.id,
            // For this backend the "commit" is simply the slice id rendered as
            // a string.
            commit: slice.id.to_string(),
            ref_name: format!("{SQLITE_SLICE_REF_PREFIX}/{}", slice.id),
        })
    }

    /// List refs for every stored atom, grouped by stream and ordered by
    /// timestamp within each stream.
    fn list_atoms(&self) -> Result<Vec<AtomRef>, VcsBackendError> {
        let conn = self.lock_conn()?;
        let mut stmt = conn.prepare(
            "SELECT id, site_id, site_clock FROM atoms
             ORDER BY principal, persona, timestamp_ns, id",
        )?;
        let rows = stmt.query_map([], |row| {
            Ok((
                row.get::<_, Vec<u8>>(0)?,
                row.get::<_, String>(1)?,
                row.get::<_, i64>(2)?,
            ))
        })?;
        let mut atoms = Vec::new();
        for row in rows {
            let (id_blob, site_id, clock_raw) = row?;
            atoms.push(sqlite_atom_ref(
                atom_id_from_blob(id_blob)?,
                &site_id,
                u64_from_i64(clock_raw, "atom site clock")?,
            ));
        }
        Ok(atoms)
    }

    /// Materialize every atom in the slice, in the slice's stored order.
    /// Fails with `NotFound` if any referenced atom is missing.
    fn replay_slice(&self, slice: &FlowSlice) -> Result<Vec<Atom>, VcsBackendError> {
        slice
            .atoms
            .iter()
            .map(|atom_id| self.get_atom(*atom_id))
            .collect()
    }

    /// Git export is intentionally unsupported by this backend.
    fn export_git(
        &self,
        _slice: &FlowSlice,
        _ref_name: &str,
    ) -> Result<GitExportReceipt, VcsBackendError> {
        Err(VcsBackendError::Unsupported(
            "SqliteFlowStore cannot export git refs; use ShadowGitBackend for git export"
                .to_string(),
        ))
    }

    /// Git import is intentionally unsupported by this backend.
    fn import_git(&self, _ref_name: &str) -> Result<FlowSlice, VcsBackendError> {
        Err(VcsBackendError::Unsupported(
            "SqliteFlowStore cannot import git refs; use ShadowGitBackend for git import"
                .to_string(),
        ))
    }
}
600
/// Create the full store schema if it does not already exist.
///
/// All DDL is `IF NOT EXISTS`, so this is safe to run on every open. The
/// triggers at the end enforce the append-only contract at the database
/// level: any `UPDATE` or `DELETE` on atoms, parent edges, slices, or slice
/// edges aborts. Note that `state_vectors` and `intents` have no such
/// triggers — state vector rows are intentionally updatable.
fn initialize_schema(conn: &Connection) -> Result<(), VcsBackendError> {
    conn.execute_batch(
        r#"
        PRAGMA foreign_keys = ON;
        PRAGMA journal_mode = WAL;
        PRAGMA synchronous = NORMAL;

        CREATE TABLE IF NOT EXISTS atoms (
            id BLOB PRIMARY KEY CHECK(length(id) = 32),
            principal TEXT NOT NULL,
            persona TEXT NOT NULL,
            timestamp_ns INTEGER NOT NULL,
            timestamp_rfc3339 TEXT NOT NULL,
            site_id TEXT NOT NULL,
            site_clock INTEGER NOT NULL CHECK(site_clock > 0),
            inverse_of BLOB CHECK(inverse_of IS NULL OR length(inverse_of) = 32),
            body_binary BLOB NOT NULL,
            created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
            UNIQUE(principal, persona, site_id, site_clock)
        );

        CREATE INDEX IF NOT EXISTS atoms_principal_persona_timestamp_idx
            ON atoms(principal, persona, timestamp_ns, id);
        CREATE INDEX IF NOT EXISTS atoms_principal_persona_site_clock_idx
            ON atoms(principal, persona, site_id, site_clock);
        CREATE INDEX IF NOT EXISTS atoms_inverse_of_idx ON atoms(inverse_of);

        CREATE TABLE IF NOT EXISTS atom_parents (
            child_id BLOB NOT NULL CHECK(length(child_id) = 32),
            parent_id BLOB NOT NULL CHECK(length(parent_id) = 32),
            ordinal INTEGER NOT NULL CHECK(ordinal >= 0),
            PRIMARY KEY(child_id, ordinal),
            UNIQUE(child_id, parent_id),
            FOREIGN KEY(child_id) REFERENCES atoms(id)
        );
        CREATE INDEX IF NOT EXISTS atom_parents_parent_idx
            ON atom_parents(parent_id, child_id);

        CREATE TABLE IF NOT EXISTS intents (
            id BLOB PRIMARY KEY CHECK(length(id) = 32),
            body_json BLOB NOT NULL,
            goal_description TEXT NOT NULL,
            confidence REAL NOT NULL,
            created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
        );

        CREATE TABLE IF NOT EXISTS intent_atoms (
            intent_id BLOB NOT NULL CHECK(length(intent_id) = 32),
            atom_id BLOB NOT NULL CHECK(length(atom_id) = 32),
            ordinal INTEGER NOT NULL CHECK(ordinal >= 0),
            PRIMARY KEY(intent_id, ordinal),
            UNIQUE(intent_id, atom_id),
            FOREIGN KEY(intent_id) REFERENCES intents(id)
        );
        CREATE INDEX IF NOT EXISTS intent_atoms_atom_idx
            ON intent_atoms(atom_id, intent_id);

        CREATE TABLE IF NOT EXISTS slices (
            id BLOB PRIMARY KEY CHECK(length(id) = 32),
            slice_kind TEXT NOT NULL,
            body_json BLOB NOT NULL,
            shipped INTEGER NOT NULL DEFAULT 0,
            ref_name TEXT,
            created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
        );

        CREATE TABLE IF NOT EXISTS slice_atoms (
            slice_id BLOB NOT NULL CHECK(length(slice_id) = 32),
            atom_id BLOB NOT NULL CHECK(length(atom_id) = 32),
            ordinal INTEGER NOT NULL CHECK(ordinal >= 0),
            PRIMARY KEY(slice_id, ordinal),
            UNIQUE(slice_id, atom_id),
            FOREIGN KEY(slice_id) REFERENCES slices(id)
        );
        CREATE INDEX IF NOT EXISTS slice_atoms_atom_idx
            ON slice_atoms(atom_id, slice_id);

        CREATE TABLE IF NOT EXISTS state_vectors (
            principal TEXT NOT NULL,
            persona TEXT NOT NULL,
            site_id TEXT NOT NULL,
            clock INTEGER NOT NULL CHECK(clock >= 0),
            updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY(principal, persona, site_id)
        );

        CREATE TRIGGER IF NOT EXISTS atoms_no_update
        BEFORE UPDATE ON atoms
        BEGIN
            SELECT RAISE(ABORT, 'atoms are append-only');
        END;

        CREATE TRIGGER IF NOT EXISTS atoms_no_delete
        BEFORE DELETE ON atoms
        BEGIN
            SELECT RAISE(ABORT, 'atoms are append-only');
        END;

        CREATE TRIGGER IF NOT EXISTS atom_parents_no_update
        BEFORE UPDATE ON atom_parents
        BEGIN
            SELECT RAISE(ABORT, 'atom parent edges are append-only');
        END;

        CREATE TRIGGER IF NOT EXISTS atom_parents_no_delete
        BEFORE DELETE ON atom_parents
        BEGIN
            SELECT RAISE(ABORT, 'atom parent edges are append-only');
        END;

        CREATE TRIGGER IF NOT EXISTS slices_no_update
        BEFORE UPDATE ON slices
        BEGIN
            SELECT RAISE(ABORT, 'slices are append-only');
        END;

        CREATE TRIGGER IF NOT EXISTS slices_no_delete
        BEFORE DELETE ON slices
        BEGIN
            SELECT RAISE(ABORT, 'slices are append-only');
        END;

        CREATE TRIGGER IF NOT EXISTS slice_atoms_no_update
        BEFORE UPDATE ON slice_atoms
        BEGIN
            SELECT RAISE(ABORT, 'slice atom edges are append-only');
        END;

        CREATE TRIGGER IF NOT EXISTS slice_atoms_no_delete
        BEFORE DELETE ON slice_atoms
        BEGIN
            SELECT RAISE(ABORT, 'slice atom edges are append-only');
        END;
        "#,
    )?;
    Ok(())
}
738
/// Insert one atom (plus parent edges) inside an open transaction.
///
/// Idempotent on atom id: if the atom is already stored, its existing
/// `(site_id, clock)` ref is returned and nothing is written. Otherwise the
/// clock is either the caller-supplied `explicit_clock` (remote atoms, after
/// conflict rejection) or the next locally reserved clock for `site_id`.
fn insert_atom_tx(
    tx: &Transaction<'_>,
    atom: &Atom,
    site_id: &str,
    explicit_clock: Option<u64>,
) -> Result<AtomRef, VcsBackendError> {
    // Re-insertion of a known atom keeps its original clock assignment.
    if let Some((existing_site, existing_clock)) = atom_clock_tx(tx, atom.id)? {
        return Ok(sqlite_atom_ref(atom.id, &existing_site, existing_clock));
    }

    let clock = match explicit_clock {
        Some(clock) => {
            // Remote path: refuse a (principal, persona, site, clock) slot
            // already occupied by a different atom, then advance the state
            // vector to cover the supplied clock.
            reject_site_clock_conflict(
                tx,
                &atom.provenance.principal,
                &atom.provenance.persona,
                site_id,
                clock,
                atom.id,
            )?;
            advance_state_vector_tx(
                tx,
                &atom.provenance.principal,
                &atom.provenance.persona,
                site_id,
                clock,
            )?;
            clock
        }
        // Local path: bump this site's clock in the state vector and use the
        // new value.
        None => reserve_next_clock_tx(
            tx,
            &atom.provenance.principal,
            &atom.provenance.persona,
            site_id,
        )?,
    };

    let body = atom.to_binary()?;
    let timestamp_ns = atom_timestamp_ns(atom)?;
    let timestamp_rfc3339 = atom_timestamp_rfc3339(atom)?;
    let inverse_of = atom.inverse_of.map(|id| id.0.to_vec());
    tx.execute(
        "INSERT INTO atoms (
             id, principal, persona, timestamp_ns, timestamp_rfc3339,
             site_id, site_clock, inverse_of, body_binary
         )
         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
        params![
            atom.id.0.as_slice(),
            atom.provenance.principal,
            atom.provenance.persona,
            timestamp_ns,
            timestamp_rfc3339,
            site_id,
            i64_from_u64(clock, "atom site clock")?,
            inverse_of.as_deref(),
            body.as_slice(),
        ],
    )?;

    // Parent edges preserve their order through `ordinal`.
    for (ordinal, parent) in atom.parents.iter().enumerate() {
        tx.execute(
            "INSERT INTO atom_parents (child_id, parent_id, ordinal)
             VALUES (?1, ?2, ?3)",
            params![
                atom.id.0.as_slice(),
                parent.0.as_slice(),
                i64_from_usize(ordinal, "atom parent ordinal")?
            ],
        )?;
    }

    Ok(sqlite_atom_ref(atom.id, site_id, clock))
}
813
814fn insert_slice_record_tx(
815    tx: &Transaction<'_>,
816    slice_id: SliceId,
817    atoms: &[AtomId],
818    kind: &str,
819    body: &[u8],
820    shipped: bool,
821) -> Result<(), VcsBackendError> {
822    tx.execute(
823        "INSERT OR IGNORE INTO slices (id, slice_kind, body_json, shipped, ref_name)
824         VALUES (?1, ?2, ?3, ?4, ?5)",
825        params![
826            slice_id.0.as_slice(),
827            kind,
828            body,
829            if shipped { 1 } else { 0 },
830            if shipped {
831                Some(format!("{SQLITE_SLICE_REF_PREFIX}/{slice_id}"))
832            } else {
833                None
834            }
835        ],
836    )?;
837    for (ordinal, atom_id) in atoms.iter().enumerate() {
838        tx.execute(
839            "INSERT OR IGNORE INTO slice_atoms (slice_id, atom_id, ordinal)
840             VALUES (?1, ?2, ?3)",
841            params![
842                slice_id.0.as_slice(),
843                atom_id.0.as_slice(),
844                i64_from_usize(ordinal, "slice atom ordinal")?
845            ],
846        )?;
847    }
848    Ok(())
849}
850
851fn load_atom(conn: &Connection, atom_id: AtomId) -> Result<Atom, VcsBackendError> {
852    let body = conn
853        .query_row(
854            "SELECT body_binary FROM atoms WHERE id = ?1",
855            params![atom_id.0.as_slice()],
856            |row| row.get::<_, Vec<u8>>(0),
857        )
858        .optional()?
859        .ok_or_else(|| VcsBackendError::NotFound(format!("atom {atom_id} not found")))?;
860    Atom::from_binary_slice(&body).map_err(Into::into)
861}
862
863fn atom_clock_tx(
864    tx: &Transaction<'_>,
865    atom_id: AtomId,
866) -> Result<Option<(String, u64)>, VcsBackendError> {
867    tx.query_row(
868        "SELECT site_id, site_clock FROM atoms WHERE id = ?1",
869        params![atom_id.0.as_slice()],
870        |row| Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?)),
871    )
872    .optional()?
873    .map(|(site_id, clock)| Ok((site_id, u64_from_i64(clock, "atom site clock")?)))
874    .transpose()
875}
876
877fn reserve_next_clock_tx(
878    tx: &Transaction<'_>,
879    principal: &str,
880    persona: &str,
881    site_id: &str,
882) -> Result<u64, VcsBackendError> {
883    let current = state_vector_clock_tx(tx, principal, persona, site_id)?;
884    let next = current
885        .checked_add(1)
886        .ok_or_else(|| VcsBackendError::Invalid("state vector clock overflow".to_string()))?;
887    advance_state_vector_tx(tx, principal, persona, site_id, next)?;
888    Ok(next)
889}
890
891fn state_vector_clock_tx(
892    tx: &Transaction<'_>,
893    principal: &str,
894    persona: &str,
895    site_id: &str,
896) -> Result<u64, VcsBackendError> {
897    tx.query_row(
898        "SELECT clock FROM state_vectors
899         WHERE principal = ?1 AND persona = ?2 AND site_id = ?3",
900        params![principal, persona, site_id],
901        |row| row.get::<_, i64>(0),
902    )
903    .optional()?
904    .map(|clock| u64_from_i64(clock, "state vector clock"))
905    .transpose()
906    .map(|clock| clock.unwrap_or(0))
907}
908
909fn advance_state_vector_tx(
910    tx: &Transaction<'_>,
911    principal: &str,
912    persona: &str,
913    site_id: &str,
914    clock: u64,
915) -> Result<(), VcsBackendError> {
916    tx.execute(
917        "INSERT INTO state_vectors (principal, persona, site_id, clock, updated_at)
918         VALUES (?1, ?2, ?3, ?4, CURRENT_TIMESTAMP)
919         ON CONFLICT(principal, persona, site_id) DO UPDATE SET
920             clock = CASE
921                 WHEN excluded.clock > state_vectors.clock THEN excluded.clock
922                 ELSE state_vectors.clock
923             END,
924             updated_at = CURRENT_TIMESTAMP",
925        params![
926            principal,
927            persona,
928            site_id,
929            i64_from_u64(clock, "state vector clock")?
930        ],
931    )?;
932    Ok(())
933}
934
935fn reject_site_clock_conflict(
936    tx: &Transaction<'_>,
937    principal: &str,
938    persona: &str,
939    site_id: &str,
940    clock: u64,
941    atom_id: AtomId,
942) -> Result<(), VcsBackendError> {
943    let existing = tx
944        .query_row(
945            "SELECT id FROM atoms
946             WHERE principal = ?1 AND persona = ?2 AND site_id = ?3 AND site_clock = ?4",
947            params![
948                principal,
949                persona,
950                site_id,
951                i64_from_u64(clock, "atom site clock")?
952            ],
953            |row| row.get::<_, Vec<u8>>(0),
954        )
955        .optional()?;
956    if let Some(existing) = existing {
957        let existing = atom_id_from_blob(existing)?;
958        if existing != atom_id {
959            return Err(VcsBackendError::Invalid(format!(
960                "site clock conflict for {site_id}@{clock}: existing atom {existing}, new atom {atom_id}"
961            )));
962        }
963    }
964    Ok(())
965}
966
967fn sqlite_atom_ref(atom_id: AtomId, site_id: &str, clock: u64) -> AtomRef {
968    AtomRef {
969        atom_id,
970        commit: format!("{site_id}:{clock}"),
971        ref_name: format!("{SQLITE_ATOM_REF_PREFIX}/{atom_id}"),
972    }
973}
974
975fn atom_timestamp_ns(atom: &Atom) -> Result<i64, VcsBackendError> {
976    i64::try_from(atom.provenance.timestamp.unix_timestamp_nanos())
977        .map_err(|_| VcsBackendError::Invalid("atom timestamp is out of SQLite range".to_string()))
978}
979
980fn atom_timestamp_rfc3339(atom: &Atom) -> Result<String, VcsBackendError> {
981    atom.provenance
982        .timestamp
983        .format(&Rfc3339)
984        .map_err(|error| VcsBackendError::Invalid(format!("atom timestamp format: {error}")))
985}
986
987fn atom_id_from_blob(blob: Vec<u8>) -> Result<AtomId, VcsBackendError> {
988    if blob.len() != 32 {
989        return Err(VcsBackendError::Invalid(format!(
990            "atom id blob must be 32 bytes, got {}",
991            blob.len()
992        )));
993    }
994    let mut out = [0u8; 32];
995    out.copy_from_slice(&blob);
996    Ok(AtomId(out))
997}
998
999fn normalize_site_id(site_id: String) -> Result<String, VcsBackendError> {
1000    if site_id.trim().is_empty() {
1001        return Err(VcsBackendError::Invalid(
1002            "flow store site_id must not be empty".to_string(),
1003        ));
1004    }
1005    Ok(site_id)
1006}
1007
1008fn i64_from_u64(value: u64, field: &str) -> Result<i64, VcsBackendError> {
1009    i64::try_from(value)
1010        .map_err(|_| VcsBackendError::Invalid(format!("{field} exceeds SQLite i64 range")))
1011}
1012
1013fn i64_from_usize(value: usize, field: &str) -> Result<i64, VcsBackendError> {
1014    i64::try_from(value)
1015        .map_err(|_| VcsBackendError::Invalid(format!("{field} exceeds SQLite i64 range")))
1016}
1017
1018fn u64_from_i64(value: i64, field: &str) -> Result<u64, VcsBackendError> {
1019    u64::try_from(value).map_err(|_| VcsBackendError::Invalid(format!("{field} is negative")))
1020}
1021
#[cfg(test)]
mod tests {
    use super::*;
    use crate::flow::{Approval, CoverageMap, PredicateHash, Slice, SliceStatus, TestId, TextOp};
    use ed25519_dalek::SigningKey;
    use time::OffsetDateTime;

    /// Deterministic signing key built from a single repeated seed byte.
    fn key(seed: u8) -> SigningKey {
        SigningKey::from_bytes(&[seed; 32])
    }

    /// Builds a signed test atom whose payload, timestamp, and provenance are
    /// all derived from `index`, parented on `parents`. All atoms share the
    /// same principal/persona so they land in one causal stream.
    fn atom(index: u64, parents: Vec<AtomId>) -> Atom {
        let principal = key(1);
        let persona = key(2);
        let timestamp = OffsetDateTime::from_unix_timestamp(1_775_000_000 + index as i64).unwrap();
        Atom::sign(
            vec![TextOp::Insert {
                offset: index,
                content: format!("atom-{index}"),
            }],
            parents,
            crate::flow::Provenance {
                principal: "user:alice".to_string(),
                persona: "ship-captain".to_string(),
                agent_run_id: format!("run-{index}"),
                tool_call_id: Some(format!("tool-{index}")),
                trace_id: "trace-1".to_string(),
                transcript_ref: "transcript-1".to_string(),
                timestamp,
            },
            None,
            &principal,
            &persona,
        )
        .unwrap()
    }

    // Emitting atoms assigns monotonically increasing site clocks, and all
    // four read paths (by id, by content hash, by parent, by stream) agree.
    #[test]
    fn emits_replays_and_queries_atoms() {
        let store = SqliteFlowStore::in_memory("site-a").unwrap();
        let first = atom(1, vec![]);
        let second = atom(2, vec![first.id]);

        let refs = store.emit_atoms(&[first.clone(), second.clone()]).unwrap();
        assert_eq!(refs.len(), 2);
        // Commit strings encode site:clock per sqlite_atom_ref.
        assert_eq!(refs[0].commit, "site-a:1");
        assert_eq!(refs[1].commit, "site-a:2");
        assert_eq!(store.get_atom(first.id).unwrap(), first);
        assert_eq!(
            store.atom_by_content_hash(second.id).unwrap(),
            Some(second.clone())
        );
        assert_eq!(
            store.atoms_with_parent(first.id).unwrap(),
            vec![second.clone()]
        );
        assert_eq!(
            store
                .atoms_for_principal_persona("user:alice", "ship-captain")
                .unwrap(),
            vec![first, second]
        );
    }

    // Deriving a slice from a tip atom pulls in its ancestry (parent-closed),
    // and a shipped slice replays back to the full ordered atom sequence.
    #[test]
    fn derives_and_replays_parent_closed_slices() {
        let store = SqliteFlowStore::in_memory("site-a").unwrap();
        let first = atom(1, vec![]);
        let second = atom(2, vec![first.id]);
        store.emit_atoms(&[first.clone(), second.clone()]).unwrap();

        let slice = store.derive_slice(&[second.id]).unwrap();
        // Requesting only `second` must also capture its parent `first`.
        assert_eq!(slice.atoms, vec![first.id, second.id]);
        let receipt = store.ship_slice(&slice).unwrap();
        assert_eq!(receipt.slice_id, slice.id);
        assert_eq!(store.replay_slice(&slice).unwrap(), vec![first, second]);
    }

    // A replica with an empty state vector receives the full delta; after
    // applying it, its vector advances and a second delta request is empty.
    #[test]
    fn state_vector_delta_round_trips_between_replicas() {
        let source = SqliteFlowStore::in_memory("site-a").unwrap();
        let replica = SqliteFlowStore::in_memory("site-b").unwrap();
        let first = atom(1, vec![]);
        let second = atom(2, vec![first.id]);
        source.emit_atoms(&[first.clone(), second.clone()]).unwrap();

        let empty = replica.state_vector("user:alice", "ship-captain").unwrap();
        let delta = source
            .causal_delta("user:alice", "ship-captain", &empty)
            .unwrap();
        assert_eq!(delta.len(), 2);
        for item in &delta {
            replica
                .insert_remote_atom(&item.atom, &item.site_id, item.clock)
                .unwrap();
        }

        let vector = replica.state_vector("user:alice", "ship-captain").unwrap();
        // Replica now tracks site-a's clock at 2, so sync has converged.
        assert_eq!(vector.clock("site-a"), 2);
        assert!(source
            .causal_delta("user:alice", "ship-captain", &vector)
            .unwrap()
            .is_empty());
        assert_eq!(replica.get_atom(second.id).unwrap(), second);
    }

    // Intents and fully-populated derived slices round-trip through the
    // store unchanged.
    #[test]
    fn persists_intents_and_derived_slices() {
        let store = SqliteFlowStore::in_memory("site-a").unwrap();
        let first = atom(1, vec![]);
        store.emit_atom(&first).unwrap();

        let intent = Intent::new(
            vec![first.id],
            "ship the smallest possible change",
            crate::flow::TranscriptSpan::new("transcript-1", 1, 1).unwrap(),
            0.9,
        )
        .unwrap();
        store.put_intent(&intent).unwrap();
        assert_eq!(store.get_intent(intent.id).unwrap(), intent);

        let mut coverage = CoverageMap::new();
        coverage.insert(first.id, TestId::new("flow-store"));
        // Exercise every slice field so serialization gaps would surface.
        let slice = Slice {
            id: SliceId([3; 32]),
            atoms: vec![first.id],
            intents: vec![intent.id],
            invariants_applied: vec![(
                PredicateHash::new("pred"),
                crate::flow::InvariantResult::allow(),
            )],
            required_tests: vec![TestId::new("flow-store")],
            approval_chain: vec![Approval {
                reviewer: "alice".to_string(),
                approved_at: "2026-04-25T00:00:00Z".to_string(),
                reason: None,
                signature: None,
            }],
            base_ref: first.id,
            status: SliceStatus::Ready,
        };
        store.put_derived_slice(&slice).unwrap();
        assert_eq!(store.get_derived_slice(slice.id).unwrap(), slice);
    }

    // The shipped-slice audit listing must exclude slices stored via the
    // plain (unshipped) path.
    #[test]
    fn lists_only_shipped_derived_slices_for_replay_audit() {
        let store = SqliteFlowStore::in_memory("site-a").unwrap();
        let first = atom(1, vec![]);
        store.emit_atom(&first).unwrap();

        let shipped = Slice {
            id: SliceId([4; 32]),
            atoms: vec![first.id],
            intents: Vec::new(),
            invariants_applied: vec![(
                PredicateHash::new("sha256:retro"),
                crate::flow::InvariantResult::allow(),
            )],
            required_tests: vec![TestId::new("flow-store")],
            approval_chain: Vec::new(),
            base_ref: first.id,
            status: SliceStatus::Ready,
        };
        let unshipped = Slice {
            id: SliceId([5; 32]),
            atoms: vec![first.id],
            intents: Vec::new(),
            invariants_applied: Vec::new(),
            required_tests: Vec::new(),
            approval_chain: Vec::new(),
            base_ref: first.id,
            status: SliceStatus::Ready,
        };

        store.put_shipped_derived_slice(&shipped).unwrap();
        store.put_derived_slice(&unshipped).unwrap();

        let rows = store.shipped_derived_slices_since(None).unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].slice, shipped);
        // Store-side audit metadata is populated on ship.
        assert!(!rows[0].created_at.is_empty());
    }

    // Append-only enforcement lives at the SQL layer: raw DELETE against
    // atoms is rejected (presumably by a trigger) even with direct
    // connection access.
    #[test]
    fn atoms_are_append_only_at_sql_boundary() {
        let store = SqliteFlowStore::in_memory("site-a").unwrap();
        let first = atom(1, vec![]);
        store.emit_atom(&first).unwrap();

        let conn = store.lock_conn().unwrap();
        let error = conn
            .execute(
                "DELETE FROM atoms WHERE id = ?1",
                params![first.id.0.as_slice()],
            )
            .unwrap_err();
        assert!(error.to_string().contains("atoms are append-only"));
    }

    // Same append-only guarantee for the slices table.
    #[test]
    fn slices_are_append_only_at_sql_boundary() {
        let store = SqliteFlowStore::in_memory("site-a").unwrap();
        let first = atom(1, vec![]);
        store.emit_atom(&first).unwrap();
        let slice = Slice {
            id: SliceId([6; 32]),
            atoms: vec![first.id],
            intents: Vec::new(),
            invariants_applied: Vec::new(),
            required_tests: Vec::new(),
            approval_chain: Vec::new(),
            base_ref: first.id,
            status: SliceStatus::Ready,
        };
        store.put_shipped_derived_slice(&slice).unwrap();

        let conn = store.lock_conn().unwrap();
        let error = conn
            .execute(
                "DELETE FROM slices WHERE id = ?1",
                params![slice.id.0.as_slice()],
            )
            .unwrap_err();
        assert!(error.to_string().contains("slices are append-only"));
    }
}