zlayer_store/backend.rs
1//! Generic `SQLite`/`sqlx` adapter for blob-shaped stores.
2//!
3//! `JsonStore<T>` is the persistence engine shared by the "blob + optional
4//! unique indexes" style of `ZLayer` resource — notifiers, variables, tasks,
5//! workflows, user groups, and syncs. A record is stored as an opaque
6//! `serde_json`-serialised blob in a `data_json` column; zero or more columns
7//! are peeled out via caller-supplied extractor functions so that unique
8//! constraints and secondary lookups remain first-class SQL rather than
9//! whole-table scans.
10//!
11//! # Design
12//!
13//! The caller owns the record type `T`. `T` must be `Serialize +
14//! DeserializeOwned + Send + Sync + 'static`. The caller describes how `T`
15//! maps onto the table via [`JsonTable`]:
16//!
17//! ```ignore
18//! use zlayer_store::{IndexSpec, JsonTable, JsonStore};
19//!
20//! # #[derive(serde::Serialize, serde::Deserialize)]
21//! # struct MyRecord { id: String, name: String, scope: Option<String> }
22//! let table = JsonTable::<MyRecord> {
23//! name: "my_records",
24//! primary_key: "id",
25//! indexes: &[
26//! IndexSpec { column: "name", extractor: |r| Some(r.name.clone()), unique: true },
27//! IndexSpec { column: "scope", extractor: |r| r.scope.clone(), unique: false },
28//! ],
29//! unique_constraints: &[],
30//! };
31//! # async fn _example() -> Result<(), zlayer_store::StorageError> {
32//! let store: JsonStore<MyRecord> = JsonStore::in_memory(table).await?;
33//! # Ok(()) }
34//! ```
35//!
36//! The generated schema is:
37//!
38//! ```sql
39//! CREATE TABLE IF NOT EXISTS my_records (
40//! id TEXT PRIMARY KEY NOT NULL,
41//! name TEXT,
42//! scope TEXT,
43//! data_json TEXT NOT NULL,
44//! created_at TEXT NOT NULL,
45//! updated_at TEXT NOT NULL,
46//! UNIQUE(name)
47//! )
//! CREATE INDEX IF NOT EXISTS idx_my_records_name ON my_records(name)
//! CREATE INDEX IF NOT EXISTS idx_my_records_scope ON my_records(scope)
49//! ```
50//!
//! A table with an empty `indexes` slice produces the minimal 4-column shape
//! (primary key, `data_json`, `created_at`, `updated_at`) with no `UNIQUE`
//! clause — that is what the notifier store uses.
53//!
54//! # Compound uniqueness
55//!
56//! For resources that need uniqueness across a combination of columns (e.g.
57//! variables with `UNIQUE(name, scope)`), populate
58//! [`JsonTable::unique_constraints`] with static slices of column names. Each
59//! inner slice becomes one `UNIQUE(col_a, col_b, ...)` clause in the generated
60//! DDL. The columns referenced must also appear in [`JsonTable::indexes`] so
61//! that they are peeled out at write time; otherwise `SQLite` has no column to
62//! enforce the constraint against.
63//!
64//! Note: `SQLite` treats `NULL`s as distinct in `UNIQUE`, so a compound unique
65//! over `(name, scope)` only enforces uniqueness among rows where every
66//! referenced column is non-`NULL`. Rows where any referenced column is `NULL`
67//! — for example a global variable with `scope = None` — are enforced in
68//! application code via a pre-write `get_by_*` check (see
69//! `InMemoryVariableStore::store` and `SqlxVariableStore::store` for the
70//! pattern).
71//!
72//! # Operations
73//!
74//! - [`put`](JsonStore::put) — upsert by `id`, preserving `created_at` on
75//! conflict.
76//! - [`get`](JsonStore::get), [`list`](JsonStore::list),
77//! [`delete`](JsonStore::delete), [`count`](JsonStore::count) — the
78//! usual CRUD primitives.
79//! - [`get_by_unique`](JsonStore::get_by_unique) — fetch by any indexed
80//! column (not just unique ones).
81//!
82//! Unique-constraint violations surface as [`StorageError::AlreadyExists`] —
83//! not as a raw `Database(...)` string — so callers can distinguish 409-class
84//! errors cleanly.
85//!
86//! # Identifier safety
87//!
88//! `JsonTable::name` and `IndexSpec::column` are `&'static str`, so they come
89//! from compile-time string literals at every call site. The schema DDL is
90//! assembled via `format!` — that is safe for `&'static str` table/column
91//! identifiers and would be a disaster for user-supplied strings. Do not
92//! pass run-time data to either field.
93
94use std::marker::PhantomData;
95use std::path::Path;
96
97use serde::de::DeserializeOwned;
98use serde::Serialize;
99use sqlx::sqlite::{SqlitePool, SqlitePoolOptions};
100use sqlx::Row;
101
102use crate::StorageError;
103
/// Specification for a single secondary column peeled out of `T` at write
/// time. All peeled columns are typed `TEXT` in `SQLite`; `None` from the
/// extractor becomes SQL `NULL`.
///
/// Every declared column also gets its own
/// `CREATE INDEX IF NOT EXISTS idx_<table>_<column>` secondary index when the
/// schema is initialised, regardless of the `unique` flag.
pub struct IndexSpec<T> {
    /// Column name. Must be a valid SQL identifier (a-z, A-Z, 0-9, `_`) and
    /// is interpolated verbatim into the generated DDL and queries, so this
    /// MUST be a compile-time string literal.
    pub column: &'static str,

    /// Extractor pulling the column value out of `T`. It is invoked once per
    /// `put`; `None` is written as SQL `NULL`.
    pub extractor: fn(&T) -> Option<String>,

    /// If `true`, `column` is included in the table-level `UNIQUE(...)`
    /// constraint. Note that ALL columns flagged `unique: true` on the same
    /// table are emitted together as ONE compound clause
    /// (`UNIQUE(col_a, col_b, ...)`), not as one clause per column.
    ///
    /// `SQLite` treats `NULL`s as distinct in `UNIQUE`, so nullable unique
    /// columns only enforce uniqueness among non-`NULL` values. Callers
    /// needing "unique including `NULL`" must enforce it in application code
    /// (see `environments.rs` for the pattern).
    pub unique: bool,
}
124
/// Full table specification: table name, primary-key column name, the
/// ordered list of indexed columns, and any compound uniqueness constraints.
///
/// Shared across open/reopen so a downstream shim can keep the same layout
/// forever without duplicating strings.
///
/// The `'static` bound on `T` is required because `indexes` is a
/// `&'static [IndexSpec<T>]` — the compiler needs to know any `T` referenced
/// by the static slice is itself valid for `'static`. Every concrete record
/// type we store (e.g. `StoredNotifier`) owns its data, so this bound is
/// always satisfied in practice.
pub struct JsonTable<T: 'static> {
    /// SQL table name. Must be a valid SQL identifier; it is interpolated
    /// verbatim into DDL and queries, so this MUST be a compile-time string
    /// literal.
    pub name: &'static str,

    /// Name of the primary-key column. Every store created fresh by this
    /// codebase uses `"id"`. It exists as a knob so a pre-existing on-disk
    /// schema whose primary key is named something else can be routed
    /// through this adapter without a destructive migration — the
    /// `deployments` table, for instance, has had `name TEXT PRIMARY KEY`
    /// since before the adapter existed, so it sets `primary_key: "name"`.
    /// Like every other identifier on this struct it is interpolated
    /// verbatim into the DDL/queries, so it MUST be a compile-time string
    /// literal.
    pub primary_key: &'static str,

    /// Indexed columns, in the order they appear in the generated
    /// `CREATE TABLE`. May be empty for pure blob-only tables.
    pub indexes: &'static [IndexSpec<T>],

    /// Compound `UNIQUE(col_a, col_b, ...)` constraints emitted at
    /// `CREATE TABLE` time. Each non-empty inner slice becomes one
    /// `UNIQUE(...)` clause in the generated DDL; empty inner slices are
    /// skipped. For single-column uniqueness prefer
    /// `IndexSpec::unique = true` — it's flatter. Use this field when
    /// uniqueness spans two or more columns (e.g. `UNIQUE(name, scope)`).
    ///
    /// Every column name referenced here MUST also appear in `indexes` so
    /// that it's peeled out of `T` at write time; otherwise the `CREATE TABLE`
    /// statement will reference a column that doesn't exist (this is checked
    /// with a `debug_assert!` during schema initialisation). The column names
    /// are interpolated verbatim into the DDL, so they MUST be compile-time
    /// string literals. May be empty when no compound constraints are needed.
    pub unique_constraints: &'static [&'static [&'static str]],
}
166
/// Generic blob store keyed by a string `id`.
///
/// This is the `SQLite`-backed implementation of the adapter. The type name
/// is engine-neutral on purpose: the `ZLayerZQL` mirror overrides this entire
/// `backend` module with a ZQL-backed `JsonStore` exposing an identical API,
/// so the per-resource storage files that hold a `JsonStore<T>` regenerate
/// mechanically with no hand edits.
///
/// Use via a narrow wrapper type in each storage module (see
/// `notifiers.rs::SqlxNotifierStore` for the pattern).
pub struct JsonStore<T>
where
    T: Serialize + DeserializeOwned + Send + Sync + 'static,
{
    /// Connection pool every query in this store runs against.
    pool: SqlitePool,
    /// Table layout (name, primary key, peeled columns, constraints) used to
    /// build the DDL and every query string.
    table: JsonTable<T>,
    /// Ties the store to its record type without storing a `T` value.
    _marker: PhantomData<fn() -> T>,
}
185
186impl<T> JsonStore<T>
187where
188 T: Serialize + DeserializeOwned + Send + Sync + 'static,
189{
190 /// Open or create a `SQLite` database at the given path and ensure the
191 /// table + indexes exist.
192 ///
193 /// # Errors
194 ///
195 /// Returns [`StorageError`] if the database cannot be opened or if the
196 /// schema cannot be created.
197 pub async fn open<P: AsRef<Path>>(path: P, table: JsonTable<T>) -> Result<Self, StorageError> {
198 let path_str = path.as_ref().display().to_string();
199 let connection_string = format!("sqlite:{path_str}?mode=rwc");
200
201 let pool = SqlitePoolOptions::new()
202 .max_connections(5)
203 .connect(&connection_string)
204 .await?;
205
206 sqlx::query("PRAGMA journal_mode=WAL")
207 .execute(&pool)
208 .await?;
209 sqlx::query("PRAGMA busy_timeout=5000")
210 .execute(&pool)
211 .await?;
212
213 let this = Self {
214 pool,
215 table,
216 _marker: PhantomData,
217 };
218 this.init_schema().await?;
219 Ok(this)
220 }
221
222 /// Create an in-memory `SQLite` database (useful for tests) with the
223 /// table + indexes in place.
224 ///
225 /// # Errors
226 ///
227 /// Returns [`StorageError`] if the in-memory database cannot be created.
228 pub async fn in_memory(table: JsonTable<T>) -> Result<Self, StorageError> {
229 let pool = SqlitePool::connect(":memory:").await?;
230 let this = Self {
231 pool,
232 table,
233 _marker: PhantomData,
234 };
235 this.init_schema().await?;
236 Ok(this)
237 }
238
239 /// Create the table + per-index secondary indexes if they don't exist.
240 async fn init_schema(&self) -> Result<(), StorageError> {
241 let table_name = self.table.name;
242
243 // Build the CREATE TABLE statement. Every peeled column is `TEXT`
244 // nullable; the blob is `data_json TEXT NOT NULL`; the timestamps
245 // are ISO-8601 RFC-3339 strings.
246 let mut ddl = String::with_capacity(256);
247 ddl.push_str("CREATE TABLE IF NOT EXISTS ");
248 ddl.push_str(table_name);
249 ddl.push_str(" (\n ");
250 ddl.push_str(self.table.primary_key);
251 ddl.push_str(" TEXT PRIMARY KEY NOT NULL");
252
253 for idx in self.table.indexes {
254 ddl.push_str(",\n ");
255 ddl.push_str(idx.column);
256 ddl.push_str(" TEXT");
257 }
258
259 ddl.push_str(",\n data_json TEXT NOT NULL");
260 ddl.push_str(",\n created_at TEXT NOT NULL");
261 ddl.push_str(",\n updated_at TEXT NOT NULL");
262
263 // Compound UNIQUE clause across all `unique: true` columns, matching
264 // the `users.rs` / `environments.rs` shape.
265 let unique_cols: Vec<&'static str> = self
266 .table
267 .indexes
268 .iter()
269 .filter(|i| i.unique)
270 .map(|i| i.column)
271 .collect();
272 if !unique_cols.is_empty() {
273 ddl.push_str(",\n UNIQUE(");
274 for (i, col) in unique_cols.iter().enumerate() {
275 if i > 0 {
276 ddl.push_str(", ");
277 }
278 ddl.push_str(col);
279 }
280 ddl.push(')');
281 }
282
283 // Extra compound UNIQUE constraints — one clause per inner slice.
284 // Every referenced column must exist in `indexes` so it's actually
285 // peeled out of `T` at write time.
286 let indexed_columns: std::collections::HashSet<&'static str> =
287 self.table.indexes.iter().map(|i| i.column).collect();
288 for constraint in self.table.unique_constraints {
289 if constraint.is_empty() {
290 continue;
291 }
292 for col in *constraint {
293 debug_assert!(
294 indexed_columns.contains(col),
295 "unique_constraints references column '{col}' on table \
296 '{table_name}' that is not declared in JsonTable::indexes"
297 );
298 }
299 ddl.push_str(",\n UNIQUE(");
300 for (i, col) in constraint.iter().enumerate() {
301 if i > 0 {
302 ddl.push_str(", ");
303 }
304 ddl.push_str(col);
305 }
306 ddl.push(')');
307 }
308 ddl.push_str("\n)");
309
310 sqlx::query(&ddl).execute(&self.pool).await?;
311
312 // Secondary indexes on every peeled column (including unique ones)
313 // for fast `get_by_unique` lookup.
314 for idx in self.table.indexes {
315 let idx_ddl = format!(
316 "CREATE INDEX IF NOT EXISTS idx_{table}_{col} ON {table}({col})",
317 table = table_name,
318 col = idx.column,
319 );
320 sqlx::query(&idx_ddl).execute(&self.pool).await?;
321 }
322
323 Ok(())
324 }
325
326 /// Upsert a record by its primary key `id`.
327 ///
328 /// - If `id` is new, a row is inserted with `created_at = updated_at =
329 /// now` — both timestamps are taken from `now()` at write time, NOT
330 /// from any field on `T`, because the adapter does not know which
331 /// field (if any) of `T` holds a timestamp.
332 /// - If `id` already exists, every column is refreshed from the new
333 /// blob except `created_at`, which is preserved.
334 ///
335 /// # Errors
336 ///
337 /// - [`StorageError::AlreadyExists`] if the write violates a `UNIQUE`
338 /// constraint on one of the peeled columns.
339 /// - [`StorageError::Serialization`] if `T` cannot be encoded to JSON.
340 /// - [`StorageError::Database`] for any other `sqlx` error.
341 pub async fn put(&self, id: &str, value: &T) -> Result<(), StorageError> {
342 let data_json = serde_json::to_string(value)?;
343 let now = chrono::Utc::now().to_rfc3339();
344 let table_name = self.table.name;
345
346 // Build column and placeholder lists. `?` for every value; `sqlx`
347 // does the binding so there's no injection risk from the blob or
348 // extracted columns.
349 let mut columns: Vec<&str> = Vec::with_capacity(4 + self.table.indexes.len());
350 columns.push(self.table.primary_key);
351 for idx in self.table.indexes {
352 columns.push(idx.column);
353 }
354 columns.push("data_json");
355 columns.push("created_at");
356 columns.push("updated_at");
357
358 let placeholders = (0..columns.len())
359 .map(|_| "?")
360 .collect::<Vec<_>>()
361 .join(", ");
362
363 // ON CONFLICT(<pk>) update clause — everything except `created_at`.
364 let update_assignments = {
365 let mut parts: Vec<String> = Vec::new();
366 for idx in self.table.indexes {
367 parts.push(format!("{col} = excluded.{col}", col = idx.column));
368 }
369 parts.push("data_json = excluded.data_json".to_string());
370 parts.push("updated_at = excluded.updated_at".to_string());
371 parts.join(", ")
372 };
373
374 let sql = format!(
375 "INSERT INTO {table} ({cols}) VALUES ({placeholders}) \
376 ON CONFLICT({pk}) DO UPDATE SET {updates}",
377 table = table_name,
378 cols = columns.join(", "),
379 placeholders = placeholders,
380 pk = self.table.primary_key,
381 updates = update_assignments,
382 );
383
384 let mut query = sqlx::query(&sql).bind(id);
385 for idx in self.table.indexes {
386 query = query.bind((idx.extractor)(value));
387 }
388 query = query.bind(&data_json).bind(&now).bind(&now);
389
390 match query.execute(&self.pool).await {
391 Ok(_) => Ok(()),
392 Err(err) => Err(map_sqlx_err(err, self.table.name)),
393 }
394 }
395
396 /// Fetch a record by primary key.
397 ///
398 /// # Errors
399 ///
400 /// - [`StorageError::Database`] on `sqlx` failure.
401 /// - [`StorageError::Serialization`] if the blob cannot be decoded.
402 pub async fn get(&self, id: &str) -> Result<Option<T>, StorageError> {
403 let sql = format!(
404 "SELECT data_json FROM {table} WHERE {pk} = ?",
405 table = self.table.name,
406 pk = self.table.primary_key,
407 );
408
409 let row: Option<(String,)> = sqlx::query_as(&sql)
410 .bind(id)
411 .fetch_optional(&self.pool)
412 .await?;
413
414 match row {
415 Some((data_json,)) => {
416 let value: T = serde_json::from_str(&data_json)?;
417 Ok(Some(value))
418 }
419 None => Ok(None),
420 }
421 }
422
423 /// List every record in the table, ordered by the primary-key column
424 /// (`primary_key`, default `"id"`) ascending.
425 ///
426 /// Most callers want a domain-specific ordering (e.g. notifier name),
427 /// which they can do by sorting in memory after `list()` returns. Doing
428 /// the sort here would require knowing which field of `T` to order by,
429 /// which the adapter cannot know generically.
430 ///
431 /// # Errors
432 ///
433 /// - [`StorageError::Database`] on `sqlx` failure.
434 /// - [`StorageError::Serialization`] if any blob cannot be decoded.
435 pub async fn list(&self) -> Result<Vec<T>, StorageError> {
436 let sql = format!(
437 "SELECT data_json FROM {table} ORDER BY {pk} ASC",
438 table = self.table.name,
439 pk = self.table.primary_key,
440 );
441
442 let rows: Vec<(String,)> = sqlx::query_as(&sql).fetch_all(&self.pool).await?;
443
444 let mut out = Vec::with_capacity(rows.len());
445 for (data_json,) in rows {
446 let value: T = serde_json::from_str(&data_json)?;
447 out.push(value);
448 }
449 Ok(out)
450 }
451
452 /// List every record in the table ordered by the primary-key column
453 /// ascending, **tolerating** rows whose `data_json` blob fails to
454 /// deserialize: each such row is logged at `error` level (with its
455 /// primary-key value) and skipped rather than failing the whole call.
456 ///
457 /// This is the lossy counterpart to [`list`](Self::list). Use it for
458 /// stores where a single corrupt/legacy row must never take down the
459 /// entire listing — the `deployments` store relies on this so a poisoned
460 /// record can't 500 every `list_deployments` call and survive restart.
461 ///
462 /// Because the blob can't be decoded into `T` for a skipped row, the
463 /// primary-key column is selected alongside `data_json` purely for the
464 /// log line. The PK is always a `TEXT` column, so it decodes as a string.
465 ///
466 /// # Errors
467 ///
468 /// Returns [`StorageError::Database`] on `sqlx` failure. Per-row
469 /// deserialize failures are skipped, not returned.
470 pub async fn list_lossy(&self) -> Result<Vec<T>, StorageError> {
471 let sql = format!(
472 "SELECT {pk}, data_json FROM {table} ORDER BY {pk} ASC",
473 table = self.table.name,
474 pk = self.table.primary_key,
475 );
476
477 let rows: Vec<(String, String)> = sqlx::query_as(&sql).fetch_all(&self.pool).await?;
478
479 let mut out = Vec::with_capacity(rows.len());
480 for (key, data_json) in rows {
481 match serde_json::from_str::<T>(&data_json) {
482 Ok(value) => out.push(value),
483 Err(e) => {
484 tracing::error!(
485 table = %self.table.name,
486 key = %key,
487 error = %e,
488 "skipping un-deserializable row in list_lossy()"
489 );
490 }
491 }
492 }
493 Ok(out)
494 }
495
496 /// Delete a record by primary key.
497 ///
498 /// # Errors
499 ///
500 /// Returns [`StorageError::Database`] on `sqlx` failure.
501 pub async fn delete(&self, id: &str) -> Result<bool, StorageError> {
502 let sql = format!(
503 "DELETE FROM {table} WHERE {pk} = ?",
504 table = self.table.name,
505 pk = self.table.primary_key,
506 );
507 let result = sqlx::query(&sql).bind(id).execute(&self.pool).await?;
508 Ok(result.rows_affected() > 0)
509 }
510
511 /// Fetch a record by one of its peeled secondary columns.
512 ///
513 /// The column must appear in the `JsonTable::indexes` list. Works for
514 /// any indexed column — unique or non-unique. When the column is
515 /// non-unique, the first matching row (by `rowid`) is returned.
516 ///
517 /// # Errors
518 ///
519 /// - [`StorageError::Other`] if `column` does not correspond to any
520 /// declared index.
521 /// - [`StorageError::Database`] on `sqlx` failure.
522 /// - [`StorageError::Serialization`] if the blob cannot be decoded.
523 pub async fn get_by_unique(
524 &self,
525 column: &str,
526 value: &str,
527 ) -> Result<Option<T>, StorageError> {
528 // Validate the column against the static index list — this is what
529 // prevents SQL injection via `column`.
530 let found = self.table.indexes.iter().any(|i| i.column == column);
531 if !found {
532 return Err(StorageError::Other(format!(
533 "unknown column '{column}' for table '{table}'",
534 table = self.table.name
535 )));
536 }
537
538 let sql = format!(
539 "SELECT data_json FROM {table} WHERE {col} = ? LIMIT 1",
540 table = self.table.name,
541 col = column,
542 );
543
544 let row: Option<(String,)> = sqlx::query_as(&sql)
545 .bind(value)
546 .fetch_optional(&self.pool)
547 .await?;
548
549 match row {
550 Some((data_json,)) => {
551 let value: T = serde_json::from_str(&data_json)?;
552 Ok(Some(value))
553 }
554 None => Ok(None),
555 }
556 }
557
558 /// List every record where a peeled column matches a specific value.
559 /// Rows are ordered by `id` ascending, matching [`list`](Self::list).
560 ///
561 /// Use [`list_where_opt`](Self::list_where_opt) if you need to filter
562 /// on `NULL` — passing `""` to this function binds the empty string, not
563 /// SQL `NULL`.
564 ///
565 /// # Errors
566 ///
567 /// - [`StorageError::Other`] if `column` is not declared in the table's
568 /// index list.
569 /// - [`StorageError::Database`] on `sqlx` failure.
570 /// - [`StorageError::Serialization`] if any blob cannot be decoded.
571 pub async fn list_where(&self, column: &str, value: &str) -> Result<Vec<T>, StorageError> {
572 self.list_where_opt(column, Some(value)).await
573 }
574
575 /// List every record where a peeled column is SQL `NULL`. Rows are
576 /// ordered by `id` ascending.
577 ///
578 /// # Errors
579 ///
580 /// - [`StorageError::Other`] if `column` is not declared in the table's
581 /// index list.
582 /// - [`StorageError::Database`] on `sqlx` failure.
583 /// - [`StorageError::Serialization`] if any blob cannot be decoded.
584 pub async fn list_where_null(&self, column: &str) -> Result<Vec<T>, StorageError> {
585 self.list_where_opt(column, None).await
586 }
587
588 /// List every record filtered by a peeled column. `Some(v)` matches
589 /// `column = ?` with `v` bound; `None` matches `column IS NULL`. Rows are
590 /// ordered by `id` ascending.
591 ///
592 /// This is the unified primitive used by both [`list_where`](Self::list_where)
593 /// and [`list_where_null`](Self::list_where_null).
594 ///
595 /// # Errors
596 ///
597 /// - [`StorageError::Other`] if `column` is not declared in the table's
598 /// index list.
599 /// - [`StorageError::Database`] on `sqlx` failure.
600 /// - [`StorageError::Serialization`] if any blob cannot be decoded.
601 pub async fn list_where_opt(
602 &self,
603 column: &str,
604 value: Option<&str>,
605 ) -> Result<Vec<T>, StorageError> {
606 // Validate the column against the static index list — this is what
607 // prevents SQL injection via `column`.
608 let found = self.table.indexes.iter().any(|i| i.column == column);
609 if !found {
610 return Err(StorageError::Other(format!(
611 "unknown column '{column}' for table '{table}'",
612 table = self.table.name
613 )));
614 }
615
616 let rows: Vec<(String,)> = if let Some(v) = value {
617 let sql = format!(
618 "SELECT data_json FROM {table} WHERE {col} = ? ORDER BY {pk} ASC",
619 table = self.table.name,
620 col = column,
621 pk = self.table.primary_key,
622 );
623 sqlx::query_as(&sql).bind(v).fetch_all(&self.pool).await?
624 } else {
625 let sql = format!(
626 "SELECT data_json FROM {table} WHERE {col} IS NULL ORDER BY {pk} ASC",
627 table = self.table.name,
628 col = column,
629 pk = self.table.primary_key,
630 );
631 sqlx::query_as(&sql).fetch_all(&self.pool).await?
632 };
633
634 let mut out = Vec::with_capacity(rows.len());
635 for (data_json,) in rows {
636 let decoded: T = serde_json::from_str(&data_json)?;
637 out.push(decoded);
638 }
639 Ok(out)
640 }
641
/// Return a reference to the underlying `SqlitePool`.
///
/// Exposed `pub` as a temporary migration bridge: sibling stores still
/// living in `crates/zlayer-api/src/storage/` (tasks, workflows, groups)
/// run auxiliary queries — creating companion tables (e.g. `task_runs`,
/// `group_members`), doing cross-table transactions during cascade
/// deletes — against the same pool without opening a second connection.
/// It was `pub(crate)` while `JsonStore` lived inside `zlayer-api`;
/// the extraction into `zlayer-store` requires `pub` so those
/// cross-crate callers keep compiling.
///
/// This is NOT a stable public API: once the companion-table stores move
/// into `zlayer-store` too, this should return to a narrower visibility.
#[must_use]
pub fn pool(&self) -> &SqlitePool {
    &self.pool
}
658
659 /// Return the total number of rows in the table.
660 ///
661 /// # Errors
662 ///
663 /// Returns [`StorageError::Database`] on `sqlx` failure.
664 pub async fn count(&self) -> Result<u64, StorageError> {
665 let sql = format!("SELECT COUNT(*) FROM {table}", table = self.table.name);
666 let row = sqlx::query(&sql).fetch_one(&self.pool).await?;
667 let raw: i64 = row.try_get(0)?;
668 Ok(u64::try_from(raw).unwrap_or(0))
669 }
670}
671
672/// Map a raw `sqlx::Error` into a `StorageError`, surfacing `SQLite` 2067
673/// (`SQLITE_CONSTRAINT_UNIQUE`) / 1555 (`SQLITE_CONSTRAINT_PRIMARYKEY`) as a
674/// dedicated `AlreadyExists` variant and everything else as a generic
675/// `Database`.
676fn map_sqlx_err(err: sqlx::Error, table: &str) -> StorageError {
677 if let sqlx::Error::Database(db) = &err {
678 // SQLite error codes are returned as strings by `sqlx`. 2067 is the
679 // extended code for UNIQUE; 1555 is the extended code for PRIMARY
680 // KEY. Both are constraint violations that a caller typically wants
681 // to render as HTTP 409.
682 if let Some(code) = db.code() {
683 if code == "2067" || code == "1555" {
684 return StorageError::AlreadyExists(format!(
685 "UNIQUE constraint failed on table '{table}': {db}"
686 ));
687 }
688 }
689 // Some SQLite builds don't populate extended codes; fall back to
690 // substring matching on the message for those cases.
691 let msg = db.message();
692 if msg.contains("UNIQUE constraint failed") {
693 return StorageError::AlreadyExists(format!(
694 "UNIQUE constraint failed on table '{table}': {msg}"
695 ));
696 }
697 }
698 StorageError::from(err)
699}
700
701#[cfg(test)]
702mod tests {
703 use super::*;
704 use serde::Deserialize;
705
/// Minimal record type used by the adapter tests. `name` and `value` are
/// the fields the test table specs peel out into indexed columns; the whole
/// struct is what round-trips through the JSON blob.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct TestRecord {
    /// Primary-key value the tests store the record under.
    id: String,
    /// Peeled into the unique `name` column by `indexed_table`.
    name: String,
    /// Peeled into the non-unique `value` column by `indexed_table`.
    value: String,
}
714
715 /// Table spec with a unique `name` column and a non-unique `value`
716 /// column — covers both branches of the index machinery.
717 fn indexed_table() -> JsonTable<TestRecord> {
718 static INDEXES: &[IndexSpec<TestRecord>] = &[
719 IndexSpec {
720 column: "name",
721 extractor: |r| Some(r.name.clone()),
722 unique: true,
723 },
724 IndexSpec {
725 column: "value",
726 extractor: |r| Some(r.value.clone()),
727 unique: false,
728 },
729 ];
730 JsonTable {
731 name: "test_records",
732 primary_key: "id",
733 indexes: INDEXES,
734 unique_constraints: &[],
735 }
736 }
737
/// Table spec for `test_blobs` with no secondary columns and no uniqueness
/// constraints — matches the notifier shape, where the record lives only in
/// the JSON blob.
fn blob_only_table() -> JsonTable<TestRecord> {
    JsonTable {
        name: "test_blobs",
        primary_key: "id",
        indexes: &[],
        unique_constraints: &[],
    }
}
747
/// Record with two peeled columns exercising a compound UNIQUE(name,
/// scope) constraint — the shape used by the variable store.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct ScopedRecord {
    /// Primary-key value the tests store the record under.
    id: String,
    /// Peeled into the `name` column by `scoped_table`.
    name: String,
    /// Peeled into the `scope` column by `scoped_table`; `None` is written
    /// as SQL `NULL`.
    scope: Option<String>,
}
756
757 fn scoped_table() -> JsonTable<ScopedRecord> {
758 static INDEXES: &[IndexSpec<ScopedRecord>] = &[
759 IndexSpec {
760 column: "name",
761 extractor: |r| Some(r.name.clone()),
762 unique: false,
763 },
764 IndexSpec {
765 column: "scope",
766 extractor: |r| r.scope.clone(),
767 unique: false,
768 },
769 ];
770 static UNIQUES: &[&[&str]] = &[&["name", "scope"]];
771 JsonTable {
772 name: "scoped_records",
773 primary_key: "id",
774 indexes: INDEXES,
775 unique_constraints: UNIQUES,
776 }
777 }
778
779 fn make_scoped(id: &str, name: &str, scope: Option<&str>) -> ScopedRecord {
780 ScopedRecord {
781 id: id.to_string(),
782 name: name.to_string(),
783 scope: scope.map(str::to_string),
784 }
785 }
786
787 fn make(id: &str, name: &str, value: &str) -> TestRecord {
788 TestRecord {
789 id: id.to_string(),
790 name: name.to_string(),
791 value: value.to_string(),
792 }
793 }
794
795 #[tokio::test]
796 async fn in_memory_round_trip_with_indexes() {
797 let store = JsonStore::in_memory(indexed_table()).await.unwrap();
798 let rec = make("id-1", "alpha", "v1");
799
800 store.put(&rec.id, &rec).await.unwrap();
801
802 let got = store.get("id-1").await.unwrap().expect("must exist");
803 assert_eq!(got, rec);
804
805 let list = store.list().await.unwrap();
806 assert_eq!(list.len(), 1);
807 assert_eq!(list[0], rec);
808
809 assert!(store.delete("id-1").await.unwrap());
810 assert!(store.get("id-1").await.unwrap().is_none());
811 assert!(!store.delete("id-1").await.unwrap());
812 }
813
814 #[tokio::test]
815 async fn in_memory_round_trip_blob_only() {
816 let store = JsonStore::in_memory(blob_only_table()).await.unwrap();
817 let rec = make("b-1", "bob", "42");
818
819 store.put(&rec.id, &rec).await.unwrap();
820 let got = store.get("b-1").await.unwrap().expect("must exist");
821 assert_eq!(got, rec);
822
823 let list = store.list().await.unwrap();
824 assert_eq!(list, vec![rec]);
825 }
826
827 #[tokio::test]
828 async fn unique_conflict_surfaces_as_already_exists() {
829 let store = JsonStore::in_memory(indexed_table()).await.unwrap();
830 store.put("a", &make("a", "dup", "v1")).await.unwrap();
831
832 let err = store
833 .put("b", &make("b", "dup", "v2"))
834 .await
835 .expect_err("unique violation must error");
836
837 match err {
838 StorageError::AlreadyExists(msg) => {
839 assert!(
840 msg.contains("test_records"),
841 "message should mention the table name, got: {msg}"
842 );
843 }
844 other => panic!("expected AlreadyExists, got {other:?}"),
845 }
846 }
847
848 #[tokio::test]
849 async fn get_by_unique_success_and_unknown_column() {
850 let store = JsonStore::in_memory(indexed_table()).await.unwrap();
851 store.put("x", &make("x", "name-x", "val-x")).await.unwrap();
852 store.put("y", &make("y", "name-y", "val-y")).await.unwrap();
853
854 // Unique column lookup.
855 let found = store
856 .get_by_unique("name", "name-x")
857 .await
858 .unwrap()
859 .expect("must exist");
860 assert_eq!(found.id, "x");
861
862 // Non-unique column lookup still works.
863 let by_value = store
864 .get_by_unique("value", "val-y")
865 .await
866 .unwrap()
867 .expect("must exist");
868 assert_eq!(by_value.id, "y");
869
870 // Missing value returns None, not an error.
871 let missing = store.get_by_unique("name", "nope").await.unwrap();
872 assert!(missing.is_none());
873
874 // Unknown column returns Other, not Database.
875 let err = store
876 .get_by_unique("not_a_column", "whatever")
877 .await
878 .expect_err("unknown column must fail");
879 match err {
880 StorageError::Other(msg) => {
881 assert!(msg.contains("not_a_column"));
882 assert!(msg.contains("test_records"));
883 }
884 other => panic!("expected Other, got {other:?}"),
885 }
886 }
887
888 #[tokio::test]
889 async fn upsert_preserves_created_at() {
890 let store = JsonStore::in_memory(indexed_table()).await.unwrap();
891 let rec = make("z", "z-name", "v1");
892 store.put(&rec.id, &rec).await.unwrap();
893
894 // Read back the raw created_at / updated_at columns so we can
895 // assert the adapter's timestamp policy (created_at is preserved
896 // across upserts, updated_at is refreshed).
897 let (created_before, updated_before): (String, String) =
898 sqlx::query_as("SELECT created_at, updated_at FROM test_records WHERE id = ?")
899 .bind("z")
900 .fetch_one(&store.pool)
901 .await
902 .unwrap();
903
904 // Tiny pause so the RFC-3339 strings differ.
905 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
906
907 let updated = make("z", "z-name", "v2");
908 store.put(&updated.id, &updated).await.unwrap();
909
910 let (created_after, updated_after): (String, String) =
911 sqlx::query_as("SELECT created_at, updated_at FROM test_records WHERE id = ?")
912 .bind("z")
913 .fetch_one(&store.pool)
914 .await
915 .unwrap();
916
917 assert_eq!(
918 created_before, created_after,
919 "created_at must be preserved across upsert"
920 );
921 assert_ne!(
922 updated_before, updated_after,
923 "updated_at must advance on upsert"
924 );
925
926 // And the blob really did update.
927 let got = store.get("z").await.unwrap().unwrap();
928 assert_eq!(got.value, "v2");
929 }
930
931 #[tokio::test]
932 async fn count_tracks_inserts_and_deletes() {
933 let store = JsonStore::in_memory(indexed_table()).await.unwrap();
934 assert_eq!(store.count().await.unwrap(), 0);
935
936 store.put("1", &make("1", "one", "v")).await.unwrap();
937 store.put("2", &make("2", "two", "v")).await.unwrap();
938 store.put("3", &make("3", "three", "v")).await.unwrap();
939 assert_eq!(store.count().await.unwrap(), 3);
940
941 // Upsert of an existing id must NOT bump the count.
942 store.put("1", &make("1", "one", "v2")).await.unwrap();
943 assert_eq!(store.count().await.unwrap(), 3);
944
945 assert!(store.delete("2").await.unwrap());
946 assert_eq!(store.count().await.unwrap(), 2);
947 }
948
949 #[tokio::test]
950 async fn list_returns_every_row() {
951 let store = JsonStore::in_memory(blob_only_table()).await.unwrap();
952 store.put("c", &make("c", "cc", "3")).await.unwrap();
953 store.put("a", &make("a", "aa", "1")).await.unwrap();
954 store.put("b", &make("b", "bb", "2")).await.unwrap();
955
956 let list = store.list().await.unwrap();
957 assert_eq!(list.len(), 3);
958 // Ordered by id ASC, as documented.
959 let ids: Vec<&str> = list.iter().map(|r| r.id.as_str()).collect();
960 assert_eq!(ids, vec!["a", "b", "c"]);
961 }
962
963 // =========================================================================
964 // Configurable primary-key column tests
965 // =========================================================================
966
967 /// Record stored under a non-`id` primary key — mirrors the `deployments`
968 /// table whose PK column is `name`.
969 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
970 struct NamedRecord {
971 name: String,
972 payload: String,
973 }
974
975 fn named_pk_table() -> JsonTable<NamedRecord> {
976 JsonTable {
977 name: "named_records",
978 primary_key: "name",
979 indexes: &[],
980 unique_constraints: &[],
981 }
982 }
983
984 #[tokio::test]
985 async fn custom_primary_key_round_trips() {
986 let store = JsonStore::in_memory(named_pk_table()).await.unwrap();
987
988 // The generated DDL must declare `name` (not `id`) as the PK column.
989 // SQLite has no `id` column on this table, so any query/DDL that still
990 // hardcoded `id` would fail outright here.
991 let pk_col: (String,) =
992 sqlx::query_as("SELECT name FROM pragma_table_info('named_records') WHERE pk = 1")
993 .fetch_one(&store.pool)
994 .await
995 .unwrap();
996 assert_eq!(pk_col.0, "name", "primary key column must be `name`");
997
998 let rec = NamedRecord {
999 name: "alpha".to_string(),
1000 payload: "v1".to_string(),
1001 };
1002 store.put(&rec.name, &rec).await.unwrap();
1003
1004 // get keyed by the custom PK
1005 let got = store.get("alpha").await.unwrap().expect("must exist");
1006 assert_eq!(got, rec);
1007
1008 // upsert by the same PK overwrites in place (ON CONFLICT(name))
1009 let updated = NamedRecord {
1010 name: "alpha".to_string(),
1011 payload: "v2".to_string(),
1012 };
1013 store.put(&updated.name, &updated).await.unwrap();
1014 assert_eq!(store.count().await.unwrap(), 1);
1015 assert_eq!(store.get("alpha").await.unwrap().unwrap().payload, "v2");
1016
1017 // list ordered by the custom PK ascending
1018 store
1019 .put(
1020 "beta",
1021 &NamedRecord {
1022 name: "beta".to_string(),
1023 payload: "vb".to_string(),
1024 },
1025 )
1026 .await
1027 .unwrap();
1028 let names: Vec<String> = store
1029 .list()
1030 .await
1031 .unwrap()
1032 .into_iter()
1033 .map(|r| r.name)
1034 .collect();
1035 assert_eq!(names, vec!["alpha".to_string(), "beta".to_string()]);
1036
1037 // delete keyed by the custom PK
1038 assert!(store.delete("alpha").await.unwrap());
1039 assert!(store.get("alpha").await.unwrap().is_none());
1040 assert!(!store.delete("alpha").await.unwrap());
1041 }
1042
1043 #[tokio::test]
1044 async fn list_lossy_skips_undeserializable_rows() {
1045 let store = JsonStore::in_memory(blob_only_table()).await.unwrap();
1046 store.put("a", &make("a", "aa", "1")).await.unwrap();
1047 store.put("c", &make("c", "cc", "3")).await.unwrap();
1048
1049 // Inject a row whose data_json can't decode into TestRecord.
1050 sqlx::query(
1051 "INSERT INTO test_blobs (id, data_json, created_at, updated_at) \
1052 VALUES (?, ?, ?, ?)",
1053 )
1054 .bind("b")
1055 .bind("{ not valid json for TestRecord")
1056 .bind("2026-01-01T00:00:00+00:00")
1057 .bind("2026-01-01T00:00:00+00:00")
1058 .execute(&store.pool)
1059 .await
1060 .unwrap();
1061
1062 // Strict list() errors on the poisoned row...
1063 assert!(store.list().await.is_err());
1064
1065 // ...while list_lossy() skips it and returns the two good rows,
1066 // ordered by the PK ascending.
1067 let good = store.list_lossy().await.unwrap();
1068 let ids: Vec<&str> = good.iter().map(|r| r.id.as_str()).collect();
1069 assert_eq!(ids, vec!["a", "c"]);
1070 }
1071
1072 // =========================================================================
1073 // Compound UNIQUE + list_where{,_null,_opt} tests
1074 // =========================================================================
1075
1076 #[tokio::test]
1077 async fn compound_unique_rejects_duplicate_pair() {
1078 let store = JsonStore::in_memory(scoped_table()).await.unwrap();
1079 store
1080 .put("id-1", &make_scoped("id-1", "foo", Some("proj-a")))
1081 .await
1082 .unwrap();
1083
1084 // Same (name, scope) under a different id must fail with AlreadyExists.
1085 let err = store
1086 .put("id-2", &make_scoped("id-2", "foo", Some("proj-a")))
1087 .await
1088 .expect_err("compound unique violation must error");
1089 match err {
1090 StorageError::AlreadyExists(msg) => {
1091 assert!(
1092 msg.contains("scoped_records"),
1093 "message should mention the table name, got: {msg}"
1094 );
1095 }
1096 other => panic!("expected AlreadyExists, got {other:?}"),
1097 }
1098 }
1099
1100 #[tokio::test]
1101 async fn compound_unique_permits_distinct_combinations() {
1102 let store = JsonStore::in_memory(scoped_table()).await.unwrap();
1103 // Same name in different scopes — allowed.
1104 store
1105 .put("id-1", &make_scoped("id-1", "foo", Some("proj-a")))
1106 .await
1107 .unwrap();
1108 store
1109 .put("id-2", &make_scoped("id-2", "foo", Some("proj-b")))
1110 .await
1111 .unwrap();
1112 // Different name in the same scope — allowed.
1113 store
1114 .put("id-3", &make_scoped("id-3", "bar", Some("proj-a")))
1115 .await
1116 .unwrap();
1117
1118 assert_eq!(store.count().await.unwrap(), 3);
1119 }
1120
1121 #[tokio::test]
1122 async fn list_where_filters_by_column_value() {
1123 let store = JsonStore::in_memory(scoped_table()).await.unwrap();
1124 store
1125 .put("a", &make_scoped("a", "x", Some("s1")))
1126 .await
1127 .unwrap();
1128 store
1129 .put("b", &make_scoped("b", "y", Some("s1")))
1130 .await
1131 .unwrap();
1132 store
1133 .put("c", &make_scoped("c", "z", Some("s2")))
1134 .await
1135 .unwrap();
1136
1137 let s1 = store.list_where("scope", "s1").await.unwrap();
1138 assert_eq!(s1.len(), 2);
1139 let ids: Vec<&str> = s1.iter().map(|r| r.id.as_str()).collect();
1140 // Ordered by id ASC.
1141 assert_eq!(ids, vec!["a", "b"]);
1142
1143 let s2 = store.list_where("scope", "s2").await.unwrap();
1144 assert_eq!(s2.len(), 1);
1145 assert_eq!(s2[0].id, "c");
1146
1147 let missing = store.list_where("scope", "s3").await.unwrap();
1148 assert!(missing.is_empty());
1149 }
1150
1151 #[tokio::test]
1152 async fn list_where_null_filters_null_rows() {
1153 let store = JsonStore::in_memory(scoped_table()).await.unwrap();
1154 store.put("a", &make_scoped("a", "x", None)).await.unwrap();
1155 store
1156 .put("b", &make_scoped("b", "y", Some("s1")))
1157 .await
1158 .unwrap();
1159 store.put("c", &make_scoped("c", "z", None)).await.unwrap();
1160
1161 let nulls = store.list_where_null("scope").await.unwrap();
1162 assert_eq!(nulls.len(), 2);
1163 let ids: Vec<&str> = nulls.iter().map(|r| r.id.as_str()).collect();
1164 assert_eq!(ids, vec!["a", "c"]);
1165 }
1166
1167 #[tokio::test]
1168 async fn list_where_opt_unknown_column_returns_other() {
1169 let store = JsonStore::in_memory(scoped_table()).await.unwrap();
1170 let err = store
1171 .list_where_opt("not_a_column", Some("x"))
1172 .await
1173 .expect_err("unknown column must fail");
1174 match err {
1175 StorageError::Other(msg) => {
1176 assert!(msg.contains("not_a_column"));
1177 assert!(msg.contains("scoped_records"));
1178 }
1179 other => panic!("expected Other, got {other:?}"),
1180 }
1181
1182 let err2 = store
1183 .list_where_opt("not_a_column", None)
1184 .await
1185 .expect_err("unknown column must fail");
1186 match err2 {
1187 StorageError::Other(_) => {}
1188 other => panic!("expected Other, got {other:?}"),
1189 }
1190 }
1191}