Skip to main content

zlayer_store/
backend.rs

1//! Generic `SQLite`/`sqlx` adapter for blob-shaped stores.
2//!
3//! `JsonStore<T>` is the persistence engine shared by the "blob + optional
4//! unique indexes" style of `ZLayer` resource — notifiers, variables, tasks,
5//! workflows, user groups, and syncs. A record is stored as an opaque
6//! `serde_json`-serialised blob in a `data_json` column; zero or more columns
7//! are peeled out via caller-supplied extractor functions so that unique
8//! constraints and secondary lookups remain first-class SQL rather than
9//! whole-table scans.
10//!
11//! # Design
12//!
13//! The caller owns the record type `T`. `T` must be `Serialize +
14//! DeserializeOwned + Send + Sync + 'static`. The caller describes how `T`
15//! maps onto the table via [`JsonTable`]:
16//!
17//! ```ignore
18//! use zlayer_store::{IndexSpec, JsonTable, JsonStore};
19//!
20//! # #[derive(serde::Serialize, serde::Deserialize)]
21//! # struct MyRecord { id: String, name: String, scope: Option<String> }
22//! let table = JsonTable::<MyRecord> {
23//!     name: "my_records",
24//!     primary_key: "id",
25//!     indexes: &[
26//!         IndexSpec { column: "name",  extractor: |r| Some(r.name.clone()), unique: true  },
27//!         IndexSpec { column: "scope", extractor: |r| r.scope.clone(),      unique: false },
28//!     ],
29//!     unique_constraints: &[],
30//! };
31//! # async fn _example() -> Result<(), zlayer_store::StorageError> {
32//! let store: JsonStore<MyRecord> = JsonStore::in_memory(table).await?;
33//! # Ok(()) }
34//! ```
35//!
36//! The generated schema is:
37//!
38//! ```sql
39//! CREATE TABLE IF NOT EXISTS my_records (
40//!     id TEXT PRIMARY KEY NOT NULL,
41//!     name TEXT,
42//!     scope TEXT,
43//!     data_json TEXT NOT NULL,
44//!     created_at TEXT NOT NULL,
45//!     updated_at TEXT NOT NULL,
46//!     UNIQUE(name)
47//! )
48//! CREATE INDEX IF NOT EXISTS idx_my_records_scope ON my_records(scope)
49//! ```
50//!
51//! A table with an empty `indexes` slice produces the minimal 5-column shape
52//! with no `UNIQUE` clause — that is what the notifier store uses.
53//!
54//! # Compound uniqueness
55//!
56//! For resources that need uniqueness across a combination of columns (e.g.
57//! variables with `UNIQUE(name, scope)`), populate
58//! [`JsonTable::unique_constraints`] with static slices of column names. Each
59//! inner slice becomes one `UNIQUE(col_a, col_b, ...)` clause in the generated
60//! DDL. The columns referenced must also appear in [`JsonTable::indexes`] so
61//! that they are peeled out at write time; otherwise `SQLite` has no column to
62//! enforce the constraint against.
63//!
64//! Note: `SQLite` treats `NULL`s as distinct in `UNIQUE`, so a compound unique
65//! over `(name, scope)` only enforces uniqueness among rows where every
66//! referenced column is non-`NULL`. Rows where any referenced column is `NULL`
67//! — for example a global variable with `scope = None` — are enforced in
68//! application code via a pre-write `get_by_*` check (see
69//! `InMemoryVariableStore::store` and `SqlxVariableStore::store` for the
70//! pattern).
71//!
72//! # Operations
73//!
74//! - [`put`](JsonStore::put) — upsert by `id`, preserving `created_at` on
75//!   conflict.
76//! - [`get`](JsonStore::get), [`list`](JsonStore::list),
77//!   [`delete`](JsonStore::delete), [`count`](JsonStore::count) — the
78//!   usual CRUD primitives.
79//! - [`get_by_unique`](JsonStore::get_by_unique) — fetch by any indexed
80//!   column (not just unique ones).
81//!
82//! Unique-constraint violations surface as [`StorageError::AlreadyExists`] —
83//! not as a raw `Database(...)` string — so callers can distinguish 409-class
84//! errors cleanly.
85//!
86//! # Identifier safety
87//!
88//! `JsonTable::name` and `IndexSpec::column` are `&'static str`, so they come
89//! from compile-time string literals at every call site. The schema DDL is
90//! assembled via `format!` — that is safe for `&'static str` table/column
91//! identifiers and would be a disaster for user-supplied strings. Do not
92//! pass run-time data to either field.
93
94use std::marker::PhantomData;
95use std::path::Path;
96
97use serde::de::DeserializeOwned;
98use serde::Serialize;
99use sqlx::sqlite::{SqlitePool, SqlitePoolOptions};
100use sqlx::Row;
101
102use crate::StorageError;
103
104/// Specification for a single secondary column peeled out of `T` at write
105/// time. All peeled columns are typed `TEXT` in `SQLite`; `None` from the
106/// extractor becomes SQL `NULL`.
107pub struct IndexSpec<T> {
108    /// Column name. Must be a valid SQL identifier (a-z, A-Z, 0-9, `_`) and
109    /// is splat directly into the generated DDL, so this MUST be a
110    /// compile-time string literal.
111    pub column: &'static str,
112
113    /// Extractor pulling the column value out of `T`. `None` is written as
114    /// SQL `NULL`.
115    pub extractor: fn(&T) -> Option<String>,
116
117    /// If `true`, `column` is included in a table-level `UNIQUE(...)`
118    /// constraint. `SQLite` treats `NULL`s as distinct in `UNIQUE`, so
119    /// nullable unique columns only enforce uniqueness among non-`NULL`
120    /// values. Callers needing "unique including `NULL`" must enforce it in
121    /// application code (see `environments.rs` for the pattern).
122    pub unique: bool,
123}
124
125/// Full table specification: name + the ordered list of indexed columns.
126///
127/// Shared across open/reopen so a downstream shim can keep the same layout
128/// forever without duplicating strings.
129///
130/// The `'static` bound on `T` is required because `indexes` is a
131/// `&'static [IndexSpec<T>]` — the compiler needs to know any `T` referenced
132/// by the static slice is itself valid for `'static`. Every concrete record
133/// type we store (e.g. `StoredNotifier`) owns its data, so this bound is
134/// always satisfied in practice.
135pub struct JsonTable<T: 'static> {
136    /// SQL table name. Must be a valid SQL identifier; splat directly into
137    /// DDL, so this MUST be a compile-time string literal.
138    pub name: &'static str,
139
140    /// Name of the primary-key column. Defaults to `"id"` for every store
141    /// created fresh by this codebase. It exists as a knob so a pre-existing
142    /// on-disk schema whose primary key is named something else can be routed
143    /// through this adapter without a destructive migration — the
144    /// `deployments` table, for instance, has had `name TEXT PRIMARY KEY`
145    /// since before the adapter existed, so it sets `primary_key: "name"`.
146    /// Like every other identifier on this struct it is splat directly into
147    /// the DDL/queries, so it MUST be a compile-time string literal.
148    pub primary_key: &'static str,
149
150    /// Indexed columns. May be empty for pure blob-only tables.
151    pub indexes: &'static [IndexSpec<T>],
152
153    /// Compound `UNIQUE(col_a, col_b, ...)` constraints emitted at
154    /// `CREATE TABLE` time. Each inner slice becomes one `UNIQUE(...)` clause
155    /// in the generated DDL. For single-column uniqueness prefer
156    /// `IndexSpec::unique = true` — it's flatter. Use this field when
157    /// uniqueness spans two or more columns (e.g. `UNIQUE(name, scope)`).
158    ///
159    /// Every column name referenced here MUST also appear in `indexes` so
160    /// that it's peeled out of `T` at write time; otherwise the `CREATE TABLE`
161    /// statement will reference a column that doesn't exist. The column names
162    /// are splat directly into the DDL, so they MUST be compile-time string
163    /// literals. May be empty when no compound constraints are needed.
164    pub unique_constraints: &'static [&'static [&'static str]],
165}
166
167/// Generic blob store keyed by a string `id`.
168///
169/// This is the `SQLite`-backed implementation of the adapter. The type name
170/// is engine-neutral on purpose: the `ZLayerZQL` mirror overrides this entire
171/// `backend` module with a ZQL-backed `JsonStore` exposing an identical API,
172/// so the per-resource storage files that hold a `JsonStore<T>` regenerate
173/// mechanically with no hand edits.
174///
175/// Use via a narrow wrapper type in each storage module (see
176/// `notifiers.rs::SqlxNotifierStore` for the pattern).
177pub struct JsonStore<T>
178where
179    T: Serialize + DeserializeOwned + Send + Sync + 'static,
180{
181    pool: SqlitePool,
182    table: JsonTable<T>,
183    _marker: PhantomData<fn() -> T>,
184}
185
186impl<T> JsonStore<T>
187where
188    T: Serialize + DeserializeOwned + Send + Sync + 'static,
189{
190    /// Open or create a `SQLite` database at the given path and ensure the
191    /// table + indexes exist.
192    ///
193    /// # Errors
194    ///
195    /// Returns [`StorageError`] if the database cannot be opened or if the
196    /// schema cannot be created.
197    pub async fn open<P: AsRef<Path>>(path: P, table: JsonTable<T>) -> Result<Self, StorageError> {
198        let path_str = path.as_ref().display().to_string();
199        let connection_string = format!("sqlite:{path_str}?mode=rwc");
200
201        let pool = SqlitePoolOptions::new()
202            .max_connections(5)
203            .connect(&connection_string)
204            .await?;
205
206        sqlx::query("PRAGMA journal_mode=WAL")
207            .execute(&pool)
208            .await?;
209        sqlx::query("PRAGMA busy_timeout=5000")
210            .execute(&pool)
211            .await?;
212
213        let this = Self {
214            pool,
215            table,
216            _marker: PhantomData,
217        };
218        this.init_schema().await?;
219        Ok(this)
220    }
221
222    /// Create an in-memory `SQLite` database (useful for tests) with the
223    /// table + indexes in place.
224    ///
225    /// # Errors
226    ///
227    /// Returns [`StorageError`] if the in-memory database cannot be created.
228    pub async fn in_memory(table: JsonTable<T>) -> Result<Self, StorageError> {
229        let pool = SqlitePool::connect(":memory:").await?;
230        let this = Self {
231            pool,
232            table,
233            _marker: PhantomData,
234        };
235        this.init_schema().await?;
236        Ok(this)
237    }
238
239    /// Create the table + per-index secondary indexes if they don't exist.
240    async fn init_schema(&self) -> Result<(), StorageError> {
241        let table_name = self.table.name;
242
243        // Build the CREATE TABLE statement. Every peeled column is `TEXT`
244        // nullable; the blob is `data_json TEXT NOT NULL`; the timestamps
245        // are ISO-8601 RFC-3339 strings.
246        let mut ddl = String::with_capacity(256);
247        ddl.push_str("CREATE TABLE IF NOT EXISTS ");
248        ddl.push_str(table_name);
249        ddl.push_str(" (\n    ");
250        ddl.push_str(self.table.primary_key);
251        ddl.push_str(" TEXT PRIMARY KEY NOT NULL");
252
253        for idx in self.table.indexes {
254            ddl.push_str(",\n    ");
255            ddl.push_str(idx.column);
256            ddl.push_str(" TEXT");
257        }
258
259        ddl.push_str(",\n    data_json TEXT NOT NULL");
260        ddl.push_str(",\n    created_at TEXT NOT NULL");
261        ddl.push_str(",\n    updated_at TEXT NOT NULL");
262
263        // Compound UNIQUE clause across all `unique: true` columns, matching
264        // the `users.rs` / `environments.rs` shape.
265        let unique_cols: Vec<&'static str> = self
266            .table
267            .indexes
268            .iter()
269            .filter(|i| i.unique)
270            .map(|i| i.column)
271            .collect();
272        if !unique_cols.is_empty() {
273            ddl.push_str(",\n    UNIQUE(");
274            for (i, col) in unique_cols.iter().enumerate() {
275                if i > 0 {
276                    ddl.push_str(", ");
277                }
278                ddl.push_str(col);
279            }
280            ddl.push(')');
281        }
282
283        // Extra compound UNIQUE constraints — one clause per inner slice.
284        // Every referenced column must exist in `indexes` so it's actually
285        // peeled out of `T` at write time.
286        let indexed_columns: std::collections::HashSet<&'static str> =
287            self.table.indexes.iter().map(|i| i.column).collect();
288        for constraint in self.table.unique_constraints {
289            if constraint.is_empty() {
290                continue;
291            }
292            for col in *constraint {
293                debug_assert!(
294                    indexed_columns.contains(col),
295                    "unique_constraints references column '{col}' on table \
296                     '{table_name}' that is not declared in JsonTable::indexes"
297                );
298            }
299            ddl.push_str(",\n    UNIQUE(");
300            for (i, col) in constraint.iter().enumerate() {
301                if i > 0 {
302                    ddl.push_str(", ");
303                }
304                ddl.push_str(col);
305            }
306            ddl.push(')');
307        }
308        ddl.push_str("\n)");
309
310        sqlx::query(&ddl).execute(&self.pool).await?;
311
312        // Secondary indexes on every peeled column (including unique ones)
313        // for fast `get_by_unique` lookup.
314        for idx in self.table.indexes {
315            let idx_ddl = format!(
316                "CREATE INDEX IF NOT EXISTS idx_{table}_{col} ON {table}({col})",
317                table = table_name,
318                col = idx.column,
319            );
320            sqlx::query(&idx_ddl).execute(&self.pool).await?;
321        }
322
323        Ok(())
324    }
325
326    /// Upsert a record by its primary key `id`.
327    ///
328    /// - If `id` is new, a row is inserted with `created_at = updated_at =
329    ///   now` — both timestamps are taken from `now()` at write time, NOT
330    ///   from any field on `T`, because the adapter does not know which
331    ///   field (if any) of `T` holds a timestamp.
332    /// - If `id` already exists, every column is refreshed from the new
333    ///   blob except `created_at`, which is preserved.
334    ///
335    /// # Errors
336    ///
337    /// - [`StorageError::AlreadyExists`] if the write violates a `UNIQUE`
338    ///   constraint on one of the peeled columns.
339    /// - [`StorageError::Serialization`] if `T` cannot be encoded to JSON.
340    /// - [`StorageError::Database`] for any other `sqlx` error.
341    pub async fn put(&self, id: &str, value: &T) -> Result<(), StorageError> {
342        let data_json = serde_json::to_string(value)?;
343        let now = chrono::Utc::now().to_rfc3339();
344        let table_name = self.table.name;
345
346        // Build column and placeholder lists. `?` for every value; `sqlx`
347        // does the binding so there's no injection risk from the blob or
348        // extracted columns.
349        let mut columns: Vec<&str> = Vec::with_capacity(4 + self.table.indexes.len());
350        columns.push(self.table.primary_key);
351        for idx in self.table.indexes {
352            columns.push(idx.column);
353        }
354        columns.push("data_json");
355        columns.push("created_at");
356        columns.push("updated_at");
357
358        let placeholders = (0..columns.len())
359            .map(|_| "?")
360            .collect::<Vec<_>>()
361            .join(", ");
362
363        // ON CONFLICT(<pk>) update clause — everything except `created_at`.
364        let update_assignments = {
365            let mut parts: Vec<String> = Vec::new();
366            for idx in self.table.indexes {
367                parts.push(format!("{col} = excluded.{col}", col = idx.column));
368            }
369            parts.push("data_json = excluded.data_json".to_string());
370            parts.push("updated_at = excluded.updated_at".to_string());
371            parts.join(", ")
372        };
373
374        let sql = format!(
375            "INSERT INTO {table} ({cols}) VALUES ({placeholders}) \
376             ON CONFLICT({pk}) DO UPDATE SET {updates}",
377            table = table_name,
378            cols = columns.join(", "),
379            placeholders = placeholders,
380            pk = self.table.primary_key,
381            updates = update_assignments,
382        );
383
384        let mut query = sqlx::query(&sql).bind(id);
385        for idx in self.table.indexes {
386            query = query.bind((idx.extractor)(value));
387        }
388        query = query.bind(&data_json).bind(&now).bind(&now);
389
390        match query.execute(&self.pool).await {
391            Ok(_) => Ok(()),
392            Err(err) => Err(map_sqlx_err(err, self.table.name)),
393        }
394    }
395
396    /// Fetch a record by primary key.
397    ///
398    /// # Errors
399    ///
400    /// - [`StorageError::Database`] on `sqlx` failure.
401    /// - [`StorageError::Serialization`] if the blob cannot be decoded.
402    pub async fn get(&self, id: &str) -> Result<Option<T>, StorageError> {
403        let sql = format!(
404            "SELECT data_json FROM {table} WHERE {pk} = ?",
405            table = self.table.name,
406            pk = self.table.primary_key,
407        );
408
409        let row: Option<(String,)> = sqlx::query_as(&sql)
410            .bind(id)
411            .fetch_optional(&self.pool)
412            .await?;
413
414        match row {
415            Some((data_json,)) => {
416                let value: T = serde_json::from_str(&data_json)?;
417                Ok(Some(value))
418            }
419            None => Ok(None),
420        }
421    }
422
423    /// List every record in the table, ordered by the primary-key column
424    /// (`primary_key`, default `"id"`) ascending.
425    ///
426    /// Most callers want a domain-specific ordering (e.g. notifier name),
427    /// which they can do by sorting in memory after `list()` returns. Doing
428    /// the sort here would require knowing which field of `T` to order by,
429    /// which the adapter cannot know generically.
430    ///
431    /// # Errors
432    ///
433    /// - [`StorageError::Database`] on `sqlx` failure.
434    /// - [`StorageError::Serialization`] if any blob cannot be decoded.
435    pub async fn list(&self) -> Result<Vec<T>, StorageError> {
436        let sql = format!(
437            "SELECT data_json FROM {table} ORDER BY {pk} ASC",
438            table = self.table.name,
439            pk = self.table.primary_key,
440        );
441
442        let rows: Vec<(String,)> = sqlx::query_as(&sql).fetch_all(&self.pool).await?;
443
444        let mut out = Vec::with_capacity(rows.len());
445        for (data_json,) in rows {
446            let value: T = serde_json::from_str(&data_json)?;
447            out.push(value);
448        }
449        Ok(out)
450    }
451
452    /// List every record in the table ordered by the primary-key column
453    /// ascending, **tolerating** rows whose `data_json` blob fails to
454    /// deserialize: each such row is logged at `error` level (with its
455    /// primary-key value) and skipped rather than failing the whole call.
456    ///
457    /// This is the lossy counterpart to [`list`](Self::list). Use it for
458    /// stores where a single corrupt/legacy row must never take down the
459    /// entire listing — the `deployments` store relies on this so a poisoned
460    /// record can't 500 every `list_deployments` call and survive restart.
461    ///
462    /// Because the blob can't be decoded into `T` for a skipped row, the
463    /// primary-key column is selected alongside `data_json` purely for the
464    /// log line. The PK is always a `TEXT` column, so it decodes as a string.
465    ///
466    /// # Errors
467    ///
468    /// Returns [`StorageError::Database`] on `sqlx` failure. Per-row
469    /// deserialize failures are skipped, not returned.
470    pub async fn list_lossy(&self) -> Result<Vec<T>, StorageError> {
471        let sql = format!(
472            "SELECT {pk}, data_json FROM {table} ORDER BY {pk} ASC",
473            table = self.table.name,
474            pk = self.table.primary_key,
475        );
476
477        let rows: Vec<(String, String)> = sqlx::query_as(&sql).fetch_all(&self.pool).await?;
478
479        let mut out = Vec::with_capacity(rows.len());
480        for (key, data_json) in rows {
481            match serde_json::from_str::<T>(&data_json) {
482                Ok(value) => out.push(value),
483                Err(e) => {
484                    tracing::error!(
485                        table = %self.table.name,
486                        key = %key,
487                        error = %e,
488                        "skipping un-deserializable row in list_lossy()"
489                    );
490                }
491            }
492        }
493        Ok(out)
494    }
495
496    /// Delete a record by primary key.
497    ///
498    /// # Errors
499    ///
500    /// Returns [`StorageError::Database`] on `sqlx` failure.
501    pub async fn delete(&self, id: &str) -> Result<bool, StorageError> {
502        let sql = format!(
503            "DELETE FROM {table} WHERE {pk} = ?",
504            table = self.table.name,
505            pk = self.table.primary_key,
506        );
507        let result = sqlx::query(&sql).bind(id).execute(&self.pool).await?;
508        Ok(result.rows_affected() > 0)
509    }
510
511    /// Fetch a record by one of its peeled secondary columns.
512    ///
513    /// The column must appear in the `JsonTable::indexes` list. Works for
514    /// any indexed column — unique or non-unique. When the column is
515    /// non-unique, the first matching row (by `rowid`) is returned.
516    ///
517    /// # Errors
518    ///
519    /// - [`StorageError::Other`] if `column` does not correspond to any
520    ///   declared index.
521    /// - [`StorageError::Database`] on `sqlx` failure.
522    /// - [`StorageError::Serialization`] if the blob cannot be decoded.
523    pub async fn get_by_unique(
524        &self,
525        column: &str,
526        value: &str,
527    ) -> Result<Option<T>, StorageError> {
528        // Validate the column against the static index list — this is what
529        // prevents SQL injection via `column`.
530        let found = self.table.indexes.iter().any(|i| i.column == column);
531        if !found {
532            return Err(StorageError::Other(format!(
533                "unknown column '{column}' for table '{table}'",
534                table = self.table.name
535            )));
536        }
537
538        let sql = format!(
539            "SELECT data_json FROM {table} WHERE {col} = ? LIMIT 1",
540            table = self.table.name,
541            col = column,
542        );
543
544        let row: Option<(String,)> = sqlx::query_as(&sql)
545            .bind(value)
546            .fetch_optional(&self.pool)
547            .await?;
548
549        match row {
550            Some((data_json,)) => {
551                let value: T = serde_json::from_str(&data_json)?;
552                Ok(Some(value))
553            }
554            None => Ok(None),
555        }
556    }
557
558    /// List every record where a peeled column matches a specific value.
559    /// Rows are ordered by `id` ascending, matching [`list`](Self::list).
560    ///
561    /// Use [`list_where_opt`](Self::list_where_opt) if you need to filter
562    /// on `NULL` — passing `""` to this function binds the empty string, not
563    /// SQL `NULL`.
564    ///
565    /// # Errors
566    ///
567    /// - [`StorageError::Other`] if `column` is not declared in the table's
568    ///   index list.
569    /// - [`StorageError::Database`] on `sqlx` failure.
570    /// - [`StorageError::Serialization`] if any blob cannot be decoded.
571    pub async fn list_where(&self, column: &str, value: &str) -> Result<Vec<T>, StorageError> {
572        self.list_where_opt(column, Some(value)).await
573    }
574
575    /// List every record where a peeled column is SQL `NULL`. Rows are
576    /// ordered by `id` ascending.
577    ///
578    /// # Errors
579    ///
580    /// - [`StorageError::Other`] if `column` is not declared in the table's
581    ///   index list.
582    /// - [`StorageError::Database`] on `sqlx` failure.
583    /// - [`StorageError::Serialization`] if any blob cannot be decoded.
584    pub async fn list_where_null(&self, column: &str) -> Result<Vec<T>, StorageError> {
585        self.list_where_opt(column, None).await
586    }
587
588    /// List every record filtered by a peeled column. `Some(v)` matches
589    /// `column = ?` with `v` bound; `None` matches `column IS NULL`. Rows are
590    /// ordered by `id` ascending.
591    ///
592    /// This is the unified primitive used by both [`list_where`](Self::list_where)
593    /// and [`list_where_null`](Self::list_where_null).
594    ///
595    /// # Errors
596    ///
597    /// - [`StorageError::Other`] if `column` is not declared in the table's
598    ///   index list.
599    /// - [`StorageError::Database`] on `sqlx` failure.
600    /// - [`StorageError::Serialization`] if any blob cannot be decoded.
601    pub async fn list_where_opt(
602        &self,
603        column: &str,
604        value: Option<&str>,
605    ) -> Result<Vec<T>, StorageError> {
606        // Validate the column against the static index list — this is what
607        // prevents SQL injection via `column`.
608        let found = self.table.indexes.iter().any(|i| i.column == column);
609        if !found {
610            return Err(StorageError::Other(format!(
611                "unknown column '{column}' for table '{table}'",
612                table = self.table.name
613            )));
614        }
615
616        let rows: Vec<(String,)> = if let Some(v) = value {
617            let sql = format!(
618                "SELECT data_json FROM {table} WHERE {col} = ? ORDER BY {pk} ASC",
619                table = self.table.name,
620                col = column,
621                pk = self.table.primary_key,
622            );
623            sqlx::query_as(&sql).bind(v).fetch_all(&self.pool).await?
624        } else {
625            let sql = format!(
626                "SELECT data_json FROM {table} WHERE {col} IS NULL ORDER BY {pk} ASC",
627                table = self.table.name,
628                col = column,
629                pk = self.table.primary_key,
630            );
631            sqlx::query_as(&sql).fetch_all(&self.pool).await?
632        };
633
634        let mut out = Vec::with_capacity(rows.len());
635        for (data_json,) in rows {
636            let decoded: T = serde_json::from_str(&data_json)?;
637            out.push(decoded);
638        }
639        Ok(out)
640    }
641
642    /// Return a reference to the underlying `SqlitePool`.
643    ///
644    /// Exposed `pub` as a temporary migration bridge: sibling stores still
645    /// living in `crates/zlayer-api/src/storage/` (tasks, workflows, groups)
646    /// run auxiliary queries — creating companion tables (e.g. `task_runs`,
647    /// `group_members`), doing cross-table transactions during cascade
648    /// deletes — against the same pool without opening a second connection.
649    /// It was `pub(crate)` while `JsonStore` lived inside `zlayer-api`;
650    /// the extraction into `zlayer-store` requires `pub` so those
651    /// cross-crate callers keep compiling. This is NOT a stable public API:
652    /// once the companion-table stores move into `zlayer-store` too, this
653    /// should return to a narrower visibility.
654    #[must_use]
655    pub fn pool(&self) -> &SqlitePool {
656        &self.pool
657    }
658
659    /// Return the total number of rows in the table.
660    ///
661    /// # Errors
662    ///
663    /// Returns [`StorageError::Database`] on `sqlx` failure.
664    pub async fn count(&self) -> Result<u64, StorageError> {
665        let sql = format!("SELECT COUNT(*) FROM {table}", table = self.table.name);
666        let row = sqlx::query(&sql).fetch_one(&self.pool).await?;
667        let raw: i64 = row.try_get(0)?;
668        Ok(u64::try_from(raw).unwrap_or(0))
669    }
670}
671
672/// Map a raw `sqlx::Error` into a `StorageError`, surfacing `SQLite` 2067
673/// (`SQLITE_CONSTRAINT_UNIQUE`) / 1555 (`SQLITE_CONSTRAINT_PRIMARYKEY`) as a
674/// dedicated `AlreadyExists` variant and everything else as a generic
675/// `Database`.
676fn map_sqlx_err(err: sqlx::Error, table: &str) -> StorageError {
677    if let sqlx::Error::Database(db) = &err {
678        // SQLite error codes are returned as strings by `sqlx`. 2067 is the
679        // extended code for UNIQUE; 1555 is the extended code for PRIMARY
680        // KEY. Both are constraint violations that a caller typically wants
681        // to render as HTTP 409.
682        if let Some(code) = db.code() {
683            if code == "2067" || code == "1555" {
684                return StorageError::AlreadyExists(format!(
685                    "UNIQUE constraint failed on table '{table}': {db}"
686                ));
687            }
688        }
689        // Some SQLite builds don't populate extended codes; fall back to
690        // substring matching on the message for those cases.
691        let msg = db.message();
692        if msg.contains("UNIQUE constraint failed") {
693            return StorageError::AlreadyExists(format!(
694                "UNIQUE constraint failed on table '{table}': {msg}"
695            ));
696        }
697    }
698    StorageError::from(err)
699}
700
701#[cfg(test)]
702mod tests {
703    use super::*;
704    use serde::Deserialize;
705
706    /// Minimal record type exercising both a unique and a non-unique indexed
707    /// column plus the blob round-trip.
708    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
709    struct TestRecord {
710        id: String,
711        name: String,
712        value: String,
713    }
714
715    /// Table spec with a unique `name` column and a non-unique `value`
716    /// column — covers both branches of the index machinery.
717    fn indexed_table() -> JsonTable<TestRecord> {
718        static INDEXES: &[IndexSpec<TestRecord>] = &[
719            IndexSpec {
720                column: "name",
721                extractor: |r| Some(r.name.clone()),
722                unique: true,
723            },
724            IndexSpec {
725                column: "value",
726                extractor: |r| Some(r.value.clone()),
727                unique: false,
728            },
729        ];
730        JsonTable {
731            name: "test_records",
732            primary_key: "id",
733            indexes: INDEXES,
734            unique_constraints: &[],
735        }
736    }
737
738    /// Table spec with no secondary columns — matches the notifier shape.
739    fn blob_only_table() -> JsonTable<TestRecord> {
740        JsonTable {
741            name: "test_blobs",
742            primary_key: "id",
743            indexes: &[],
744            unique_constraints: &[],
745        }
746    }
747
748    /// Record with two peeled columns exercising a compound UNIQUE(name,
749    /// scope) constraint — the shape used by the variable store.
750    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
751    struct ScopedRecord {
752        id: String,
753        name: String,
754        scope: Option<String>,
755    }
756
757    fn scoped_table() -> JsonTable<ScopedRecord> {
758        static INDEXES: &[IndexSpec<ScopedRecord>] = &[
759            IndexSpec {
760                column: "name",
761                extractor: |r| Some(r.name.clone()),
762                unique: false,
763            },
764            IndexSpec {
765                column: "scope",
766                extractor: |r| r.scope.clone(),
767                unique: false,
768            },
769        ];
770        static UNIQUES: &[&[&str]] = &[&["name", "scope"]];
771        JsonTable {
772            name: "scoped_records",
773            primary_key: "id",
774            indexes: INDEXES,
775            unique_constraints: UNIQUES,
776        }
777    }
778
779    fn make_scoped(id: &str, name: &str, scope: Option<&str>) -> ScopedRecord {
780        ScopedRecord {
781            id: id.to_string(),
782            name: name.to_string(),
783            scope: scope.map(str::to_string),
784        }
785    }
786
787    fn make(id: &str, name: &str, value: &str) -> TestRecord {
788        TestRecord {
789            id: id.to_string(),
790            name: name.to_string(),
791            value: value.to_string(),
792        }
793    }
794
795    #[tokio::test]
796    async fn in_memory_round_trip_with_indexes() {
797        let store = JsonStore::in_memory(indexed_table()).await.unwrap();
798        let rec = make("id-1", "alpha", "v1");
799
800        store.put(&rec.id, &rec).await.unwrap();
801
802        let got = store.get("id-1").await.unwrap().expect("must exist");
803        assert_eq!(got, rec);
804
805        let list = store.list().await.unwrap();
806        assert_eq!(list.len(), 1);
807        assert_eq!(list[0], rec);
808
809        assert!(store.delete("id-1").await.unwrap());
810        assert!(store.get("id-1").await.unwrap().is_none());
811        assert!(!store.delete("id-1").await.unwrap());
812    }
813
814    #[tokio::test]
815    async fn in_memory_round_trip_blob_only() {
816        let store = JsonStore::in_memory(blob_only_table()).await.unwrap();
817        let rec = make("b-1", "bob", "42");
818
819        store.put(&rec.id, &rec).await.unwrap();
820        let got = store.get("b-1").await.unwrap().expect("must exist");
821        assert_eq!(got, rec);
822
823        let list = store.list().await.unwrap();
824        assert_eq!(list, vec![rec]);
825    }
826
827    #[tokio::test]
828    async fn unique_conflict_surfaces_as_already_exists() {
829        let store = JsonStore::in_memory(indexed_table()).await.unwrap();
830        store.put("a", &make("a", "dup", "v1")).await.unwrap();
831
832        let err = store
833            .put("b", &make("b", "dup", "v2"))
834            .await
835            .expect_err("unique violation must error");
836
837        match err {
838            StorageError::AlreadyExists(msg) => {
839                assert!(
840                    msg.contains("test_records"),
841                    "message should mention the table name, got: {msg}"
842                );
843            }
844            other => panic!("expected AlreadyExists, got {other:?}"),
845        }
846    }
847
848    #[tokio::test]
849    async fn get_by_unique_success_and_unknown_column() {
850        let store = JsonStore::in_memory(indexed_table()).await.unwrap();
851        store.put("x", &make("x", "name-x", "val-x")).await.unwrap();
852        store.put("y", &make("y", "name-y", "val-y")).await.unwrap();
853
854        // Unique column lookup.
855        let found = store
856            .get_by_unique("name", "name-x")
857            .await
858            .unwrap()
859            .expect("must exist");
860        assert_eq!(found.id, "x");
861
862        // Non-unique column lookup still works.
863        let by_value = store
864            .get_by_unique("value", "val-y")
865            .await
866            .unwrap()
867            .expect("must exist");
868        assert_eq!(by_value.id, "y");
869
870        // Missing value returns None, not an error.
871        let missing = store.get_by_unique("name", "nope").await.unwrap();
872        assert!(missing.is_none());
873
874        // Unknown column returns Other, not Database.
875        let err = store
876            .get_by_unique("not_a_column", "whatever")
877            .await
878            .expect_err("unknown column must fail");
879        match err {
880            StorageError::Other(msg) => {
881                assert!(msg.contains("not_a_column"));
882                assert!(msg.contains("test_records"));
883            }
884            other => panic!("expected Other, got {other:?}"),
885        }
886    }
887
888    #[tokio::test]
889    async fn upsert_preserves_created_at() {
890        let store = JsonStore::in_memory(indexed_table()).await.unwrap();
891        let rec = make("z", "z-name", "v1");
892        store.put(&rec.id, &rec).await.unwrap();
893
894        // Read back the raw created_at / updated_at columns so we can
895        // assert the adapter's timestamp policy (created_at is preserved
896        // across upserts, updated_at is refreshed).
897        let (created_before, updated_before): (String, String) =
898            sqlx::query_as("SELECT created_at, updated_at FROM test_records WHERE id = ?")
899                .bind("z")
900                .fetch_one(&store.pool)
901                .await
902                .unwrap();
903
904        // Tiny pause so the RFC-3339 strings differ.
905        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
906
907        let updated = make("z", "z-name", "v2");
908        store.put(&updated.id, &updated).await.unwrap();
909
910        let (created_after, updated_after): (String, String) =
911            sqlx::query_as("SELECT created_at, updated_at FROM test_records WHERE id = ?")
912                .bind("z")
913                .fetch_one(&store.pool)
914                .await
915                .unwrap();
916
917        assert_eq!(
918            created_before, created_after,
919            "created_at must be preserved across upsert"
920        );
921        assert_ne!(
922            updated_before, updated_after,
923            "updated_at must advance on upsert"
924        );
925
926        // And the blob really did update.
927        let got = store.get("z").await.unwrap().unwrap();
928        assert_eq!(got.value, "v2");
929    }
930
931    #[tokio::test]
932    async fn count_tracks_inserts_and_deletes() {
933        let store = JsonStore::in_memory(indexed_table()).await.unwrap();
934        assert_eq!(store.count().await.unwrap(), 0);
935
936        store.put("1", &make("1", "one", "v")).await.unwrap();
937        store.put("2", &make("2", "two", "v")).await.unwrap();
938        store.put("3", &make("3", "three", "v")).await.unwrap();
939        assert_eq!(store.count().await.unwrap(), 3);
940
941        // Upsert of an existing id must NOT bump the count.
942        store.put("1", &make("1", "one", "v2")).await.unwrap();
943        assert_eq!(store.count().await.unwrap(), 3);
944
945        assert!(store.delete("2").await.unwrap());
946        assert_eq!(store.count().await.unwrap(), 2);
947    }
948
949    #[tokio::test]
950    async fn list_returns_every_row() {
951        let store = JsonStore::in_memory(blob_only_table()).await.unwrap();
952        store.put("c", &make("c", "cc", "3")).await.unwrap();
953        store.put("a", &make("a", "aa", "1")).await.unwrap();
954        store.put("b", &make("b", "bb", "2")).await.unwrap();
955
956        let list = store.list().await.unwrap();
957        assert_eq!(list.len(), 3);
958        // Ordered by id ASC, as documented.
959        let ids: Vec<&str> = list.iter().map(|r| r.id.as_str()).collect();
960        assert_eq!(ids, vec!["a", "b", "c"]);
961    }
962
963    // =========================================================================
964    // Configurable primary-key column tests
965    // =========================================================================
966
967    /// Record stored under a non-`id` primary key — mirrors the `deployments`
968    /// table whose PK column is `name`.
969    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
970    struct NamedRecord {
971        name: String,
972        payload: String,
973    }
974
975    fn named_pk_table() -> JsonTable<NamedRecord> {
976        JsonTable {
977            name: "named_records",
978            primary_key: "name",
979            indexes: &[],
980            unique_constraints: &[],
981        }
982    }
983
984    #[tokio::test]
985    async fn custom_primary_key_round_trips() {
986        let store = JsonStore::in_memory(named_pk_table()).await.unwrap();
987
988        // The generated DDL must declare `name` (not `id`) as the PK column.
989        // SQLite has no `id` column on this table, so any query/DDL that still
990        // hardcoded `id` would fail outright here.
991        let pk_col: (String,) =
992            sqlx::query_as("SELECT name FROM pragma_table_info('named_records') WHERE pk = 1")
993                .fetch_one(&store.pool)
994                .await
995                .unwrap();
996        assert_eq!(pk_col.0, "name", "primary key column must be `name`");
997
998        let rec = NamedRecord {
999            name: "alpha".to_string(),
1000            payload: "v1".to_string(),
1001        };
1002        store.put(&rec.name, &rec).await.unwrap();
1003
1004        // get keyed by the custom PK
1005        let got = store.get("alpha").await.unwrap().expect("must exist");
1006        assert_eq!(got, rec);
1007
1008        // upsert by the same PK overwrites in place (ON CONFLICT(name))
1009        let updated = NamedRecord {
1010            name: "alpha".to_string(),
1011            payload: "v2".to_string(),
1012        };
1013        store.put(&updated.name, &updated).await.unwrap();
1014        assert_eq!(store.count().await.unwrap(), 1);
1015        assert_eq!(store.get("alpha").await.unwrap().unwrap().payload, "v2");
1016
1017        // list ordered by the custom PK ascending
1018        store
1019            .put(
1020                "beta",
1021                &NamedRecord {
1022                    name: "beta".to_string(),
1023                    payload: "vb".to_string(),
1024                },
1025            )
1026            .await
1027            .unwrap();
1028        let names: Vec<String> = store
1029            .list()
1030            .await
1031            .unwrap()
1032            .into_iter()
1033            .map(|r| r.name)
1034            .collect();
1035        assert_eq!(names, vec!["alpha".to_string(), "beta".to_string()]);
1036
1037        // delete keyed by the custom PK
1038        assert!(store.delete("alpha").await.unwrap());
1039        assert!(store.get("alpha").await.unwrap().is_none());
1040        assert!(!store.delete("alpha").await.unwrap());
1041    }
1042
1043    #[tokio::test]
1044    async fn list_lossy_skips_undeserializable_rows() {
1045        let store = JsonStore::in_memory(blob_only_table()).await.unwrap();
1046        store.put("a", &make("a", "aa", "1")).await.unwrap();
1047        store.put("c", &make("c", "cc", "3")).await.unwrap();
1048
1049        // Inject a row whose data_json can't decode into TestRecord.
1050        sqlx::query(
1051            "INSERT INTO test_blobs (id, data_json, created_at, updated_at) \
1052             VALUES (?, ?, ?, ?)",
1053        )
1054        .bind("b")
1055        .bind("{ not valid json for TestRecord")
1056        .bind("2026-01-01T00:00:00+00:00")
1057        .bind("2026-01-01T00:00:00+00:00")
1058        .execute(&store.pool)
1059        .await
1060        .unwrap();
1061
1062        // Strict list() errors on the poisoned row...
1063        assert!(store.list().await.is_err());
1064
1065        // ...while list_lossy() skips it and returns the two good rows,
1066        // ordered by the PK ascending.
1067        let good = store.list_lossy().await.unwrap();
1068        let ids: Vec<&str> = good.iter().map(|r| r.id.as_str()).collect();
1069        assert_eq!(ids, vec!["a", "c"]);
1070    }
1071
1072    // =========================================================================
1073    // Compound UNIQUE + list_where{,_null,_opt} tests
1074    // =========================================================================
1075
1076    #[tokio::test]
1077    async fn compound_unique_rejects_duplicate_pair() {
1078        let store = JsonStore::in_memory(scoped_table()).await.unwrap();
1079        store
1080            .put("id-1", &make_scoped("id-1", "foo", Some("proj-a")))
1081            .await
1082            .unwrap();
1083
1084        // Same (name, scope) under a different id must fail with AlreadyExists.
1085        let err = store
1086            .put("id-2", &make_scoped("id-2", "foo", Some("proj-a")))
1087            .await
1088            .expect_err("compound unique violation must error");
1089        match err {
1090            StorageError::AlreadyExists(msg) => {
1091                assert!(
1092                    msg.contains("scoped_records"),
1093                    "message should mention the table name, got: {msg}"
1094                );
1095            }
1096            other => panic!("expected AlreadyExists, got {other:?}"),
1097        }
1098    }
1099
1100    #[tokio::test]
1101    async fn compound_unique_permits_distinct_combinations() {
1102        let store = JsonStore::in_memory(scoped_table()).await.unwrap();
1103        // Same name in different scopes — allowed.
1104        store
1105            .put("id-1", &make_scoped("id-1", "foo", Some("proj-a")))
1106            .await
1107            .unwrap();
1108        store
1109            .put("id-2", &make_scoped("id-2", "foo", Some("proj-b")))
1110            .await
1111            .unwrap();
1112        // Different name in the same scope — allowed.
1113        store
1114            .put("id-3", &make_scoped("id-3", "bar", Some("proj-a")))
1115            .await
1116            .unwrap();
1117
1118        assert_eq!(store.count().await.unwrap(), 3);
1119    }
1120
1121    #[tokio::test]
1122    async fn list_where_filters_by_column_value() {
1123        let store = JsonStore::in_memory(scoped_table()).await.unwrap();
1124        store
1125            .put("a", &make_scoped("a", "x", Some("s1")))
1126            .await
1127            .unwrap();
1128        store
1129            .put("b", &make_scoped("b", "y", Some("s1")))
1130            .await
1131            .unwrap();
1132        store
1133            .put("c", &make_scoped("c", "z", Some("s2")))
1134            .await
1135            .unwrap();
1136
1137        let s1 = store.list_where("scope", "s1").await.unwrap();
1138        assert_eq!(s1.len(), 2);
1139        let ids: Vec<&str> = s1.iter().map(|r| r.id.as_str()).collect();
1140        // Ordered by id ASC.
1141        assert_eq!(ids, vec!["a", "b"]);
1142
1143        let s2 = store.list_where("scope", "s2").await.unwrap();
1144        assert_eq!(s2.len(), 1);
1145        assert_eq!(s2[0].id, "c");
1146
1147        let missing = store.list_where("scope", "s3").await.unwrap();
1148        assert!(missing.is_empty());
1149    }
1150
1151    #[tokio::test]
1152    async fn list_where_null_filters_null_rows() {
1153        let store = JsonStore::in_memory(scoped_table()).await.unwrap();
1154        store.put("a", &make_scoped("a", "x", None)).await.unwrap();
1155        store
1156            .put("b", &make_scoped("b", "y", Some("s1")))
1157            .await
1158            .unwrap();
1159        store.put("c", &make_scoped("c", "z", None)).await.unwrap();
1160
1161        let nulls = store.list_where_null("scope").await.unwrap();
1162        assert_eq!(nulls.len(), 2);
1163        let ids: Vec<&str> = nulls.iter().map(|r| r.id.as_str()).collect();
1164        assert_eq!(ids, vec!["a", "c"]);
1165    }
1166
1167    #[tokio::test]
1168    async fn list_where_opt_unknown_column_returns_other() {
1169        let store = JsonStore::in_memory(scoped_table()).await.unwrap();
1170        let err = store
1171            .list_where_opt("not_a_column", Some("x"))
1172            .await
1173            .expect_err("unknown column must fail");
1174        match err {
1175            StorageError::Other(msg) => {
1176                assert!(msg.contains("not_a_column"));
1177                assert!(msg.contains("scoped_records"));
1178            }
1179            other => panic!("expected Other, got {other:?}"),
1180        }
1181
1182        let err2 = store
1183            .list_where_opt("not_a_column", None)
1184            .await
1185            .expect_err("unknown column must fail");
1186        match err2 {
1187            StorageError::Other(_) => {}
1188            other => panic!("expected Other, got {other:?}"),
1189        }
1190    }
1191}