1use std::collections::{BTreeMap, HashMap, HashSet};
9use std::fmt;
10use std::path::Path;
11use std::sync::{Mutex, MutexGuard};
12
13use rusqlite::{params, Connection, OptionalExtension, Transaction};
14use serde::{Deserialize, Serialize};
15use time::format_description::well_known::Rfc3339;
16use time::OffsetDateTime;
17
18use super::backend::{AtomRef, FlowSlice, GitExportReceipt, ShipReceipt, VcsBackend};
19use super::{Atom, AtomId, Intent, IntentId, Slice as DerivedSlice, SliceId, VcsBackendError};
20
/// Namespace prefix for atom refs minted by this SQLite-backed store.
const SQLITE_ATOM_REF_PREFIX: &str = "sqlite://atoms";
/// Namespace prefix for slice refs minted by this SQLite-backed store.
const SQLITE_SLICE_REF_PREFIX: &str = "sqlite://slices";
23
24#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
26pub struct StateVector {
27 clocks: BTreeMap<String, u64>,
28}
29
30impl StateVector {
31 pub fn new() -> Self {
32 Self::default()
33 }
34
35 pub fn insert(&mut self, site_id: impl Into<String>, clock: u64) {
36 self.clocks.insert(site_id.into(), clock);
37 }
38
39 pub fn clock(&self, site_id: &str) -> u64 {
40 self.clocks.get(site_id).copied().unwrap_or(0)
41 }
42
43 pub fn iter(&self) -> impl Iterator<Item = (&str, u64)> {
44 self.clocks
45 .iter()
46 .map(|(site_id, clock)| (site_id.as_str(), *clock))
47 }
48}
49
/// One atom together with the (site, clock) slot it occupies in the emitting
/// replica's history; the unit of causal synchronization returned by
/// `SqliteFlowStore::causal_delta`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AtomDelta {
    /// The full stored atom payload.
    pub atom: Atom,
    /// Replica that originally assigned `clock` to this atom.
    pub site_id: String,
    /// Position of the atom in that site's per-(principal, persona) sequence.
    pub clock: u64,
}
57
/// A derived slice as persisted in the `slices` table, paired with the row's
/// creation timestamp (as recorded by SQLite's `CURRENT_TIMESTAMP` default).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StoredDerivedSlice {
    /// The deserialized slice body.
    pub slice: DerivedSlice,
    /// SQLite timestamp string copied verbatim from the `created_at` column.
    pub created_at: String,
}
64
/// SQLite-backed flow store: append-only atoms, intents, slices, and per-site
/// state vectors. All database access funnels through one mutex-guarded
/// connection, so operations on a single store are serialized.
pub struct SqliteFlowStore {
    // Identifier of this replica; stamped onto locally emitted atoms.
    site_id: String,
    // Single shared connection; guarded so `&self` methods can transact.
    conn: Mutex<Connection>,
}
70
71impl fmt::Debug for SqliteFlowStore {
72 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
73 f.debug_struct("SqliteFlowStore")
74 .field("site_id", &self.site_id)
75 .finish_non_exhaustive()
76 }
77}
78
impl SqliteFlowStore {
    /// Opens (or creates) a store at `path`, applying the schema and pragmas.
    ///
    /// `site_id` names this replica in state vectors and atom refs; blank
    /// values are rejected by `normalize_site_id`.
    pub fn open(
        path: impl AsRef<Path>,
        site_id: impl Into<String>,
    ) -> Result<Self, VcsBackendError> {
        let site_id = normalize_site_id(site_id.into())?;
        let conn = Connection::open(path)?;
        initialize_schema(&conn)?;
        Ok(Self {
            site_id,
            conn: Mutex::new(conn),
        })
    }

    /// Opens a throwaway in-memory store (same schema); useful for tests.
    pub fn in_memory(site_id: impl Into<String>) -> Result<Self, VcsBackendError> {
        let site_id = normalize_site_id(site_id.into())?;
        let conn = Connection::open_in_memory()?;
        initialize_schema(&conn)?;
        Ok(Self {
            site_id,
            conn: Mutex::new(conn),
        })
    }

    /// The identifier this replica stamps on locally emitted atoms.
    pub fn site_id(&self) -> &str {
        &self.site_id
    }

    /// Verifies and appends `atoms` in one transaction, assigning local site
    /// clocks. Already-stored atoms are skipped and their existing refs
    /// returned (idempotent via `insert_atom_tx`).
    pub fn emit_atoms(&self, atoms: &[Atom]) -> Result<Vec<AtomRef>, VcsBackendError> {
        self.emit_atoms_inner(atoms, true)
    }

    /// Bulk append that skips `Atom::verify` — for atoms whose validity the
    /// caller has already established.
    ///
    /// Optimized for batches: statements are prepared once, and each
    /// (principal, persona) site clock is read from `state_vectors` once,
    /// incremented in memory per atom, then written back before commit.
    ///
    /// NOTE(review): unlike `emit_atoms`, this path does not tolerate atoms
    /// that are already stored — the `atoms` primary key would reject the
    /// re-insert and abort the whole batch. Confirm callers only pass fresh
    /// atoms here.
    pub fn emit_preverified_atoms(&self, atoms: &[Atom]) -> Result<Vec<AtomRef>, VcsBackendError> {
        let mut conn = self.lock_conn()?;
        let tx = conn.transaction()?;
        // In-memory running clock per (principal, persona), seeded lazily
        // from the persisted state vector.
        let mut clocks: HashMap<(String, String), u64> = HashMap::new();
        let mut refs = Vec::with_capacity(atoms.len());

        // Scope so the cached statements (which borrow `tx`) are dropped
        // before the state-vector writeback and commit below.
        {
            let mut insert_atom = tx.prepare_cached(
                "INSERT INTO atoms (
                    id, principal, persona, timestamp_ns, timestamp_rfc3339,
                    site_id, site_clock, inverse_of, body_binary
                )
                VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
            )?;
            let mut insert_parent = tx.prepare_cached(
                "INSERT INTO atom_parents (child_id, parent_id, ordinal)
                VALUES (?1, ?2, ?3)",
            )?;

            for atom in atoms {
                let key = (
                    atom.provenance.principal.clone(),
                    atom.provenance.persona.clone(),
                );
                // First atom for this (principal, persona): load the
                // persisted clock so increments continue from it.
                if !clocks.contains_key(&key) {
                    let current = state_vector_clock_tx(
                        &tx,
                        &atom.provenance.principal,
                        &atom.provenance.persona,
                        &self.site_id,
                    )?;
                    clocks.insert(key.clone(), current);
                }
                let clock = clocks
                    .get_mut(&key)
                    .expect("clock was inserted before increment");
                *clock = clock
                    .checked_add(1)
                    .ok_or_else(|| VcsBackendError::Invalid("site clock overflow".to_string()))?;

                let body = atom.to_binary()?;
                let timestamp_ns = atom_timestamp_ns(atom)?;
                let timestamp_rfc3339 = atom_timestamp_rfc3339(atom)?;
                let inverse_of = atom.inverse_of.map(|id| id.0.to_vec());
                insert_atom.execute(params![
                    atom.id.0.as_slice(),
                    atom.provenance.principal,
                    atom.provenance.persona,
                    timestamp_ns,
                    timestamp_rfc3339,
                    self.site_id.as_str(),
                    i64_from_u64(*clock, "atom site clock")?,
                    inverse_of.as_deref(),
                    body.as_slice(),
                ])?;

                // Parent edges keep their original order via `ordinal`.
                for (ordinal, parent) in atom.parents.iter().enumerate() {
                    insert_parent.execute(params![
                        atom.id.0.as_slice(),
                        parent.0.as_slice(),
                        i64_from_usize(ordinal, "atom parent ordinal")?
                    ])?;
                }
                refs.push(sqlite_atom_ref(atom.id, &self.site_id, *clock));
            }
        }

        // Persist the final clock for each touched (principal, persona)
        // inside the same transaction as the atoms themselves.
        for ((principal, persona), clock) in clocks {
            advance_state_vector_tx(&tx, &principal, &persona, &self.site_id, clock)?;
        }
        tx.commit()?;
        Ok(refs)
    }

    /// Shared append path: one transaction, optional per-atom verification,
    /// per-atom clock reservation delegated to `insert_atom_tx`.
    fn emit_atoms_inner(
        &self,
        atoms: &[Atom],
        verify: bool,
    ) -> Result<Vec<AtomRef>, VcsBackendError> {
        let mut conn = self.lock_conn()?;
        let tx = conn.transaction()?;
        let mut refs = Vec::with_capacity(atoms.len());
        for atom in atoms {
            if verify {
                atom.verify()?;
            }
            // `None` clock → a fresh local clock is reserved per atom.
            refs.push(insert_atom_tx(&tx, atom, &self.site_id, None)?);
        }
        tx.commit()?;
        Ok(refs)
    }

    /// Applies an atom received from another replica at its original
    /// (site_id, clock) slot.
    ///
    /// Verifies the atom, rejects clock 0 (clocks start at 1), and fails if
    /// the slot is already occupied by a *different* atom; re-delivery of the
    /// same atom is idempotent.
    pub fn insert_remote_atom(
        &self,
        atom: &Atom,
        site_id: &str,
        clock: u64,
    ) -> Result<AtomRef, VcsBackendError> {
        atom.verify()?;
        if clock == 0 {
            return Err(VcsBackendError::Invalid(
                "remote atom clock must be greater than zero".to_string(),
            ));
        }
        let site_id = normalize_site_id(site_id.to_string())?;
        let mut conn = self.lock_conn()?;
        let tx = conn.transaction()?;
        let atom_ref = insert_atom_tx(&tx, atom, &site_id, Some(clock))?;
        tx.commit()?;
        Ok(atom_ref)
    }

    /// Loads one atom by id; `NotFound` if absent.
    pub fn get_atom(&self, atom_id: AtomId) -> Result<Atom, VcsBackendError> {
        let conn = self.lock_conn()?;
        load_atom(&conn, atom_id)
    }

    /// Like `get_atom` but returns `None` instead of `NotFound`.
    /// (Atom ids double as content hashes — the lookup is by primary key.)
    pub fn atom_by_content_hash(
        &self,
        content_hash: AtomId,
    ) -> Result<Option<Atom>, VcsBackendError> {
        let conn = self.lock_conn()?;
        conn.query_row(
            "SELECT body_binary FROM atoms WHERE id = ?1",
            params![content_hash.0.as_slice()],
            |row| row.get::<_, Vec<u8>>(0),
        )
        .optional()?
        .map(|body| Atom::from_binary_slice(&body).map_err(Into::into))
        .transpose()
    }

    /// All atoms for a (principal, persona), ordered by timestamp then id
    /// so ties are broken deterministically.
    pub fn atoms_for_principal_persona(
        &self,
        principal: &str,
        persona: &str,
    ) -> Result<Vec<Atom>, VcsBackendError> {
        let conn = self.lock_conn()?;
        let mut stmt = conn.prepare(
            "SELECT id FROM atoms
             WHERE principal = ?1 AND persona = ?2
             ORDER BY timestamp_ns, id",
        )?;
        let rows = stmt.query_map(params![principal, persona], |row| row.get::<_, Vec<u8>>(0))?;
        let mut atoms = Vec::new();
        for row in rows {
            atoms.push(load_atom(&conn, atom_id_from_blob(row?)?)?);
        }
        Ok(atoms)
    }

    /// Number of stored atoms for a (principal, persona).
    pub fn atom_count_for_principal_persona(
        &self,
        principal: &str,
        persona: &str,
    ) -> Result<u64, VcsBackendError> {
        let conn = self.lock_conn()?;
        let count = conn.query_row(
            "SELECT COUNT(*) FROM atoms WHERE principal = ?1 AND persona = ?2",
            params![principal, persona],
            |row| row.get::<_, i64>(0),
        )?;
        u64_from_i64(count, "atom count")
    }

    /// Direct children of `parent` (atoms listing it in `atom_parents`),
    /// ordered by child id.
    pub fn atoms_with_parent(&self, parent: AtomId) -> Result<Vec<Atom>, VcsBackendError> {
        let conn = self.lock_conn()?;
        let mut stmt = conn.prepare(
            "SELECT child_id FROM atom_parents
             WHERE parent_id = ?1
             ORDER BY child_id",
        )?;
        let rows = stmt.query_map(params![parent.0.as_slice()], |row| row.get::<_, Vec<u8>>(0))?;
        let mut atoms = Vec::new();
        for row in rows {
            atoms.push(load_atom(&conn, atom_id_from_blob(row?)?)?);
        }
        Ok(atoms)
    }

    /// The persisted state vector for a (principal, persona): highest known
    /// clock per site.
    pub fn state_vector(
        &self,
        principal: &str,
        persona: &str,
    ) -> Result<StateVector, VcsBackendError> {
        let conn = self.lock_conn()?;
        let mut stmt = conn.prepare(
            "SELECT site_id, clock FROM state_vectors
             WHERE principal = ?1 AND persona = ?2
             ORDER BY site_id",
        )?;
        let rows = stmt.query_map(params![principal, persona], |row| {
            Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
        })?;
        let mut vector = StateVector::new();
        for row in rows {
            let (site_id, clock) = row?;
            vector.insert(site_id, u64_from_i64(clock, "state vector clock")?);
        }
        Ok(vector)
    }

    /// Atoms the `remote` vector has not yet seen: every stored atom whose
    /// site clock exceeds the remote's recorded clock for that site, ordered
    /// by (site, clock, id) so deliveries are deterministic.
    pub fn causal_delta(
        &self,
        principal: &str,
        persona: &str,
        remote: &StateVector,
    ) -> Result<Vec<AtomDelta>, VcsBackendError> {
        let conn = self.lock_conn()?;
        let mut stmt = conn.prepare(
            "SELECT id, site_id, site_clock FROM atoms
             WHERE principal = ?1 AND persona = ?2
             ORDER BY site_id, site_clock, id",
        )?;
        let rows = stmt.query_map(params![principal, persona], |row| {
            Ok((
                row.get::<_, Vec<u8>>(0)?,
                row.get::<_, String>(1)?,
                row.get::<_, i64>(2)?,
            ))
        })?;
        let mut delta = Vec::new();
        for row in rows {
            let (id_blob, site_id, clock_raw) = row?;
            let clock = u64_from_i64(clock_raw, "atom site clock")?;
            // Unseen iff strictly ahead of the remote's clock for this site.
            if clock > remote.clock(&site_id) {
                delta.push(AtomDelta {
                    atom: load_atom(&conn, atom_id_from_blob(id_blob)?)?,
                    site_id,
                    clock,
                });
            }
        }
        Ok(delta)
    }

    /// Persists an intent and its ordered atom links. `INSERT OR IGNORE`
    /// makes re-recording the same intent a no-op.
    pub fn put_intent(&self, intent: &Intent) -> Result<(), VcsBackendError> {
        let body = serde_json::to_vec(intent)?;
        let mut conn = self.lock_conn()?;
        let tx = conn.transaction()?;
        tx.execute(
            "INSERT OR IGNORE INTO intents (id, body_json, goal_description, confidence)
             VALUES (?1, ?2, ?3, ?4)",
            params![
                intent.id.0.as_slice(),
                body.as_slice(),
                intent.goal_description,
                f64::from(intent.confidence)
            ],
        )?;
        for (ordinal, atom_id) in intent.atoms.iter().enumerate() {
            tx.execute(
                "INSERT OR IGNORE INTO intent_atoms (intent_id, atom_id, ordinal)
                 VALUES (?1, ?2, ?3)",
                params![
                    intent.id.0.as_slice(),
                    atom_id.0.as_slice(),
                    i64_from_usize(ordinal, "intent atom ordinal")?
                ],
            )?;
        }
        tx.commit()?;
        Ok(())
    }

    /// Loads an intent by id from its stored JSON body; `NotFound` if absent.
    pub fn get_intent(&self, intent_id: IntentId) -> Result<Intent, VcsBackendError> {
        let conn = self.lock_conn()?;
        let body = conn
            .query_row(
                "SELECT body_json FROM intents WHERE id = ?1",
                params![intent_id.0.as_slice()],
                |row| row.get::<_, Vec<u8>>(0),
            )
            .optional()?
            .ok_or_else(|| VcsBackendError::NotFound(format!("intent {intent_id} not found")))?;
        serde_json::from_slice(&body).map_err(Into::into)
    }

    /// Persists a derived slice, not yet shipped.
    pub fn put_derived_slice(&self, slice: &DerivedSlice) -> Result<(), VcsBackendError> {
        let body = serde_json::to_vec(slice)?;
        self.insert_slice_record(slice.id, &slice.atoms, "derived", body, false)
    }

    /// Persists a derived slice already marked as shipped (gets a ref name).
    pub fn put_shipped_derived_slice(&self, slice: &DerivedSlice) -> Result<(), VcsBackendError> {
        let body = serde_json::to_vec(slice)?;
        self.insert_slice_record(slice.id, &slice.atoms, "derived", body, true)
    }

    /// Loads a derived slice by id; `NotFound` if absent or of another kind.
    pub fn get_derived_slice(&self, slice_id: SliceId) -> Result<DerivedSlice, VcsBackendError> {
        let conn = self.lock_conn()?;
        let body = conn
            .query_row(
                "SELECT body_json FROM slices WHERE id = ?1 AND slice_kind = 'derived'",
                params![slice_id.0.as_slice()],
                |row| row.get::<_, Vec<u8>>(0),
            )
            .optional()?
            .ok_or_else(|| VcsBackendError::NotFound(format!("slice {slice_id} not found")))?;
        serde_json::from_slice(&body).map_err(Into::into)
    }

    /// Shipped derived slices created at or after `since` (all of them when
    /// `since` is `None`), oldest first. Intended for replay/audit.
    pub fn shipped_derived_slices_since(
        &self,
        since: Option<OffsetDateTime>,
    ) -> Result<Vec<StoredDerivedSlice>, VcsBackendError> {
        // RFC 3339 text is normalized by SQLite's datetime() for comparison
        // against the CURRENT_TIMESTAMP-formatted created_at column.
        let since = since
            .map(|value| value.format(&Rfc3339))
            .transpose()
            .map_err(|error| VcsBackendError::Invalid(format!("timestamp format: {error}")))?;
        let conn = self.lock_conn()?;
        let mut stmt = conn.prepare(
            "SELECT body_json, created_at FROM slices
             WHERE slice_kind = 'derived'
               AND shipped = 1
               AND (?1 IS NULL OR created_at >= datetime(?1))
             ORDER BY created_at, id",
        )?;
        let rows = stmt.query_map(params![since.as_deref()], |row| {
            Ok((row.get::<_, Vec<u8>>(0)?, row.get::<_, String>(1)?))
        })?;
        let mut slices = Vec::new();
        for row in rows {
            let (body, created_at) = row?;
            slices.push(StoredDerivedSlice {
                slice: serde_json::from_slice(&body)?,
                created_at,
            });
        }
        Ok(slices)
    }

    /// Persists a flow slice, optionally marked as shipped.
    fn insert_flow_slice(&self, slice: &FlowSlice, shipped: bool) -> Result<(), VcsBackendError> {
        let body = serde_json::to_vec(slice)?;
        self.insert_slice_record(slice.id, &slice.atoms, "flow", body, shipped)
    }

    /// Wraps `insert_slice_record_tx` in its own transaction.
    fn insert_slice_record(
        &self,
        slice_id: SliceId,
        atoms: &[AtomId],
        kind: &str,
        body: Vec<u8>,
        shipped: bool,
    ) -> Result<(), VcsBackendError> {
        let mut conn = self.lock_conn()?;
        let tx = conn.transaction()?;
        insert_slice_record_tx(&tx, slice_id, atoms, kind, &body, shipped)?;
        tx.commit()?;
        Ok(())
    }

    /// Ancestor closure of `roots`, returned with every parent before any of
    /// its children (iterative post-order DFS).
    ///
    /// `opened` marks atoms whose parents have been scheduled (and guards
    /// against revisiting, including through cycles); `emitted` dedupes the
    /// output. The `(atom_id, emit)` flag distinguishes the "expand parents"
    /// visit from the "append to output" visit of the same atom.
    fn atom_closure(&self, roots: &[AtomId]) -> Result<Vec<AtomId>, VcsBackendError> {
        let mut opened = HashSet::new();
        let mut emitted = HashSet::new();
        let mut out = Vec::new();
        // Roots are pushed reversed so the first root is processed first.
        let mut stack: Vec<(AtomId, bool)> = roots
            .iter()
            .rev()
            .copied()
            .map(|atom_id| (atom_id, false))
            .collect();

        while let Some((atom_id, emit)) = stack.pop() {
            if emit {
                if emitted.insert(atom_id) {
                    out.push(atom_id);
                }
                continue;
            }
            if emitted.contains(&atom_id) || !opened.insert(atom_id) {
                continue;
            }

            let atom = self.get_atom(atom_id)?;
            // Re-push for emission after all parents are handled.
            stack.push((atom_id, true));
            for parent in atom.parents.iter().rev() {
                if !emitted.contains(parent) {
                    stack.push((*parent, false));
                }
            }
        }

        Ok(out)
    }

    /// Acquires the connection mutex, mapping lock poisoning to an error
    /// instead of panicking.
    fn lock_conn(&self) -> Result<MutexGuard<'_, Connection>, VcsBackendError> {
        self.conn
            .lock()
            .map_err(|_| VcsBackendError::Io("sqlite flow store lock poisoned".to_string()))
    }
}
529
530impl VcsBackend for SqliteFlowStore {
531 fn emit_atom(&self, atom: &Atom) -> Result<AtomRef, VcsBackendError> {
532 self.emit_atoms(std::slice::from_ref(atom))
533 .map(|mut refs| refs.remove(0))
534 }
535
536 fn derive_slice(&self, atoms: &[AtomId]) -> Result<FlowSlice, VcsBackendError> {
537 FlowSlice::new(self.atom_closure(atoms)?)
538 }
539
540 fn ship_slice(&self, slice: &FlowSlice) -> Result<ShipReceipt, VcsBackendError> {
541 self.insert_flow_slice(slice, true)?;
542 Ok(ShipReceipt {
543 slice_id: slice.id,
544 commit: slice.id.to_string(),
545 ref_name: format!("{SQLITE_SLICE_REF_PREFIX}/{}", slice.id),
546 })
547 }
548
549 fn list_atoms(&self) -> Result<Vec<AtomRef>, VcsBackendError> {
550 let conn = self.lock_conn()?;
551 let mut stmt = conn.prepare(
552 "SELECT id, site_id, site_clock FROM atoms
553 ORDER BY principal, persona, timestamp_ns, id",
554 )?;
555 let rows = stmt.query_map([], |row| {
556 Ok((
557 row.get::<_, Vec<u8>>(0)?,
558 row.get::<_, String>(1)?,
559 row.get::<_, i64>(2)?,
560 ))
561 })?;
562 let mut atoms = Vec::new();
563 for row in rows {
564 let (id_blob, site_id, clock_raw) = row?;
565 atoms.push(sqlite_atom_ref(
566 atom_id_from_blob(id_blob)?,
567 &site_id,
568 u64_from_i64(clock_raw, "atom site clock")?,
569 ));
570 }
571 Ok(atoms)
572 }
573
574 fn replay_slice(&self, slice: &FlowSlice) -> Result<Vec<Atom>, VcsBackendError> {
575 slice
576 .atoms
577 .iter()
578 .map(|atom_id| self.get_atom(*atom_id))
579 .collect()
580 }
581
582 fn export_git(
583 &self,
584 _slice: &FlowSlice,
585 _ref_name: &str,
586 ) -> Result<GitExportReceipt, VcsBackendError> {
587 Err(VcsBackendError::Unsupported(
588 "SqliteFlowStore cannot export git refs; use ShadowGitBackend for git export"
589 .to_string(),
590 ))
591 }
592
593 fn import_git(&self, _ref_name: &str) -> Result<FlowSlice, VcsBackendError> {
594 Err(VcsBackendError::Unsupported(
595 "SqliteFlowStore cannot import git refs; use ShadowGitBackend for git import"
596 .to_string(),
597 ))
598 }
599}
600
/// Creates the full schema (idempotently — every statement uses
/// `IF NOT EXISTS`) and enables the pragmas the store relies on: foreign
/// keys, WAL journaling, and NORMAL synchronous mode.
///
/// Append-only guarantees for atoms, parent edges, slices, and slice edges
/// are enforced at the SQL boundary by BEFORE UPDATE/DELETE triggers that
/// raise ABORT. The `UNIQUE(principal, persona, site_id, site_clock)`
/// constraint on `atoms` backs the per-site clock slot model.
fn initialize_schema(conn: &Connection) -> Result<(), VcsBackendError> {
    conn.execute_batch(
        r#"
        PRAGMA foreign_keys = ON;
        PRAGMA journal_mode = WAL;
        PRAGMA synchronous = NORMAL;

        CREATE TABLE IF NOT EXISTS atoms (
            id BLOB PRIMARY KEY CHECK(length(id) = 32),
            principal TEXT NOT NULL,
            persona TEXT NOT NULL,
            timestamp_ns INTEGER NOT NULL,
            timestamp_rfc3339 TEXT NOT NULL,
            site_id TEXT NOT NULL,
            site_clock INTEGER NOT NULL CHECK(site_clock > 0),
            inverse_of BLOB CHECK(inverse_of IS NULL OR length(inverse_of) = 32),
            body_binary BLOB NOT NULL,
            created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
            UNIQUE(principal, persona, site_id, site_clock)
        );

        CREATE INDEX IF NOT EXISTS atoms_principal_persona_timestamp_idx
            ON atoms(principal, persona, timestamp_ns, id);
        CREATE INDEX IF NOT EXISTS atoms_principal_persona_site_clock_idx
            ON atoms(principal, persona, site_id, site_clock);
        CREATE INDEX IF NOT EXISTS atoms_inverse_of_idx ON atoms(inverse_of);

        CREATE TABLE IF NOT EXISTS atom_parents (
            child_id BLOB NOT NULL CHECK(length(child_id) = 32),
            parent_id BLOB NOT NULL CHECK(length(parent_id) = 32),
            ordinal INTEGER NOT NULL CHECK(ordinal >= 0),
            PRIMARY KEY(child_id, ordinal),
            UNIQUE(child_id, parent_id),
            FOREIGN KEY(child_id) REFERENCES atoms(id)
        );
        CREATE INDEX IF NOT EXISTS atom_parents_parent_idx
            ON atom_parents(parent_id, child_id);

        CREATE TABLE IF NOT EXISTS intents (
            id BLOB PRIMARY KEY CHECK(length(id) = 32),
            body_json BLOB NOT NULL,
            goal_description TEXT NOT NULL,
            confidence REAL NOT NULL,
            created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
        );

        CREATE TABLE IF NOT EXISTS intent_atoms (
            intent_id BLOB NOT NULL CHECK(length(intent_id) = 32),
            atom_id BLOB NOT NULL CHECK(length(atom_id) = 32),
            ordinal INTEGER NOT NULL CHECK(ordinal >= 0),
            PRIMARY KEY(intent_id, ordinal),
            UNIQUE(intent_id, atom_id),
            FOREIGN KEY(intent_id) REFERENCES intents(id)
        );
        CREATE INDEX IF NOT EXISTS intent_atoms_atom_idx
            ON intent_atoms(atom_id, intent_id);

        CREATE TABLE IF NOT EXISTS slices (
            id BLOB PRIMARY KEY CHECK(length(id) = 32),
            slice_kind TEXT NOT NULL,
            body_json BLOB NOT NULL,
            shipped INTEGER NOT NULL DEFAULT 0,
            ref_name TEXT,
            created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
        );

        CREATE TABLE IF NOT EXISTS slice_atoms (
            slice_id BLOB NOT NULL CHECK(length(slice_id) = 32),
            atom_id BLOB NOT NULL CHECK(length(atom_id) = 32),
            ordinal INTEGER NOT NULL CHECK(ordinal >= 0),
            PRIMARY KEY(slice_id, ordinal),
            UNIQUE(slice_id, atom_id),
            FOREIGN KEY(slice_id) REFERENCES slices(id)
        );
        CREATE INDEX IF NOT EXISTS slice_atoms_atom_idx
            ON slice_atoms(atom_id, slice_id);

        CREATE TABLE IF NOT EXISTS state_vectors (
            principal TEXT NOT NULL,
            persona TEXT NOT NULL,
            site_id TEXT NOT NULL,
            clock INTEGER NOT NULL CHECK(clock >= 0),
            updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY(principal, persona, site_id)
        );

        CREATE TRIGGER IF NOT EXISTS atoms_no_update
        BEFORE UPDATE ON atoms
        BEGIN
            SELECT RAISE(ABORT, 'atoms are append-only');
        END;

        CREATE TRIGGER IF NOT EXISTS atoms_no_delete
        BEFORE DELETE ON atoms
        BEGIN
            SELECT RAISE(ABORT, 'atoms are append-only');
        END;

        CREATE TRIGGER IF NOT EXISTS atom_parents_no_update
        BEFORE UPDATE ON atom_parents
        BEGIN
            SELECT RAISE(ABORT, 'atom parent edges are append-only');
        END;

        CREATE TRIGGER IF NOT EXISTS atom_parents_no_delete
        BEFORE DELETE ON atom_parents
        BEGIN
            SELECT RAISE(ABORT, 'atom parent edges are append-only');
        END;

        CREATE TRIGGER IF NOT EXISTS slices_no_update
        BEFORE UPDATE ON slices
        BEGIN
            SELECT RAISE(ABORT, 'slices are append-only');
        END;

        CREATE TRIGGER IF NOT EXISTS slices_no_delete
        BEFORE DELETE ON slices
        BEGIN
            SELECT RAISE(ABORT, 'slices are append-only');
        END;

        CREATE TRIGGER IF NOT EXISTS slice_atoms_no_update
        BEFORE UPDATE ON slice_atoms
        BEGIN
            SELECT RAISE(ABORT, 'slice atom edges are append-only');
        END;

        CREATE TRIGGER IF NOT EXISTS slice_atoms_no_delete
        BEFORE DELETE ON slice_atoms
        BEGIN
            SELECT RAISE(ABORT, 'slice atom edges are append-only');
        END;
        "#,
    )?;
    Ok(())
}
738
/// Inserts one atom (plus its parent edges) inside an open transaction.
///
/// Idempotent: if the atom is already stored, its existing (site, clock)
/// ref is returned without writing anything. Otherwise the clock is either
/// the caller-supplied `explicit_clock` (remote replication — checked for
/// slot conflicts, then folded into the state vector) or a freshly reserved
/// local clock.
fn insert_atom_tx(
    tx: &Transaction<'_>,
    atom: &Atom,
    site_id: &str,
    explicit_clock: Option<u64>,
) -> Result<AtomRef, VcsBackendError> {
    // Already stored: return the ref at its original position.
    if let Some((existing_site, existing_clock)) = atom_clock_tx(tx, atom.id)? {
        return Ok(sqlite_atom_ref(atom.id, &existing_site, existing_clock));
    }

    let clock = match explicit_clock {
        Some(clock) => {
            // Remote atom: fail if the (site, clock) slot is occupied by a
            // different atom, then advance the state vector monotonically.
            reject_site_clock_conflict(
                tx,
                &atom.provenance.principal,
                &atom.provenance.persona,
                site_id,
                clock,
                atom.id,
            )?;
            advance_state_vector_tx(
                tx,
                &atom.provenance.principal,
                &atom.provenance.persona,
                site_id,
                clock,
            )?;
            clock
        }
        // Local atom: reserve the next clock for this (principal, persona).
        None => reserve_next_clock_tx(
            tx,
            &atom.provenance.principal,
            &atom.provenance.persona,
            site_id,
        )?,
    };

    let body = atom.to_binary()?;
    let timestamp_ns = atom_timestamp_ns(atom)?;
    let timestamp_rfc3339 = atom_timestamp_rfc3339(atom)?;
    let inverse_of = atom.inverse_of.map(|id| id.0.to_vec());
    tx.execute(
        "INSERT INTO atoms (
            id, principal, persona, timestamp_ns, timestamp_rfc3339,
            site_id, site_clock, inverse_of, body_binary
        )
        VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
        params![
            atom.id.0.as_slice(),
            atom.provenance.principal,
            atom.provenance.persona,
            timestamp_ns,
            timestamp_rfc3339,
            site_id,
            i64_from_u64(clock, "atom site clock")?,
            inverse_of.as_deref(),
            body.as_slice(),
        ],
    )?;

    // Parent edges preserve declaration order via `ordinal`.
    for (ordinal, parent) in atom.parents.iter().enumerate() {
        tx.execute(
            "INSERT INTO atom_parents (child_id, parent_id, ordinal)
             VALUES (?1, ?2, ?3)",
            params![
                atom.id.0.as_slice(),
                parent.0.as_slice(),
                i64_from_usize(ordinal, "atom parent ordinal")?
            ],
        )?;
    }

    Ok(sqlite_atom_ref(atom.id, site_id, clock))
}
813
814fn insert_slice_record_tx(
815 tx: &Transaction<'_>,
816 slice_id: SliceId,
817 atoms: &[AtomId],
818 kind: &str,
819 body: &[u8],
820 shipped: bool,
821) -> Result<(), VcsBackendError> {
822 tx.execute(
823 "INSERT OR IGNORE INTO slices (id, slice_kind, body_json, shipped, ref_name)
824 VALUES (?1, ?2, ?3, ?4, ?5)",
825 params![
826 slice_id.0.as_slice(),
827 kind,
828 body,
829 if shipped { 1 } else { 0 },
830 if shipped {
831 Some(format!("{SQLITE_SLICE_REF_PREFIX}/{slice_id}"))
832 } else {
833 None
834 }
835 ],
836 )?;
837 for (ordinal, atom_id) in atoms.iter().enumerate() {
838 tx.execute(
839 "INSERT OR IGNORE INTO slice_atoms (slice_id, atom_id, ordinal)
840 VALUES (?1, ?2, ?3)",
841 params![
842 slice_id.0.as_slice(),
843 atom_id.0.as_slice(),
844 i64_from_usize(ordinal, "slice atom ordinal")?
845 ],
846 )?;
847 }
848 Ok(())
849}
850
851fn load_atom(conn: &Connection, atom_id: AtomId) -> Result<Atom, VcsBackendError> {
852 let body = conn
853 .query_row(
854 "SELECT body_binary FROM atoms WHERE id = ?1",
855 params![atom_id.0.as_slice()],
856 |row| row.get::<_, Vec<u8>>(0),
857 )
858 .optional()?
859 .ok_or_else(|| VcsBackendError::NotFound(format!("atom {atom_id} not found")))?;
860 Atom::from_binary_slice(&body).map_err(Into::into)
861}
862
863fn atom_clock_tx(
864 tx: &Transaction<'_>,
865 atom_id: AtomId,
866) -> Result<Option<(String, u64)>, VcsBackendError> {
867 tx.query_row(
868 "SELECT site_id, site_clock FROM atoms WHERE id = ?1",
869 params![atom_id.0.as_slice()],
870 |row| Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?)),
871 )
872 .optional()?
873 .map(|(site_id, clock)| Ok((site_id, u64_from_i64(clock, "atom site clock")?)))
874 .transpose()
875}
876
877fn reserve_next_clock_tx(
878 tx: &Transaction<'_>,
879 principal: &str,
880 persona: &str,
881 site_id: &str,
882) -> Result<u64, VcsBackendError> {
883 let current = state_vector_clock_tx(tx, principal, persona, site_id)?;
884 let next = current
885 .checked_add(1)
886 .ok_or_else(|| VcsBackendError::Invalid("state vector clock overflow".to_string()))?;
887 advance_state_vector_tx(tx, principal, persona, site_id, next)?;
888 Ok(next)
889}
890
891fn state_vector_clock_tx(
892 tx: &Transaction<'_>,
893 principal: &str,
894 persona: &str,
895 site_id: &str,
896) -> Result<u64, VcsBackendError> {
897 tx.query_row(
898 "SELECT clock FROM state_vectors
899 WHERE principal = ?1 AND persona = ?2 AND site_id = ?3",
900 params![principal, persona, site_id],
901 |row| row.get::<_, i64>(0),
902 )
903 .optional()?
904 .map(|clock| u64_from_i64(clock, "state vector clock"))
905 .transpose()
906 .map(|clock| clock.unwrap_or(0))
907}
908
909fn advance_state_vector_tx(
910 tx: &Transaction<'_>,
911 principal: &str,
912 persona: &str,
913 site_id: &str,
914 clock: u64,
915) -> Result<(), VcsBackendError> {
916 tx.execute(
917 "INSERT INTO state_vectors (principal, persona, site_id, clock, updated_at)
918 VALUES (?1, ?2, ?3, ?4, CURRENT_TIMESTAMP)
919 ON CONFLICT(principal, persona, site_id) DO UPDATE SET
920 clock = CASE
921 WHEN excluded.clock > state_vectors.clock THEN excluded.clock
922 ELSE state_vectors.clock
923 END,
924 updated_at = CURRENT_TIMESTAMP",
925 params![
926 principal,
927 persona,
928 site_id,
929 i64_from_u64(clock, "state vector clock")?
930 ],
931 )?;
932 Ok(())
933}
934
935fn reject_site_clock_conflict(
936 tx: &Transaction<'_>,
937 principal: &str,
938 persona: &str,
939 site_id: &str,
940 clock: u64,
941 atom_id: AtomId,
942) -> Result<(), VcsBackendError> {
943 let existing = tx
944 .query_row(
945 "SELECT id FROM atoms
946 WHERE principal = ?1 AND persona = ?2 AND site_id = ?3 AND site_clock = ?4",
947 params![
948 principal,
949 persona,
950 site_id,
951 i64_from_u64(clock, "atom site clock")?
952 ],
953 |row| row.get::<_, Vec<u8>>(0),
954 )
955 .optional()?;
956 if let Some(existing) = existing {
957 let existing = atom_id_from_blob(existing)?;
958 if existing != atom_id {
959 return Err(VcsBackendError::Invalid(format!(
960 "site clock conflict for {site_id}@{clock}: existing atom {existing}, new atom {atom_id}"
961 )));
962 }
963 }
964 Ok(())
965}
966
967fn sqlite_atom_ref(atom_id: AtomId, site_id: &str, clock: u64) -> AtomRef {
968 AtomRef {
969 atom_id,
970 commit: format!("{site_id}:{clock}"),
971 ref_name: format!("{SQLITE_ATOM_REF_PREFIX}/{atom_id}"),
972 }
973}
974
975fn atom_timestamp_ns(atom: &Atom) -> Result<i64, VcsBackendError> {
976 i64::try_from(atom.provenance.timestamp.unix_timestamp_nanos())
977 .map_err(|_| VcsBackendError::Invalid("atom timestamp is out of SQLite range".to_string()))
978}
979
980fn atom_timestamp_rfc3339(atom: &Atom) -> Result<String, VcsBackendError> {
981 atom.provenance
982 .timestamp
983 .format(&Rfc3339)
984 .map_err(|error| VcsBackendError::Invalid(format!("atom timestamp format: {error}")))
985}
986
987fn atom_id_from_blob(blob: Vec<u8>) -> Result<AtomId, VcsBackendError> {
988 if blob.len() != 32 {
989 return Err(VcsBackendError::Invalid(format!(
990 "atom id blob must be 32 bytes, got {}",
991 blob.len()
992 )));
993 }
994 let mut out = [0u8; 32];
995 out.copy_from_slice(&blob);
996 Ok(AtomId(out))
997}
998
999fn normalize_site_id(site_id: String) -> Result<String, VcsBackendError> {
1000 if site_id.trim().is_empty() {
1001 return Err(VcsBackendError::Invalid(
1002 "flow store site_id must not be empty".to_string(),
1003 ));
1004 }
1005 Ok(site_id)
1006}
1007
1008fn i64_from_u64(value: u64, field: &str) -> Result<i64, VcsBackendError> {
1009 i64::try_from(value)
1010 .map_err(|_| VcsBackendError::Invalid(format!("{field} exceeds SQLite i64 range")))
1011}
1012
1013fn i64_from_usize(value: usize, field: &str) -> Result<i64, VcsBackendError> {
1014 i64::try_from(value)
1015 .map_err(|_| VcsBackendError::Invalid(format!("{field} exceeds SQLite i64 range")))
1016}
1017
1018fn u64_from_i64(value: i64, field: &str) -> Result<u64, VcsBackendError> {
1019 u64::try_from(value).map_err(|_| VcsBackendError::Invalid(format!("{field} is negative")))
1020}
1021
#[cfg(test)]
mod tests {
    use super::*;
    use crate::flow::{Approval, CoverageMap, PredicateHash, Slice, SliceStatus, TestId, TextOp};
    use ed25519_dalek::SigningKey;
    use time::OffsetDateTime;

    // Deterministic signing key derived from a repeated seed byte.
    fn key(seed: u8) -> SigningKey {
        SigningKey::from_bytes(&[seed; 32])
    }

    // Builds a signed test atom for the fixed alice/ship-captain identity,
    // with `index` salted into the op, timestamp, and run/tool ids so atoms
    // are distinct and deterministically ordered.
    fn atom(index: u64, parents: Vec<AtomId>) -> Atom {
        let principal = key(1);
        let persona = key(2);
        let timestamp = OffsetDateTime::from_unix_timestamp(1_775_000_000 + index as i64).unwrap();
        Atom::sign(
            vec![TextOp::Insert {
                offset: index,
                content: format!("atom-{index}"),
            }],
            parents,
            crate::flow::Provenance {
                principal: "user:alice".to_string(),
                persona: "ship-captain".to_string(),
                agent_run_id: format!("run-{index}"),
                tool_call_id: Some(format!("tool-{index}")),
                trace_id: "trace-1".to_string(),
                transcript_ref: "transcript-1".to_string(),
                timestamp,
            },
            None,
            &principal,
            &persona,
        )
        .unwrap()
    }

    // Round-trips atoms through emit + the various read paths, checking the
    // site clocks embedded in the returned refs.
    #[test]
    fn emits_replays_and_queries_atoms() {
        let store = SqliteFlowStore::in_memory("site-a").unwrap();
        let first = atom(1, vec![]);
        let second = atom(2, vec![first.id]);

        let refs = store.emit_atoms(&[first.clone(), second.clone()]).unwrap();
        assert_eq!(refs.len(), 2);
        assert_eq!(refs[0].commit, "site-a:1");
        assert_eq!(refs[1].commit, "site-a:2");
        assert_eq!(store.get_atom(first.id).unwrap(), first);
        assert_eq!(
            store.atom_by_content_hash(second.id).unwrap(),
            Some(second.clone())
        );
        assert_eq!(
            store.atoms_with_parent(first.id).unwrap(),
            vec![second.clone()]
        );
        assert_eq!(
            store
                .atoms_for_principal_persona("user:alice", "ship-captain")
                .unwrap(),
            vec![first, second]
        );
    }

    // derive_slice must include ancestors (parent-closed) and replay them
    // parents-first.
    #[test]
    fn derives_and_replays_parent_closed_slices() {
        let store = SqliteFlowStore::in_memory("site-a").unwrap();
        let first = atom(1, vec![]);
        let second = atom(2, vec![first.id]);
        store.emit_atoms(&[first.clone(), second.clone()]).unwrap();

        let slice = store.derive_slice(&[second.id]).unwrap();
        assert_eq!(slice.atoms, vec![first.id, second.id]);
        let receipt = store.ship_slice(&slice).unwrap();
        assert_eq!(receipt.slice_id, slice.id);
        assert_eq!(store.replay_slice(&slice).unwrap(), vec![first, second]);
    }

    // Full sync cycle: delta from an empty vector, apply remotely, then
    // confirm the updated vector yields an empty delta.
    #[test]
    fn state_vector_delta_round_trips_between_replicas() {
        let source = SqliteFlowStore::in_memory("site-a").unwrap();
        let replica = SqliteFlowStore::in_memory("site-b").unwrap();
        let first = atom(1, vec![]);
        let second = atom(2, vec![first.id]);
        source.emit_atoms(&[first.clone(), second.clone()]).unwrap();

        let empty = replica.state_vector("user:alice", "ship-captain").unwrap();
        let delta = source
            .causal_delta("user:alice", "ship-captain", &empty)
            .unwrap();
        assert_eq!(delta.len(), 2);
        for item in &delta {
            replica
                .insert_remote_atom(&item.atom, &item.site_id, item.clock)
                .unwrap();
        }

        let vector = replica.state_vector("user:alice", "ship-captain").unwrap();
        assert_eq!(vector.clock("site-a"), 2);
        assert!(source
            .causal_delta("user:alice", "ship-captain", &vector)
            .unwrap()
            .is_empty());
        assert_eq!(replica.get_atom(second.id).unwrap(), second);
    }

    // Intents and derived slices round-trip through their JSON bodies.
    #[test]
    fn persists_intents_and_derived_slices() {
        let store = SqliteFlowStore::in_memory("site-a").unwrap();
        let first = atom(1, vec![]);
        store.emit_atom(&first).unwrap();

        let intent = Intent::new(
            vec![first.id],
            "ship the smallest possible change",
            crate::flow::TranscriptSpan::new("transcript-1", 1, 1).unwrap(),
            0.9,
        )
        .unwrap();
        store.put_intent(&intent).unwrap();
        assert_eq!(store.get_intent(intent.id).unwrap(), intent);

        let mut coverage = CoverageMap::new();
        coverage.insert(first.id, TestId::new("flow-store"));
        let slice = Slice {
            id: SliceId([3; 32]),
            atoms: vec![first.id],
            intents: vec![intent.id],
            invariants_applied: vec![(
                PredicateHash::new("pred"),
                crate::flow::InvariantResult::allow(),
            )],
            required_tests: vec![TestId::new("flow-store")],
            approval_chain: vec![Approval {
                reviewer: "alice".to_string(),
                approved_at: "2026-04-25T00:00:00Z".to_string(),
                reason: None,
                signature: None,
            }],
            base_ref: first.id,
            status: SliceStatus::Ready,
        };
        store.put_derived_slice(&slice).unwrap();
        assert_eq!(store.get_derived_slice(slice.id).unwrap(), slice);
    }

    // Only shipped derived slices appear in the audit listing.
    #[test]
    fn lists_only_shipped_derived_slices_for_replay_audit() {
        let store = SqliteFlowStore::in_memory("site-a").unwrap();
        let first = atom(1, vec![]);
        store.emit_atom(&first).unwrap();

        let shipped = Slice {
            id: SliceId([4; 32]),
            atoms: vec![first.id],
            intents: Vec::new(),
            invariants_applied: vec![(
                PredicateHash::new("sha256:retro"),
                crate::flow::InvariantResult::allow(),
            )],
            required_tests: vec![TestId::new("flow-store")],
            approval_chain: Vec::new(),
            base_ref: first.id,
            status: SliceStatus::Ready,
        };
        let unshipped = Slice {
            id: SliceId([5; 32]),
            atoms: vec![first.id],
            intents: Vec::new(),
            invariants_applied: Vec::new(),
            required_tests: Vec::new(),
            approval_chain: Vec::new(),
            base_ref: first.id,
            status: SliceStatus::Ready,
        };

        store.put_shipped_derived_slice(&shipped).unwrap();
        store.put_derived_slice(&unshipped).unwrap();

        let rows = store.shipped_derived_slices_since(None).unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].slice, shipped);
        assert!(!rows[0].created_at.is_empty());
    }

    // The append-only triggers must block raw SQL deletes of atoms.
    #[test]
    fn atoms_are_append_only_at_sql_boundary() {
        let store = SqliteFlowStore::in_memory("site-a").unwrap();
        let first = atom(1, vec![]);
        store.emit_atom(&first).unwrap();

        let conn = store.lock_conn().unwrap();
        let error = conn
            .execute(
                "DELETE FROM atoms WHERE id = ?1",
                params![first.id.0.as_slice()],
            )
            .unwrap_err();
        assert!(error.to_string().contains("atoms are append-only"));
    }

    // The append-only triggers must block raw SQL deletes of slices.
    #[test]
    fn slices_are_append_only_at_sql_boundary() {
        let store = SqliteFlowStore::in_memory("site-a").unwrap();
        let first = atom(1, vec![]);
        store.emit_atom(&first).unwrap();
        let slice = Slice {
            id: SliceId([6; 32]),
            atoms: vec![first.id],
            intents: Vec::new(),
            invariants_applied: Vec::new(),
            required_tests: Vec::new(),
            approval_chain: Vec::new(),
            base_ref: first.id,
            status: SliceStatus::Ready,
        };
        store.put_shipped_derived_slice(&slice).unwrap();

        let conn = store.lock_conn().unwrap();
        let error = conn
            .execute(
                "DELETE FROM slices WHERE id = ?1",
                params![slice.id.0.as_slice()],
            )
            .unwrap_err();
        assert!(error.to_string().contains("slices are append-only"));
    }
}