Skip to main content

agent_store/
backend.rs

1//! The pluggable storage backend.
2//!
3//! The substrate's primitives ([`crate::Generation`], [`crate::WriterLog`])
4//! are written against the [`Backend`] trait, never against a concrete
5//! database. Today the only implementation is [`SqliteBackend`] (rusqlite,
6//! bundled — zero system deps, the fleet default). A `PgBackend` over the
7//! **synchronous** `postgres` crate slots in behind the same trait in Phase 2
8//! without touching any primitive code; it will wrap its client in interior
9//! mutability so the `&self` shape here still holds.
10//!
11//! SQL is written with `?` positional placeholders. SQLite consumes them
12//! directly; the future Postgres backend rewrites `?` → `$1, $2, …`. Keep the
13//! SQL to the portable subset both dialects share (the primitives do).
14
15use std::path::Path;
16
17use crate::error::{Result, StoreError};
18
/// A backend-agnostic scalar: the single currency for statement parameters
/// going in and result cells coming out.
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    /// SQL `NULL`.
    Null,
    /// A signed 64-bit integer.
    Int(i64),
    /// A 64-bit floating-point number.
    Real(f64),
    /// An owned text string.
    Text(String),
    /// An owned byte string.
    Blob(Vec<u8>),
}
28
/// One returned row: a column-ordered vector of [`Value`]s.
///
/// Cell `i` holds the value of the `i`-th column of the statement's result
/// set.
pub type Row = Vec<Value>;
31
/// The SQL dialect a backend speaks. Primitives consult this only in the few
/// spots where the portable SQL subset does not suffice.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Dialect {
    /// SQLite.
    Sqlite,
    /// PostgreSQL.
    Postgres,
}
39
/// A pluggable, **synchronous** storage backend.
///
/// Object-safe on purpose: primitives take `&dyn Backend`, so a single
/// compiled primitive serves every backend.
///
/// Every method takes `&self`; an implementation that needs mutable access to
/// its client is expected to use interior mutability.
pub trait Backend {
    /// The dialect this backend speaks.
    fn dialect(&self) -> Dialect;

    /// Run a statement, returning the number of rows affected.
    ///
    /// `sql` uses `?` positional placeholders; `params` supplies their values
    /// in order.
    ///
    /// # Errors
    ///
    /// Returns an error when the backend fails to run the statement.
    fn exec(&self, sql: &str, params: &[Value]) -> Result<u64>;

    /// Run a query, returning all rows. Also used for `RETURNING` statements.
    ///
    /// `sql` uses `?` positional placeholders; `params` supplies their values
    /// in order. The whole result set is materialized before returning.
    ///
    /// # Errors
    ///
    /// Returns an error when the backend fails to run the query.
    fn query(&self, sql: &str, params: &[Value]) -> Result<Vec<Row>>;
}
54
55// ---------------------------------------------------------------------------
56// SQLite backend (the fleet default)
57// ---------------------------------------------------------------------------
58
/// A SQLite-backed [`Backend`] using bundled rusqlite (no system libsqlite3).
pub struct SqliteBackend {
    // The single owned connection every statement runs on; also lent out
    // read-only through `connection()`.
    conn: rusqlite::Connection,
}
63
64impl SqliteBackend {
65    /// Open (or create) a SQLite database at `path`, in WAL mode with a busy
66    /// timeout so co-located processes serialize cleanly rather than erroring.
67    pub fn open(path: &Path) -> Result<Self> {
68        let conn = rusqlite::Connection::open(path)?;
69        Self::apply_pragmas(&conn)?;
70        Ok(Self { conn })
71    }
72
73    /// Open an in-memory database (tests, ephemeral use).
74    pub fn in_memory() -> Result<Self> {
75        let conn = rusqlite::Connection::open_in_memory()?;
76        Ok(Self { conn })
77    }
78
79    /// Wrap a connection a consumer already owns — the **incremental-adoption
80    /// seam**. A consumer (newt's `ConversationStore`, modulex's `Store`) that
81    /// already holds a `rusqlite::Connection` hands it over, keeps running its
82    /// own domain SQL through [`SqliteBackend::connection`], and gets the
83    /// agent-store primitives on the *same* database: no second connection, no
84    /// big-bang rewrite. Pragmas are the caller's responsibility here (the
85    /// connection is assumed already configured).
86    pub fn from_connection(conn: rusqlite::Connection) -> Self {
87        Self { conn }
88    }
89
90    /// Borrow the underlying SQLite connection for backend-specific
91    /// (domain-table) SQL. SQLite-only by nature — the [`Backend`] trait stays
92    /// the portable, backend-agnostic surface; this escape hatch is how a
93    /// consumer keeps its existing rusqlite code while adopting the substrate.
94    pub fn connection(&self) -> &rusqlite::Connection {
95        &self.conn
96    }
97
98    fn apply_pragmas(conn: &rusqlite::Connection) -> Result<()> {
99        // WAL + a generous busy timeout: multiple co-located agents serialize
100        // on the write lock instead of failing fast. (NFS-home degradation to
101        // DELETE journal mode is a consumer concern, handled where the path is
102        // chosen, not here.)
103        conn.busy_timeout(std::time::Duration::from_secs(5))?;
104        conn.pragma_update(None, "journal_mode", "WAL")?;
105        conn.pragma_update(None, "synchronous", "NORMAL")?;
106        Ok(())
107    }
108}
109
110impl Backend for SqliteBackend {
111    fn dialect(&self) -> Dialect {
112        Dialect::Sqlite
113    }
114
115    fn exec(&self, sql: &str, params: &[Value]) -> Result<u64> {
116        let n = self
117            .conn
118            .execute(sql, rusqlite::params_from_iter(params.iter()))?;
119        Ok(n as u64)
120    }
121
122    fn query(&self, sql: &str, params: &[Value]) -> Result<Vec<Row>> {
123        let mut stmt = self.conn.prepare(sql)?;
124        let ncols = stmt.column_count();
125        let rows = stmt
126            .query_map(rusqlite::params_from_iter(params.iter()), |row| {
127                (0..ncols)
128                    .map(|i| row.get_ref(i).map(value_from_ref))
129                    .collect::<rusqlite::Result<Row>>()
130            })?
131            .collect::<rusqlite::Result<Vec<Row>>>()?;
132        Ok(rows)
133    }
134}
135
136fn value_from_ref(v: rusqlite::types::ValueRef<'_>) -> Value {
137    use rusqlite::types::ValueRef;
138    match v {
139        ValueRef::Null => Value::Null,
140        ValueRef::Integer(i) => Value::Int(i),
141        ValueRef::Real(f) => Value::Real(f),
142        ValueRef::Text(t) => Value::Text(String::from_utf8_lossy(t).into_owned()),
143        ValueRef::Blob(b) => Value::Blob(b.to_vec()),
144    }
145}
146
147impl rusqlite::ToSql for Value {
148    fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
149        use rusqlite::types::{ToSqlOutput, Value as SqlValue, ValueRef};
150        Ok(match self {
151            Value::Null => ToSqlOutput::Owned(SqlValue::Null),
152            Value::Int(i) => ToSqlOutput::Owned(SqlValue::Integer(*i)),
153            Value::Real(f) => ToSqlOutput::Owned(SqlValue::Real(*f)),
154            Value::Text(s) => ToSqlOutput::Borrowed(ValueRef::Text(s.as_bytes())),
155            Value::Blob(b) => ToSqlOutput::Borrowed(ValueRef::Blob(b)),
156        })
157    }
158}
159
160/// Extract exactly 32 bytes from a `Value::Blob` (hash columns).
161pub(crate) fn blob32(v: &Value) -> Result<[u8; 32]> {
162    match v {
163        Value::Blob(b) if b.len() == 32 => {
164            let mut out = [0u8; 32];
165            out.copy_from_slice(b);
166            Ok(out)
167        }
168        Value::Blob(b) => Err(StoreError::MalformedRow(format!(
169            "expected 32-byte hash, got {} bytes",
170            b.len()
171        ))),
172        other => Err(StoreError::MalformedRow(format!(
173            "expected blob hash, got {other:?}"
174        ))),
175    }
176}
177
178/// Extract a `u64` from a `Value::Int`.
179pub(crate) fn as_u64(v: &Value) -> Result<u64> {
180    match v {
181        Value::Int(i) => Ok(*i as u64),
182        other => Err(StoreError::MalformedRow(format!(
183            "expected integer, got {other:?}"
184        ))),
185    }
186}
187
#[cfg(test)]
mod tests {
    use super::*;

    /// One value of every [`Value`] variant survives an insert/select cycle
    /// unchanged.
    #[test]
    fn round_trips_values() {
        let backend = SqliteBackend::in_memory().unwrap();
        backend
            .exec(
                "CREATE TABLE t (i INTEGER, r REAL, s TEXT, b BLOB, n INTEGER)",
                &[],
            )
            .unwrap();

        let cells = vec![
            Value::Int(42),
            Value::Real(1.5),
            Value::Text("hi".into()),
            Value::Blob(vec![1, 2, 3]),
            Value::Null,
        ];
        backend
            .exec(
                "INSERT INTO t (i, r, s, b, n) VALUES (?, ?, ?, ?, ?)",
                &cells,
            )
            .unwrap();

        let fetched = backend.query("SELECT i, r, s, b, n FROM t", &[]).unwrap();
        assert_eq!(fetched.len(), 1);
        assert_eq!(fetched[0], cells);
    }

    /// The SQLite backend reports the SQLite dialect.
    #[test]
    fn dialect_is_sqlite() {
        let backend = SqliteBackend::in_memory().unwrap();
        assert_eq!(backend.dialect(), Dialect::Sqlite);
    }

    /// Trait-level writes and raw rusqlite writes land in the same database
    /// when the backend wraps a consumer-owned connection.
    #[test]
    fn from_connection_wraps_and_shares_the_database() {
        // The consumer opens its own connection and hands it to the substrate.
        let consumer_conn = rusqlite::Connection::open_in_memory().unwrap();
        let backend = SqliteBackend::from_connection(consumer_conn);

        // The substrate goes through the `Backend` trait...
        backend.exec("CREATE TABLE t (x INTEGER)", &[]).unwrap();

        // ...while the consumer keeps issuing its own rusqlite SQL through
        // the escape hatch.
        let raw = backend.connection();
        raw.execute("INSERT INTO t VALUES (7)", []).unwrap();

        // Both paths hit the same database.
        let fetched = backend.query("SELECT x FROM t", &[]).unwrap();
        assert_eq!(fetched, vec![vec![Value::Int(7)]]);
    }
}