Skip to main content

pylon_runtime/
lib.rs

1pub mod account_backend;
2pub mod api_key_backend;
3pub mod audit_backend;
4pub mod cache_handlers;
5pub mod cache_server;
6pub mod config;
7pub mod cron;
8pub mod datastore;
9pub mod ip_limit;
10pub mod job_store;
11pub mod jobs;
12pub mod log;
13pub mod loro_store;
14pub mod magic_code_backend;
15pub mod metrics;
16pub mod oauth_backend;
17pub mod openapi;
18pub mod org_backend;
19pub mod pg_loro_store;
20pub mod presence;
21pub mod pubsub;
22pub mod rate_limit;
23pub mod resp;
24pub mod resp_server;
25pub mod rooms;
26pub mod scheduler;
27pub mod server;
28pub mod session_backend;
29pub mod shard_ws;
30pub mod sse;
31pub mod tls;
32pub mod verification_backend;
33pub mod workflow_store;
34pub mod workflows;
35pub mod ws;
36
37use std::collections::HashMap;
38use std::sync::atomic::{AtomicUsize, Ordering};
39use std::sync::Mutex;
40
41use pylon_kernel::{AppManifest, ManifestEntity};
42use rusqlite::Connection;
43
44// ---------------------------------------------------------------------------
45// Runtime errors
46// ---------------------------------------------------------------------------
47
/// Error returned by every fallible `Runtime` operation.
///
/// `code` is a short machine-readable identifier (e.g. `"INVALID_COLUMN"`,
/// `"RUNTIME_OPEN_FAILED"`); `message` carries the human-readable detail.
/// Rendered by `Display` as `[CODE] message`.
#[derive(Debug, Clone)]
pub struct RuntimeError {
    /// Machine-readable error code.
    pub code: String,
    /// Human-readable description of what went wrong.
    pub message: String,
}

impl std::fmt::Display for RuntimeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let RuntimeError { code, message } = self;
        write!(f, "[{code}] {message}")
    }
}

impl std::error::Error for RuntimeError {}
61
62/// Lift a `DataError` (the cross-crate error type for PG `DataStore`
63/// operations) into a `RuntimeError`. Used by `PostgresDataStore`
64/// closure bounds (`with_client`, `with_transaction`) so callers in
65/// the runtime can propagate PG errors with their native error type.
66impl From<pylon_http::DataError> for RuntimeError {
67    fn from(e: pylon_http::DataError) -> Self {
68        RuntimeError {
69            code: e.code,
70            message: e.message,
71        }
72    }
73}
74
75// ---------------------------------------------------------------------------
76// SQL safety helpers
77// ---------------------------------------------------------------------------
78
/// Wrap `name` in double quotes so it can be spliced into SQL as an
/// identifier. Every embedded `"` is doubled (the SQL-standard escape), so
/// the result is always exactly one well-formed quoted identifier.
fn quote_ident(name: &str) -> String {
    let mut quoted = String::with_capacity(name.len() + 2);
    quoted.push('"');
    for ch in name.chars() {
        if ch == '"' {
            // Escape by doubling.
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
84
85/// Validate that `name` is a known column on the given entity.
86/// Always allows "id" (the primary key). Returns an error listing valid
87/// columns when validation fails.
88fn validate_column_name(name: &str, entity: &ManifestEntity) -> Result<(), RuntimeError> {
89    if name == "id" {
90        return Ok(());
91    }
92    if entity.fields.iter().any(|f| f.name == name) {
93        return Ok(());
94    }
95    Err(RuntimeError {
96        code: "INVALID_COLUMN".into(),
97        message: format!(
98            "Unknown column \"{}\" -- valid columns: id, {}",
99            name,
100            entity
101                .fields
102                .iter()
103                .map(|f| f.name.as_str())
104                .collect::<Vec<_>>()
105                .join(", ")
106        ),
107    })
108}
109
110// ---------------------------------------------------------------------------
111// Connection tuning
112// ---------------------------------------------------------------------------
113
114/// Apply the production pragma set on a SQLite connection. Identical
115/// values to `pylon_storage::sqlite::tune_connection` — kept here too
116/// because the Runtime opens its own connections directly (write +
117/// read pool) without going through the storage adapter.
118///
119/// See `crates/storage/src/sqlite.rs` for the rationale on each
120/// pragma. Skipping it on writes drops throughput by 5–10×.
121fn tune_runtime_connection(conn: &Connection, in_memory: bool) {
122    let pragmas: &[(&str, &str)] = if in_memory {
123        &[
124            ("temp_store", "MEMORY"),
125            ("cache_size", "-65536"),
126            ("foreign_keys", "ON"),
127        ]
128    } else {
129        &[
130            ("journal_mode", "WAL"),
131            ("synchronous", "NORMAL"),
132            ("cache_size", "-65536"),
133            ("mmap_size", "268435456"),
134            ("temp_store", "MEMORY"),
135            ("busy_timeout", "5000"),
136            ("foreign_keys", "ON"),
137            ("wal_autocheckpoint", "1000"),
138        ]
139    };
140    for (key, value) in pragmas {
141        let _ = conn.pragma_update(None, key, value);
142    }
143}
144
145// ---------------------------------------------------------------------------
146// Read connection guard
147// ---------------------------------------------------------------------------
148
149/// A guard that dereferences to a `Connection`, abstracting over whether
150/// it came from the read pool or fell back to the write connection.
151enum ReadConnGuard<'a> {
152    Pooled(std::sync::MutexGuard<'a, Connection>),
153    Write(std::sync::MutexGuard<'a, Connection>),
154}
155
156impl<'a> std::ops::Deref for ReadConnGuard<'a> {
157    type Target = Connection;
158    fn deref(&self) -> &Connection {
159        match self {
160            ReadConnGuard::Pooled(g) => g,
161            ReadConnGuard::Write(g) => g,
162        }
163    }
164}
165
166// ---------------------------------------------------------------------------
167// Runtime — the core execution engine
168// ---------------------------------------------------------------------------
169
/// A manifest-driven runtime that executes CRUD operations against an
/// underlying data store. Two backends are supported:
///
/// - **SQLite** (default): single-process, file-or-memory, with a write
///   mutex + read pool, FTS5 search, and per-row LoroDoc CRDT snapshots
///   (`LoroStore`).
/// - **Postgres**: live cluster, suitable for multi-replica deployments.
///   Routes entity CRUD through [`pylon_storage::pg_datastore::PostgresDataStore`]
///   and keeps per-row CRDT snapshots in a `PgLoroStore` backed by the
///   `_pylon_crdt_snapshots` sidecar table. FTS5-shaped search setup is
///   SQLite-only at this layer: `ensure_search_indexes` is a no-op on
///   Postgres, where the storage adapter's migration plan owns the search
///   schema. NOTE(review): other search paths were previously documented as
///   returning `NOT_SUPPORTED` on Postgres — confirm at their call sites.
///
/// Pick a backend by passing a `postgres://` or `postgresql://` URL (prefix
/// matched case-insensitively) to [`Runtime::open`]; any other string is
/// treated as a SQLite filesystem path.
pub struct Runtime {
    /// Entity storage: SQLite connections + CRDT cache, or Postgres store.
    backend: RuntimeBackend,
    /// The manifest this runtime was opened with.
    manifest: AppManifest,
    /// Manifest entities keyed by entity name, built once at open time.
    entities: HashMap<String, ManifestEntity>,
    /// True only for the SQLite in-memory variant. Postgres mode reports false.
    /// Gates the test-reset endpoint — a false positive here would let
    /// `/api/__test__/reset` truncate real tables.
    is_in_memory: bool,
}
192
/// Backend storage for entity CRUD. The SQLite variant owns the write
/// connection, read pool, and `LoroStore` CRDT cache. The Postgres variant
/// wraps a `PostgresDataStore` plus a shared `PgLoroStore` for CRDT snapshots.
enum RuntimeBackend {
    Sqlite(SqliteBackend),
    Postgres(PgBackend),
}

/// SQLite-backed entity store. WAL mode allows one writer and multiple
/// concurrent readers — the struct exploits this with a single write
/// connection behind a mutex plus a pool of read-only connections.
///
/// Fields drop in declaration order: the write connection closes before
/// the read pool. Take that into account before reordering them.
struct SqliteBackend {
    /// Write connection — single mutex, serializes writes.
    write_conn: Mutex<Connection>,
    /// Read connections — pool of connections for concurrent reads.
    /// Empty for in-memory databases, where extra connections are not
    /// possible; reads then fall back to `write_conn`.
    read_pool: Vec<Mutex<Connection>>,
    /// Counter for round-robin read pool selection.
    read_counter: AtomicUsize,
    /// Per-row LoroDoc cache + sidecar persistence. Used for entities with
    /// `crdt: true` (the default). Reads still hit SQLite directly via the
    /// read pool — the LoroDoc just produces the projected JSON that gets
    /// materialized into SQLite columns on every write.
    crdt: crate::loro_store::LoroStore,
}

/// Postgres-backed entity store. Wraps `PostgresDataStore` from
/// pylon-storage and delegates the `DataStore` surface directly.
pub(crate) struct PgBackend {
    /// Entity CRUD store; owns its own Postgres connection handling.
    pub(crate) store: pylon_storage::pg_datastore::PostgresDataStore,
    /// Per-row LoroDoc snapshot store for entities with `crdt: true`.
    /// Arc'd so the runtime layer can hand a clone to PgCrdtHookImpl
    /// (the bridge that lets PgTxStore call back into CRDT machinery
    /// from inside a held tx).
    pub(crate) crdt: std::sync::Arc<crate::pg_loro_store::PgLoroStore>,
}

/// Number of read-only connections opened for a file-backed SQLite runtime.
/// In-memory and Postgres runtimes open none.
const READ_POOL_SIZE: usize = 4;
231
/// True iff `url` is a Postgres connection string, i.e. it starts with
/// `postgres://` or `postgresql://`, compared ASCII-case-insensitively.
/// Host-less forms such as `postgres:///db` (connection details taken
/// from the environment) match the same prefixes. Everything else is
/// interpreted by the caller as a SQLite filesystem path.
///
/// Only the prefix is inspected, so no lowercase copy of the whole URL is
/// allocated. `str::get` returns `None` when the prefix length would split
/// a multi-byte character, which safely yields `false`.
fn is_postgres_url(url: &str) -> bool {
    const PREFIXES: [&str; 2] = ["postgres://", "postgresql://"];
    PREFIXES.iter().any(|prefix| {
        matches!(url.get(..prefix.len()), Some(head) if head.eq_ignore_ascii_case(prefix))
    })
}
239
240/// Convert a `pylon_http::DataError` (returned by `PostgresDataStore`)
241/// into the runtime's error type. The codes round-trip; only the type
242/// changes.
243fn data_err_to_runtime(e: pylon_http::DataError) -> RuntimeError {
244    RuntimeError {
245        code: e.code,
246        message: e.message,
247    }
248}
249
250impl Runtime {
251    /// Open a runtime against either a SQLite file path or a Postgres URL.
252    ///
253    /// Backend selection is by URL prefix:
254    /// - `postgres://...` or `postgresql://...` → Postgres (requires the
255    ///   `postgres-live` feature on `pylon-storage`, enabled by default).
256    /// - Anything else → SQLite, treating the string as a filesystem path
257    ///   (`":memory:"` works via `Runtime::in_memory` instead).
258    pub fn open(url: &str, manifest: AppManifest) -> Result<Self, RuntimeError> {
259        if is_postgres_url(url) {
260            Self::open_postgres(url, manifest)
261        } else {
262            let conn = Connection::open(url).map_err(|e| RuntimeError {
263                code: "RUNTIME_OPEN_FAILED".into(),
264                message: format!("Failed to open database: {e}"),
265            })?;
266            Self::from_connection(conn, manifest, false)
267        }
268    }
269
270    /// Open a runtime backed by a live Postgres cluster.
271    ///
272    /// Schema must be applied separately via `pylon migrate` / the
273    /// storage adapter's plan apply path — Runtime does not auto-create
274    /// tables on Postgres (in contrast to SQLite, where `from_connection`
275    /// runs CREATE TABLE IF NOT EXISTS on every open). This matches how
276    /// production Postgres deployments are typically managed: schema is
277    /// migrated via a controlled, observable step, not as a side effect
278    /// of the server starting up.
279    pub fn open_postgres(url: &str, manifest: AppManifest) -> Result<Self, RuntimeError> {
280        let store = pylon_storage::pg_datastore::PostgresDataStore::connect(url, manifest.clone())
281            .map_err(data_err_to_runtime)?;
282        // Bootstrap the CRDT sidecar table on every open. Idempotent
283        // (`CREATE TABLE IF NOT EXISTS`); same shape as the SQLite
284        // path's `ensure_sidecar` call. Without this, the first
285        // CRDT-mode write would error because `_pylon_crdt_snapshots`
286        // doesn't exist yet on a fresh PG database.
287        store
288            .with_client(|c| crate::pg_loro_store::ensure_sidecar(c))
289            .map_err(|e| RuntimeError {
290                code: "CRDT_SIDECAR_BOOTSTRAP_FAILED".into(),
291                message: format!("ensure pg crdt sidecar: {e}"),
292            })?;
293        let entities: HashMap<String, ManifestEntity> = manifest
294            .entities
295            .iter()
296            .map(|e| (e.name.clone(), e.clone()))
297            .collect();
298        Ok(Self {
299            backend: RuntimeBackend::Postgres(PgBackend {
300                store,
301                crdt: std::sync::Arc::new(crate::pg_loro_store::PgLoroStore::new()),
302            }),
303            manifest,
304            entities,
305            is_in_memory: false,
306        })
307    }
308
309    /// Returns true if this runtime is backed by an in-memory SQLite DB.
310    ///
311    /// Stored at open time rather than queried via `conn.path()` because
312    /// the path-based check conflates "no filename" with "in-memory":
313    /// `Connection::open("")` yields a file-backed DB with empty path,
314    /// and would falsely pass as in-memory. Since we always know at
315    /// construction time which constructor was used, track the bit.
316    ///
317    /// Gates the test-reset endpoint — a false positive here would let
318    /// `/api/__test__/reset` truncate real tables.
319    pub fn is_in_memory(&self) -> bool {
320        self.is_in_memory
321    }
322
323    /// Filesystem path to the SQLite database, if this runtime is file-backed.
324    /// Returns `None` for in-memory runtimes AND Postgres runtimes (no local
325    /// file). Used by the server bootstrap to derive companion paths
326    /// (session store, change log persistence) without requiring the caller
327    /// to pass them in.
328    pub fn db_path(&self) -> Option<String> {
329        if self.is_in_memory {
330            return None;
331        }
332        let sb = match &self.backend {
333            RuntimeBackend::Sqlite(sb) => sb,
334            RuntimeBackend::Postgres(_) => return None,
335        };
336        let conn = sb.write_conn.lock().ok()?;
337        conn.path().filter(|p| !p.is_empty()).map(String::from)
338    }
339
340    /// Drop every row from every entity table. Intended for test harnesses
341    /// that call `/api/__test__/reset` between cases; refuses to run on
342    /// anything but an in-memory database.
343    ///
344    /// Does NOT drop the tables themselves — schema stays, indexes stay,
345    /// triggers stay. Just truncates user data + the change log.
346    pub fn reset_for_tests(&self) -> Result<(), RuntimeError> {
347        if !self.is_in_memory() {
348            return Err(RuntimeError {
349                code: "RESET_REFUSED".into(),
350                message: "reset_for_tests is only available on in-memory databases".into(),
351            });
352        }
353        let conn = self.lock_write_conn()?;
354        let entity_names: Vec<String> = self.entities.values().map(|e| e.name.clone()).collect();
355        for name in entity_names {
356            let sql = format!("DELETE FROM {}", quote_ident(&name));
357            let _ = conn.execute(&sql, []);
358            // Also clear any FTS5 shadow table if present.
359            let fts_sql = format!("DELETE FROM {}", quote_ident(&format!("{name}_fts")));
360            let _ = conn.execute(&fts_sql, []);
361        }
362        Ok(())
363    }
364
365    /// Create an in-memory SQLite-backed runtime (useful for tests and
366    /// benchmarks). For Postgres-backed equivalents, use `open_postgres`
367    /// with a test-cluster URL.
368    pub fn in_memory(manifest: AppManifest) -> Result<Self, RuntimeError> {
369        let conn = Connection::open_in_memory().map_err(|e| RuntimeError {
370            code: "RUNTIME_OPEN_FAILED".into(),
371            message: format!("Failed to open in-memory database: {e}"),
372        })?;
373        Self::from_connection(conn, manifest, true)
374    }
375
376    fn from_connection(
377        conn: Connection,
378        manifest: AppManifest,
379        is_in_memory: bool,
380    ) -> Result<Self, RuntimeError> {
381        // Apply the production pragma set on the write connection.
382        tune_runtime_connection(&conn, is_in_memory);
383
384        // Build entity lookup map.
385        let entities: HashMap<String, ManifestEntity> = manifest
386            .entities
387            .iter()
388            .map(|e| (e.name.clone(), e.clone()))
389            .collect();
390
391        // Create tables for all entities.
392        for entity in &manifest.entities {
393            let fields: Vec<String> = entity
394                .fields
395                .iter()
396                .map(|f| {
397                    let col_type = match f.field_type.as_str() {
398                        "int" => "INTEGER",
399                        "float" => "REAL",
400                        "bool" => "INTEGER",
401                        _ => "TEXT",
402                    };
403                    let not_null = if f.optional { "" } else { " NOT NULL" };
404                    let unique = if f.unique { " UNIQUE" } else { "" };
405                    format!("{} {col_type}{not_null}{unique}", quote_ident(&f.name))
406                })
407                .collect();
408
409            let mut cols = vec!["\"id\" TEXT PRIMARY KEY NOT NULL".to_string()];
410            cols.extend(fields);
411            let sql = format!(
412                "CREATE TABLE IF NOT EXISTS {} ({})",
413                quote_ident(&entity.name),
414                cols.join(", ")
415            );
416            conn.execute(&sql, []).map_err(|e| RuntimeError {
417                code: "SCHEMA_INIT_FAILED".into(),
418                message: format!("Failed to create table {}: {e}", entity.name),
419            })?;
420
421            // Create indexes.
422            for idx in &entity.indexes {
423                let unique_kw = if idx.unique { "UNIQUE " } else { "" };
424                let quoted_fields: Vec<String> =
425                    idx.fields.iter().map(|f| quote_ident(f)).collect();
426                let idx_sql = format!(
427                    "CREATE {unique_kw}INDEX IF NOT EXISTS {} ON {} ({})",
428                    quote_ident(&idx.name),
429                    quote_ident(&entity.name),
430                    quoted_fields.join(", ")
431                );
432                conn.execute(&idx_sql, []).ok();
433            }
434
435            // Create an FTS5 virtual table over all text-ish fields so clients
436            // can do full-text search via the `$search` query operator.
437            //
438            // Fields that look like "string" / "richtext" / "text" are indexed.
439            // The FTS table is a contentless external-content table pointed at
440            // the entity table, so SQLite keeps it consistent via triggers we
441            // install below.
442            let text_fields: Vec<&str> = entity
443                .fields
444                .iter()
445                .filter(|f| matches!(f.field_type.as_str(), "string" | "richtext" | "text"))
446                .map(|f| f.name.as_str())
447                .collect();
448            if !text_fields.is_empty() {
449                let fts_name = format!("{}_fts", entity.name);
450                let quoted_cols: Vec<String> = text_fields.iter().map(|f| quote_ident(f)).collect();
451                let fts_sql = format!(
452                    "CREATE VIRTUAL TABLE IF NOT EXISTS {} USING fts5({}, content={}, content_rowid='rowid')",
453                    quote_ident(&fts_name),
454                    quoted_cols.join(", "),
455                    quote_ident(&entity.name),
456                );
457                // FTS5 may not be compiled in; ignore errors so those builds
458                // still work (queries using $search will return empty).
459                let fts_ok = conn.execute(&fts_sql, []).is_ok();
460
461                if fts_ok {
462                    // Sync triggers: keep FTS index current on INSERT/UPDATE/DELETE.
463                    //
464                    // Subtle bug fixed: the trigger NAME must be built from
465                    // the raw `fts_name` + suffix and THEN quoted once.
466                    // Previously this code quoted `fts_name` first and then
467                    // appended `_ai`/`_ad`/`_au` AFTER the closing quote,
468                    // producing invalid SQL like `"foo_fts"_ai`. The
469                    // `.ok()` after execute silently ate the error, so the
470                    // triggers were never created and FTS stayed out of
471                    // sync on writes.
472                    let tbl = quote_ident(&entity.name);
473                    let ftb = quote_ident(&fts_name);
474                    let cols_list = quoted_cols.join(", ");
475                    let new_list: Vec<String> = text_fields
476                        .iter()
477                        .map(|f| format!("new.{}", quote_ident(f)))
478                        .collect();
479                    let old_list: Vec<String> = text_fields
480                        .iter()
481                        .map(|f| format!("old.{}", quote_ident(f)))
482                        .collect();
483
484                    let trigger_ai = quote_ident(&format!("{}_ai", fts_name));
485                    let trigger_ad = quote_ident(&format!("{}_ad", fts_name));
486                    let trigger_au = quote_ident(&format!("{}_au", fts_name));
487
488                    let trigger_ins = format!(
489                        "CREATE TRIGGER IF NOT EXISTS {trigger_ai} AFTER INSERT ON {tbl} BEGIN \
490                         INSERT INTO {ftb}(rowid, {cols_list}) VALUES (new.rowid, {new_vals}); END",
491                        new_vals = new_list.join(", "),
492                    );
493                    let trigger_del = format!(
494                        "CREATE TRIGGER IF NOT EXISTS {trigger_ad} AFTER DELETE ON {tbl} BEGIN \
495                         INSERT INTO {ftb}({ftb}, rowid, {cols_list}) VALUES('delete', old.rowid, {old_vals}); END",
496                        old_vals = old_list.join(", "),
497                    );
498                    let trigger_upd = format!(
499                        "CREATE TRIGGER IF NOT EXISTS {trigger_au} AFTER UPDATE ON {tbl} BEGIN \
500                         INSERT INTO {ftb}({ftb}, rowid, {cols_list}) VALUES('delete', old.rowid, {old_vals}); \
501                         INSERT INTO {ftb}(rowid, {cols_list}) VALUES (new.rowid, {new_vals}); END",
502                        new_vals = new_list.join(", "),
503                        old_vals = old_list.join(", "),
504                    );
505                    // Log failures instead of silently dropping — FTS going
506                    // stale should be visible to operators.
507                    for (label, sql) in [
508                        ("ai", &trigger_ins),
509                        ("ad", &trigger_del),
510                        ("au", &trigger_upd),
511                    ] {
512                        if let Err(e) = conn.execute(sql, []) {
513                            tracing::warn!(
514                                "[fts] failed to create {label} trigger for {}: {e}",
515                                entity.name
516                            );
517                        }
518                    }
519                }
520            }
521        }
522
523        // Open read-only connection pool for file-backed databases.
524        // In-memory databases cannot share connections, so the pool stays empty
525        // and reads fall back to the write connection.
526        let db_path = conn.path().filter(|p| !p.is_empty()).map(|p| p.to_string());
527
528        let read_pool = if let Some(ref path) = db_path {
529            let mut pool = Vec::with_capacity(READ_POOL_SIZE);
530            for _ in 0..READ_POOL_SIZE {
531                let read_conn = Connection::open_with_flags(
532                    path,
533                    rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
534                        | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX,
535                )
536                .map_err(|e| RuntimeError {
537                    code: "POOL_OPEN_FAILED".into(),
538                    message: format!("Failed to open read connection: {e}"),
539                })?;
540                tune_runtime_connection(&read_conn, false);
541                pool.push(Mutex::new(read_conn));
542            }
543            pool
544        } else {
545            // In-memory DB — no separate read connections possible.
546            Vec::new()
547        };
548
549        // Sidecar table for CRDT snapshots — created always so toggling
550        // `crdt: true` on an entity post-deploy doesn't need a migration.
551        crate::loro_store::ensure_sidecar(&conn).map_err(|e| RuntimeError {
552            code: "CRDT_SIDECAR_FAILED".into(),
553            message: format!("create CRDT sidecar table: {e}"),
554        })?;
555
556        Ok(Self {
557            backend: RuntimeBackend::Sqlite(SqliteBackend {
558                write_conn: Mutex::new(conn),
559                read_pool,
560                read_counter: AtomicUsize::new(0),
561                crdt: crate::loro_store::LoroStore::new(),
562            }),
563            manifest,
564            entities,
565            is_in_memory,
566        })
567    }
568
569    /// Create the search index tables (`_facet_bitmap`, per-entity
570    /// `_fts_<Entity>`, and a covering index for each declared
571    /// sortable field) for every searchable entity in the manifest.
572    ///
573    /// Production deployments do this via the storage adapter's
574    /// `apply_schema` / migration plan; that path also handles
575    /// adding/removing the tables when a `search:` block is added or
576    /// removed across deploys. This method is a quick path for tests
577    /// and benchmarks that build a `Runtime::in_memory(...)` directly
578    /// without going through the schema-plan pipeline.
579    pub fn ensure_search_indexes(&self) -> Result<(), RuntimeError> {
580        // Postgres: schema (FTS, facets) is owned by the storage adapter's
581        // migration plan. Tests / benchmarks against Postgres must apply
582        // the plan separately; this fast-path is a SQLite-only convenience.
583        if matches!(self.backend, RuntimeBackend::Postgres(_)) {
584            return Ok(());
585        }
586        let conn = self.lock_write_conn()?;
587        conn.execute(pylon_storage::search::create_facet_table_sql(), [])
588            .map_err(|e| RuntimeError {
589                code: "FACET_TABLE_FAILED".into(),
590                message: format!("create _facet_bitmap: {e}"),
591            })?;
592        for entity in &self.manifest.entities {
593            if let Some(cfg) = &entity.search {
594                if let Some(sql) = pylon_storage::search::create_fts_table_sql(&entity.name, cfg) {
595                    conn.execute(&sql, []).map_err(|e| RuntimeError {
596                        code: "FTS_TABLE_FAILED".into(),
597                        message: format!("create FTS table for {}: {e}", entity.name),
598                    })?;
599                }
600                for field in &cfg.sortable {
601                    let idx_sql = format!(
602                        "CREATE INDEX IF NOT EXISTS \"{}_sort_{field}\" ON \"{}\" (\"{field}\")",
603                        entity.name, entity.name,
604                    );
605                    conn.execute(&idx_sql, []).map_err(|e| RuntimeError {
606                        code: "SORT_INDEX_FAILED".into(),
607                        message: format!("create sort index for {}.{field}: {e}", entity.name),
608                    })?;
609                }
610            }
611        }
612        Ok(())
613    }
614
615    /// Return a reference to the app manifest.
616    pub fn manifest(&self) -> &AppManifest {
617        &self.manifest
618    }
619
620    /// Expose the write connection mutex for transactional operations.
621    /// SQLite-only — Postgres mode returns `NOT_SQLITE_BACKEND`. Callers
622    /// that need a transaction on Postgres should use [`Runtime::transact_ops`]
623    /// (via the `DataStore` trait), which routes to a real Postgres
624    /// transaction inside `PostgresDataStore`.
625    pub fn lock_conn_pub(&self) -> Result<std::sync::MutexGuard<'_, Connection>, RuntimeError> {
626        self.lock_write_conn()
627    }
628
629    /// Return the number of read connections in the pool. Always 0 for
630    /// in-memory SQLite (pool is empty by design) and for Postgres mode
631    /// (the pool concept doesn't apply — `PostgresDataStore` manages its
632    /// own connection internally).
633    pub fn read_pool_size(&self) -> usize {
634        match &self.backend {
635            RuntimeBackend::Sqlite(sb) => sb.read_pool.len(),
636            RuntimeBackend::Postgres(_) => 0,
637        }
638    }
639
640    /// Return true if this runtime is backed by Postgres. Useful for
641    /// SQLite-only fast-paths to early-exit cleanly.
642    pub fn is_postgres(&self) -> bool {
643        matches!(self.backend, RuntimeBackend::Postgres(_))
644    }
645
646    // -----------------------------------------------------------------------
647    // CRDT helpers
648    // -----------------------------------------------------------------------
649
650    /// Map an entity's manifest fields → the [`pylon_crdt::CrdtField`] vec
651    /// the LoroStore needs. Resolves each field's CRDT shape from the
652    /// (type, annotation) pair via `pylon_crdt::field_kind`. Caches
653    /// nothing yet — called per write, fine at our entity counts.
654    pub(crate) fn crdt_fields_for(
655        &self,
656        ent: &ManifestEntity,
657    ) -> Result<Vec<pylon_crdt::CrdtField>, RuntimeError> {
658        let mut out = Vec::with_capacity(ent.fields.len());
659        for f in &ent.fields {
660            // Skip the implicit `id` column — it's the row key, not a
661            // CRDT-managed value. SQLite's PRIMARY KEY constraint owns it.
662            if f.name == "id" {
663                continue;
664            }
665            let kind = pylon_crdt::field_kind(&f.field_type, f.crdt).map_err(|e| RuntimeError {
666                code: "INVALID_CRDT_FIELD".into(),
667                message: format!(
668                    "{}.{}: {e} (declared type={}, crdt={:?})",
669                    ent.name, f.name, f.field_type, f.crdt
670                ),
671            })?;
672            out.push(pylon_crdt::CrdtField {
673                name: f.name.clone(),
674                kind,
675            });
676        }
677        Ok(out)
678    }
679
680    /// Borrow the CRDT store. SQLite-only — Postgres mode does not yet
681    /// support per-row CRDT snapshots at the runtime layer (CRDT
682    /// broadcasts degrade to JSON change events).
683    ///
684    /// # Panics
685    /// Panics on Postgres backend. Call sites that may run under either
686    /// backend should branch on `is_postgres()` first.
687    pub fn crdt_store(&self) -> &crate::loro_store::LoroStore {
688        match &self.backend {
689            RuntimeBackend::Sqlite(sb) => &sb.crdt,
690            RuntimeBackend::Postgres(_) => {
691                panic!("crdt_store() called on Postgres-backed Runtime")
692            }
693        }
694    }
695
696    // -----------------------------------------------------------------------
697    // CRUD operations
698    // -----------------------------------------------------------------------
699
    /// Insert a new row. Returns the generated ID.
    ///
    /// For entities with `crdt: true` (the default), the LoroDoc snapshot
    /// and the materialized row are committed together in a single
    /// transaction on either backend, so a crash between the two leaves
    /// neither. `crdt: false` entities skip the LoroDoc and use a direct write
    /// (legacy LWW path). Both produce the same on-disk row shape, so
    /// reads, indexes, FTS, and policies don't change between modes.
    ///
    /// On the SQLite path and the Postgres CRDT path, the ID comes from
    /// `generate_id()`:
    /// - SQLite skips any `"id"` key in `data` when building the INSERT.
    /// - The Postgres CRDT path overwrites it in the row it persists.
    ///
    /// The Postgres non-CRDT path returns whatever ID `DataStore::insert`
    /// produces.
    ///
    /// # Errors
    /// Returns a `RuntimeError` when:
    /// - `entity` is not in the manifest;
    /// - (SQLite) `data` is not a JSON object (`INVALID_DATA`) or names an
    ///   invalid column;
    /// - the CRDT apply fails (`CRDT_APPLY_FAILED`);
    /// - (SQLite) the SQL INSERT fails (`INSERT_FAILED`);
    /// - (SQLite) search-index maintenance fails (`SEARCH_MAINTENANCE_FAILED`).
    ///
    /// Postgres storage errors are converted with `data_err_to_runtime`.
    pub fn insert(&self, entity: &str, data: &serde_json::Value) -> Result<String, RuntimeError> {
        if let Some(pg) = self.pg_backend() {
            let ent = self.require_entity(entity)?;
            // Both CRDT-mode and non-CRDT writes go through one
            // transaction so the row, the FTS shadow, and (for CRDT)
            // the LoroDoc snapshot either all commit or all roll back.
            // Pre-fix this was three separate autocommits and any
            // failure between them desynced the layers.
            if ent.crdt {
                let crdt_fields = self.crdt_fields_for(ent)?;
                let id = generate_id();
                // Inject the generated id so build_insert_sql reuses
                // it — keeps the snapshot key and the row id aligned.
                // NOTE(review): if `data` is not a JSON object the id is not
                // injected; presumably `tx_insert` rejects such input — confirm.
                let mut row = data.clone();
                if let Some(obj) = row.as_object_mut() {
                    obj.insert("id".into(), serde_json::Value::String(id.clone()));
                }
                let result = pg
                    .store
                    .with_transaction_raw(|tx| -> Result<(), RuntimeError> {
                        pg.crdt
                            .apply_patch(tx, entity, &id, &crdt_fields, data)
                            .map_err(|e| RuntimeError {
                                code: "CRDT_APPLY_FAILED".into(),
                                message: format!("crdt write {entity}/{id}: {e}"),
                            })?;
                        pylon_storage::pg_tx_store::tx_insert(tx, &self.manifest, entity, &row)
                            .map(|_| ())
                            .map_err(data_err_to_runtime)?;
                        // NOTE(review): this runs inside the tx closure, i.e.
                        // before COMMIT; the Err branch below evicts if the
                        // commit does not land.
                        pg.crdt.cache_after_commit(tx, entity, &id);
                        Ok(())
                    });
                if result.is_err() {
                    // Rollback drops the persisted snapshot, but the
                    // in-memory LoroDoc was mutated in-place by
                    // apply_patch. Evict it so the next access
                    // re-hydrates from disk (which is back in the
                    // pre-apply state). Without this, the cache would
                    // hold a doc ahead of the materialized row.
                    pg.crdt.evict(entity, &id);
                }
                result?;
                return Ok(id);
            }
            // Non-CRDT path: still one tx — the typed `DataStore::insert`
            // already wraps in `with_transaction` internally for FTS
            // atomicity, so we can delegate straight through.
            return pylon_http::DataStore::insert(&pg.store, entity, data)
                .map_err(data_err_to_runtime);
        }
        let ent = self.require_entity(entity)?;
        // The write connection stays locked for the rest of the call; the
        // write tx below runs on it.
        let conn = self.lock_write_conn()?;

        let id = generate_id();

        let obj = data.as_object().ok_or_else(|| RuntimeError {
            code: "INVALID_DATA".into(),
            message: "Insert data must be a JSON object".into(),
        })?;

        // Validate columns up-front so we don't even open a transaction
        // for a patch that the SQL INSERT will reject.
        for key in obj.keys() {
            if key != "id" {
                validate_column_name(key, ent)?;
            }
        }

        // SQLite-only path past this point — Postgres dispatch happened
        // at the top of `insert()`. Hoist the backend handle here so we
        // can reach the LoroStore from inside the tx closure without
        // a second runtime branch on every iteration.
        let sb = self.sqlite_backend()?;

        // Atomic block — CRDT sidecar snapshot + materialized SQL row +
        // search-index maintenance all land together or none does. SQLite's
        // rollback journal makes this crash-safe end-to-end.
        with_write_tx(&conn, || {
            if ent.crdt {
                let crdt_fields = self.crdt_fields_for(ent)?;
                sb.crdt
                    .apply_patch(&conn, entity, &id, &crdt_fields, data)
                    .map_err(|e| RuntimeError {
                        code: "CRDT_APPLY_FAILED".into(),
                        message: format!("crdt write {entity}/{id}: {e}"),
                    })?;
            }

            // ?1 is always the generated id; caller-supplied columns are
            // numbered from ?2 and any "id" key in `data` is ignored.
            let mut col_names = vec![quote_ident("id")];
            let mut placeholders = vec!["?1".to_string()];
            let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = vec![Box::new(id.clone())];

            let mut idx = 2;
            for (key, val) in obj {
                if key == "id" {
                    continue;
                }
                col_names.push(quote_ident(key));
                placeholders.push(format!("?{idx}"));
                values.push(json_to_sql(val));
                idx += 1;
            }

            let sql = format!(
                "INSERT INTO {} ({}) VALUES ({})",
                quote_ident(entity),
                col_names.join(", "),
                placeholders.join(", ")
            );

            let params: Vec<&dyn rusqlite::types::ToSql> =
                values.iter().map(|v| v.as_ref()).collect();
            conn.execute(&sql, params.as_slice())
                .map_err(|e| RuntimeError {
                    code: "INSERT_FAILED".into(),
                    message: format!("Insert into {entity} failed: {e}"),
                })?;

            // Search-index maintenance lives inside the same tx so a
            // crash between the row insert and the FTS update can't leave
            // the search index inconsistent with the row table.
            if let Some(cfg) = ent.search.as_ref() {
                if !cfg.is_empty() {
                    pylon_storage::search_maintenance::apply_insert(&conn, entity, &id, data, cfg)
                        .map_err(|e| RuntimeError {
                            code: "SEARCH_MAINTENANCE_FAILED".into(),
                            message: format!("search index update on insert {entity}: {e}"),
                        })?;
                }
            }
            Ok(())
        })?;

        Ok(id)
    }
843
844    /// Get a single row by ID.
845    pub fn get_by_id(
846        &self,
847        entity: &str,
848        id: &str,
849    ) -> Result<Option<serde_json::Value>, RuntimeError> {
850        if let Some(pg) = self.pg_backend() {
851            return pylon_http::DataStore::get_by_id(&pg.store, entity, id)
852                .map_err(data_err_to_runtime);
853        }
854        let ent = self.require_entity(entity)?;
855        let conn = self.lock_read_conn()?;
856
857        let sql = format!("SELECT * FROM {} WHERE \"id\" = ?1", quote_ident(entity));
858        let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
859            code: "QUERY_FAILED".into(),
860            message: format!("Failed to prepare query: {e}"),
861        })?;
862
863        let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
864
865        let result = stmt
866            .query_row(rusqlite::params![id], |row| Ok(row_to_json(row, &columns)))
867            .ok();
868
869        Ok(result)
870    }
871
872    /// List all rows for an entity.
873    pub fn list(&self, entity: &str) -> Result<Vec<serde_json::Value>, RuntimeError> {
874        if let Some(pg) = self.pg_backend() {
875            return pylon_http::DataStore::list(&pg.store, entity).map_err(data_err_to_runtime);
876        }
877        let ent = self.require_entity(entity)?;
878        let conn = self.lock_read_conn()?;
879
880        let sql = format!("SELECT * FROM {} ORDER BY \"id\"", quote_ident(entity));
881        let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
882            code: "QUERY_FAILED".into(),
883            message: format!("Failed to prepare query: {e}"),
884        })?;
885
886        let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
887
888        let rows = stmt
889            .query_map([], |row| Ok(row_to_json(row, &columns)))
890            .map_err(|e| RuntimeError {
891                code: "QUERY_FAILED".into(),
892                message: format!("Query failed: {e}"),
893            })?;
894
895        let mut result = Vec::new();
896        for row in rows {
897            if let Ok(val) = row {
898                result.push(val);
899            }
900        }
901        Ok(result)
902    }
903
904    /// List rows after a cursor ID (for cursor-based pagination).
905    pub fn list_after(
906        &self,
907        entity: &str,
908        after: Option<&str>,
909        limit: usize,
910    ) -> Result<Vec<serde_json::Value>, RuntimeError> {
911        if let Some(pg) = self.pg_backend() {
912            return pylon_http::DataStore::list_after(&pg.store, entity, after, limit)
913                .map_err(data_err_to_runtime);
914        }
915        let ent = self.require_entity(entity)?;
916        let conn = self.lock_read_conn()?;
917
918        let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
919        let table = quote_ident(entity);
920
921        let (sql, params): (String, Vec<Box<dyn rusqlite::types::ToSql>>) = match after {
922            Some(cursor) => (
923                format!(
924                    "SELECT * FROM {} WHERE \"id\" > ?1 ORDER BY \"id\" LIMIT ?2",
925                    table
926                ),
927                vec![Box::new(cursor.to_string()), Box::new(limit as i64)],
928            ),
929            None => (
930                format!("SELECT * FROM {} ORDER BY \"id\" LIMIT ?1", table),
931                vec![Box::new(limit as i64)],
932            ),
933        };
934
935        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
936            params.iter().map(|v| v.as_ref()).collect();
937
938        let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
939            code: "QUERY_FAILED".into(),
940            message: format!("Failed to prepare query: {e}"),
941        })?;
942
943        let rows = stmt
944            .query_map(param_refs.as_slice(), |row| Ok(row_to_json(row, &columns)))
945            .map_err(|e| RuntimeError {
946                code: "QUERY_FAILED".into(),
947                message: format!("Query failed: {e}"),
948            })?;
949
950        let mut result = Vec::new();
951        for row in rows {
952            if let Ok(val) = row {
953                result.push(val);
954            }
955        }
956        Ok(result)
957    }
958
959    /// Update a row by ID. Returns true if a row was found and updated.
960    ///
961    /// For entities with `crdt: true` (the default) the LoroDoc receives
962    /// the patch first; the SQLite UPDATE writes the same fields so the
963    /// materialized view stays in lockstep with the doc state.
964    pub fn update(
965        &self,
966        entity: &str,
967        id: &str,
968        data: &serde_json::Value,
969    ) -> Result<bool, RuntimeError> {
970        if let Some(pg) = self.pg_backend() {
971            let ent = self.require_entity(entity)?;
972            if ent.crdt {
973                // CRDT mode: snapshot apply + materialized update +
974                // FTS shadow rebuild all share one tx. Pre-fix the
975                // snapshot landed in autocommit and the row write in
976                // a separate one — a mid-write crash desynced them.
977                //
978                // The closure also FAILS the tx if `tx_update` returns
979                // false (no row matched). Without that, the snapshot
980                // would commit alone — orphaned state pointing at a
981                // row that doesn't exist. Codex flagged this. On
982                // rollback the runtime evicts the cached LoroDoc so
983                // the next read re-hydrates from the (unchanged)
984                // sidecar.
985                let crdt_fields = self.crdt_fields_for(ent)?;
986                let result = pg
987                    .store
988                    .with_transaction_raw(|tx| -> Result<bool, RuntimeError> {
989                        pg.crdt
990                            .apply_patch(tx, entity, id, &crdt_fields, data)
991                            .map_err(|e| RuntimeError {
992                                code: "CRDT_APPLY_FAILED".into(),
993                                message: format!("crdt update {entity}/{id}: {e}"),
994                            })?;
995                        let updated = pylon_storage::pg_tx_store::tx_update(
996                            tx,
997                            &self.manifest,
998                            entity,
999                            id,
1000                            data,
1001                        )
1002                        .map_err(data_err_to_runtime)?;
1003                        if !updated {
1004                            // Roll back via Err so the snapshot doesn't
1005                            // commit against a missing row.
1006                            return Err(RuntimeError {
1007                                code: "ENTITY_NOT_FOUND".into(),
1008                                message: format!(
1009                                    "Update on {entity}/{id} found no row — refusing to commit \
1010                                 a CRDT snapshot that would orphan."
1011                                ),
1012                            });
1013                        }
1014                        // Refresh the cache from the just-persisted
1015                        // snapshot so post-commit reads on this process
1016                        // skip the re-hydration round-trip.
1017                        pg.crdt.cache_after_commit(tx, entity, id);
1018                        Ok(updated)
1019                    });
1020                if result.is_err() {
1021                    pg.crdt.evict(entity, id);
1022                    // ENTITY_NOT_FOUND from the inner closure is the
1023                    // intended return for "no such row" — translate
1024                    // into Ok(false) so callers see the same shape
1025                    // the SQLite path returns. Real errors (CRDT
1026                    // apply failed, BEGIN/COMMIT failed) propagate.
1027                    if let Err(ref e) = result {
1028                        if e.code == "ENTITY_NOT_FOUND" {
1029                            return Ok(false);
1030                        }
1031                    }
1032                }
1033                return result;
1034            }
1035            return pylon_http::DataStore::update(&pg.store, entity, id, data)
1036                .map_err(data_err_to_runtime);
1037        }
1038        let ent = self.require_entity(entity)?;
1039        let conn = self.lock_write_conn()?;
1040
1041        let obj = data.as_object().ok_or_else(|| RuntimeError {
1042            code: "INVALID_DATA".into(),
1043            message: "Update data must be a JSON object".into(),
1044        })?;
1045
1046        // Validate up-front and exit cheap if there's nothing to write.
1047        for key in obj.keys() {
1048            if key != "id" {
1049                validate_column_name(key, ent)?;
1050            }
1051        }
1052        let writable_keys: Vec<&String> = obj.keys().filter(|k| *k != "id").collect();
1053        if writable_keys.is_empty() {
1054            return Ok(false);
1055        }
1056
1057        // SQLite-only path past this point — see note in `insert()`.
1058        let sb = self.sqlite_backend()?;
1059
1060        // Atomic block — same shape as insert. CRDT snapshot, SQL UPDATE,
1061        // and FTS maintenance all commit together.
1062        let affected = with_write_tx(&conn, || -> Result<i64, RuntimeError> {
1063            if ent.crdt {
1064                let crdt_fields = self.crdt_fields_for(ent)?;
1065                sb.crdt
1066                    .apply_patch(&conn, entity, id, &crdt_fields, data)
1067                    .map_err(|e| RuntimeError {
1068                        code: "CRDT_APPLY_FAILED".into(),
1069                        message: format!("crdt write {entity}/{id}: {e}"),
1070                    })?;
1071            }
1072
1073            let mut set_clauses = Vec::new();
1074            let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
1075            let mut idx = 1;
1076            for key in &writable_keys {
1077                set_clauses.push(format!("{} = ?{idx}", quote_ident(key)));
1078                values.push(json_to_sql(&obj[key.as_str()]));
1079                idx += 1;
1080            }
1081
1082            // Capture pre-UPDATE row for search-maintenance diff INSIDE the
1083            // tx. Matches the contract of search_maintenance::apply_update
1084            // — old state must be read before the UPDATE lands.
1085            let searchable = ent.search.as_ref().map(|c| !c.is_empty()).unwrap_or(false);
1086            let old_row = if searchable {
1087                self.get_by_id_with_conn(&conn, entity, id)?
1088            } else {
1089                None
1090            };
1091
1092            values.push(Box::new(id.to_string()));
1093            let sql = format!(
1094                "UPDATE {} SET {} WHERE \"id\" = ?{idx}",
1095                quote_ident(entity),
1096                set_clauses.join(", ")
1097            );
1098
1099            let params: Vec<&dyn rusqlite::types::ToSql> =
1100                values.iter().map(|v| v.as_ref()).collect();
1101            let affected = conn
1102                .execute(&sql, params.as_slice())
1103                .map_err(|e| RuntimeError {
1104                    code: "UPDATE_FAILED".into(),
1105                    message: format!("Update {entity}/{id} failed: {e}"),
1106                })? as i64;
1107
1108            if affected > 0 && searchable {
1109                if let (Some(cfg), Some(old)) = (ent.search.as_ref(), old_row) {
1110                    pylon_storage::search_maintenance::apply_update(
1111                        &conn, entity, id, &old, data, cfg,
1112                    )
1113                    .map_err(|e| RuntimeError {
1114                        code: "SEARCH_MAINTENANCE_FAILED".into(),
1115                        message: format!("search index update on update {entity}: {e}"),
1116                    })?;
1117                }
1118            }
1119            Ok(affected)
1120        })?;
1121
1122        Ok(affected > 0)
1123    }
1124
1125    /// Delete a row by ID. Returns true if a row was actually deleted.
1126    pub fn delete(&self, entity: &str, id: &str) -> Result<bool, RuntimeError> {
1127        if let Some(pg) = self.pg_backend() {
1128            let ent = self.require_entity(entity)?;
1129            if ent.crdt {
1130                // Sidecar delete + entity delete + FTS shadow delete
1131                // share one tx. Eviction of the in-memory cache runs
1132                // AFTER commit so a rolled-back delete leaves the
1133                // cache valid (the snapshot is still on disk).
1134                let result = pg
1135                    .store
1136                    .with_transaction_raw(|tx| -> Result<bool, RuntimeError> {
1137                        tx.execute(
1138                            "DELETE FROM _pylon_crdt_snapshots WHERE entity = $1 AND row_id = $2",
1139                            &[&entity, &id],
1140                        )
1141                        .map_err(|e| RuntimeError {
1142                            code: "CRDT_SIDECAR_DELETE_FAILED".into(),
1143                            message: format!("delete pg crdt snapshot {entity}/{id}: {e}"),
1144                        })?;
1145                        pylon_storage::pg_tx_store::tx_delete(tx, &self.manifest, entity, id)
1146                            .map_err(data_err_to_runtime)
1147                    });
1148                // Evict regardless of whether tx_delete found a row —
1149                // we issued the sidecar DELETE inside the same tx, so
1150                // any cached doc is now stale even if the entity row
1151                // was already gone (orphan sidecar case codex flagged).
1152                // Only skip eviction if the WHOLE tx rolled back.
1153                if result.is_ok() {
1154                    pg.crdt.evict(entity, id);
1155                }
1156                return result;
1157            }
1158            return pylon_http::DataStore::delete(&pg.store, entity, id)
1159                .map_err(data_err_to_runtime);
1160        }
1161        let ent = self.require_entity(entity)?;
1162        let conn = self.lock_write_conn()?;
1163
1164        // Apply search-maintenance BEFORE the DELETE — we still need
1165        // the row's facet values to clear the bitmap bits.
1166        let searchable = ent.search.as_ref().map(|c| !c.is_empty()).unwrap_or(false);
1167        if searchable {
1168            if let (Some(cfg), Ok(Some(row))) = (
1169                ent.search.as_ref(),
1170                self.get_by_id_with_conn(&conn, entity, id),
1171            ) {
1172                pylon_storage::search_maintenance::apply_delete(&conn, entity, id, &row, cfg)
1173                    .map_err(|e| RuntimeError {
1174                        code: "SEARCH_MAINTENANCE_FAILED".into(),
1175                        message: format!("search index update on delete {entity}: {e}"),
1176                    })?;
1177            }
1178        }
1179
1180        let sql = format!("DELETE FROM {} WHERE \"id\" = ?1", quote_ident(entity));
1181        let affected = conn
1182            .execute(&sql, rusqlite::params![id])
1183            .map_err(|e| RuntimeError {
1184                code: "DELETE_FAILED".into(),
1185                message: format!("Delete {entity}/{id} failed: {e}"),
1186            })?;
1187
1188        Ok(affected > 0)
1189    }
1190
1191    /// Lookup a single row by a field value (e.g., email).
1192    pub fn lookup(
1193        &self,
1194        entity: &str,
1195        field: &str,
1196        value: &str,
1197    ) -> Result<Option<serde_json::Value>, RuntimeError> {
1198        if let Some(pg) = self.pg_backend() {
1199            return pylon_http::DataStore::lookup(&pg.store, entity, field, value)
1200                .map_err(data_err_to_runtime);
1201        }
1202        let ent = self.require_entity(entity)?;
1203        validate_column_name(field, ent)?;
1204        let conn = self.lock_read_conn()?;
1205
1206        let sql = format!(
1207            "SELECT * FROM {} WHERE {} = ?1 LIMIT 1",
1208            quote_ident(entity),
1209            quote_ident(field)
1210        );
1211        let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1212
1213        let result = conn.prepare_cached(&sql).ok().and_then(|mut stmt| {
1214            stmt.query_row(rusqlite::params![value], |row| {
1215                Ok(row_to_json(row, &columns))
1216            })
1217            .ok()
1218        });
1219
1220        Ok(result)
1221    }
1222
1223    /// Link two entities by setting a foreign-key field.
1224    pub fn link(
1225        &self,
1226        entity: &str,
1227        id: &str,
1228        relation: &str,
1229        target_id: &str,
1230    ) -> Result<bool, RuntimeError> {
1231        let ent = self.require_entity(entity)?;
1232
1233        // Find the relation definition to determine which field to set.
1234        let rel = ent
1235            .relations
1236            .iter()
1237            .find(|r| r.name == relation)
1238            .ok_or_else(|| RuntimeError {
1239                code: "RELATION_NOT_FOUND".into(),
1240                message: format!("Relation \"{relation}\" not found on entity \"{entity}\""),
1241            })?;
1242
1243        let data = serde_json::json!({ rel.field.clone(): target_id });
1244        self.update(entity, id, &data)
1245    }
1246
1247    /// Unlink a relation by setting the foreign-key field to null.
1248    pub fn unlink(&self, entity: &str, id: &str, relation: &str) -> Result<bool, RuntimeError> {
1249        let ent = self.require_entity(entity)?;
1250
1251        let rel = ent
1252            .relations
1253            .iter()
1254            .find(|r| r.name == relation)
1255            .ok_or_else(|| RuntimeError {
1256                code: "RELATION_NOT_FOUND".into(),
1257                message: format!("Relation \"{relation}\" not found on entity \"{entity}\""),
1258            })?;
1259
1260        let data = serde_json::json!({ rel.field.clone(): null });
1261        self.update(entity, id, &data)
1262    }
1263
1264    /// Execute a filtered query with operators ($not, $gt, $in, $like, $order, $limit).
1265    pub fn query_filtered(
1266        &self,
1267        entity: &str,
1268        filter: &serde_json::Value,
1269    ) -> Result<Vec<serde_json::Value>, RuntimeError> {
1270        if let Some(pg) = self.pg_backend() {
1271            return pylon_http::DataStore::query_filtered(&pg.store, entity, filter)
1272                .map_err(data_err_to_runtime);
1273        }
1274        let ent = self.require_entity(entity)?;
1275        let conn = self.lock_read_conn()?;
1276
1277        let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1278        let obj = filter
1279            .as_object()
1280            .unwrap_or(&serde_json::Map::new())
1281            .clone();
1282
1283        let mut where_clauses = Vec::new();
1284        let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
1285        let mut order_clause = String::new();
1286        let mut limit_clause = String::new();
1287        let mut join_clause = String::new();
1288        let mut fts_order = false;
1289        let mut idx = 1;
1290
1291        for (key, val) in &obj {
1292            match key.as_str() {
1293                "$order" => {
1294                    if let Some(order_obj) = val.as_object() {
1295                        let mut parts: Vec<String> = Vec::new();
1296                        for (col, dir) in order_obj {
1297                            validate_column_name(col, ent)?;
1298                            let d = match dir.as_str().unwrap_or("asc") {
1299                                "desc" | "DESC" => "DESC",
1300                                _ => "ASC",
1301                            };
1302                            parts.push(format!("{} {d}", quote_ident(col)));
1303                        }
1304                        if !parts.is_empty() {
1305                            order_clause = format!(" ORDER BY {}", parts.join(", "));
1306                        }
1307                    }
1308                }
1309                "$limit" => {
1310                    if let Some(n) = val.as_u64() {
1311                        limit_clause = format!(" LIMIT {n}");
1312                    }
1313                }
1314                "$offset" => {
1315                    if let Some(n) = val.as_u64() {
1316                        // SQLite requires LIMIT before OFFSET; add a default.
1317                        if limit_clause.is_empty() {
1318                            limit_clause = " LIMIT -1".into();
1319                        }
1320                        limit_clause = format!("{limit_clause} OFFSET {n}");
1321                    }
1322                }
1323                "$search" => {
1324                    if let Some(q) = val.as_str() {
1325                        // Join against the entity's FTS5 virtual table.
1326                        let fts = format!("{}_fts", entity);
1327                        join_clause = format!(
1328                            " JOIN {fts} ON {ent}.rowid = {fts}.rowid",
1329                            fts = quote_ident(&fts),
1330                            ent = quote_ident(entity),
1331                        );
1332                        where_clauses.push(format!("{} MATCH ?{idx}", quote_ident(&fts)));
1333                        values.push(Box::new(q.to_string()));
1334                        fts_order = true;
1335                        idx += 1;
1336                    }
1337                }
1338                _ => {
1339                    validate_column_name(key, ent)?;
1340                    let quoted_key = quote_ident(key);
1341
1342                    if let Some(op_obj) = val.as_object() {
1343                        for (op, op_val) in op_obj {
1344                            match op.as_str() {
1345                                "$not" => {
1346                                    where_clauses.push(format!("{quoted_key} != ?{idx}"));
1347                                    values.push(json_to_sql(op_val));
1348                                    idx += 1;
1349                                }
1350                                "$gt" => {
1351                                    where_clauses.push(format!("{quoted_key} > ?{idx}"));
1352                                    values.push(json_to_sql(op_val));
1353                                    idx += 1;
1354                                }
1355                                "$gte" => {
1356                                    where_clauses.push(format!("{quoted_key} >= ?{idx}"));
1357                                    values.push(json_to_sql(op_val));
1358                                    idx += 1;
1359                                }
1360                                "$lt" => {
1361                                    where_clauses.push(format!("{quoted_key} < ?{idx}"));
1362                                    values.push(json_to_sql(op_val));
1363                                    idx += 1;
1364                                }
1365                                "$lte" => {
1366                                    where_clauses.push(format!("{quoted_key} <= ?{idx}"));
1367                                    values.push(json_to_sql(op_val));
1368                                    idx += 1;
1369                                }
1370                                "$like" => {
1371                                    where_clauses.push(format!("{quoted_key} LIKE ?{idx}"));
1372                                    let pattern = format!("%{}%", op_val.as_str().unwrap_or(""));
1373                                    values.push(Box::new(pattern));
1374                                    idx += 1;
1375                                }
1376                                "$in" => {
1377                                    if let Some(arr) = op_val.as_array() {
1378                                        if arr.is_empty() {
1379                                            // Empty $in matches nothing.
1380                                            // Previously SQLite SKIPPED the
1381                                            // predicate (returning ALL rows)
1382                                            // while PG short-circuited to
1383                                            // FALSE — a real cross-backend
1384                                            // drift bug codex caught. Both
1385                                            // now emit `0` (false) so empty
1386                                            // $in returns an empty set.
1387                                            where_clauses.push("0".into());
1388                                        } else {
1389                                            let placeholders: Vec<String> = arr
1390                                                .iter()
1391                                                .map(|v| {
1392                                                    let p = format!("?{idx}");
1393                                                    values.push(json_to_sql(v));
1394                                                    idx += 1;
1395                                                    p
1396                                                })
1397                                                .collect();
1398                                            where_clauses.push(format!(
1399                                                "{quoted_key} IN ({})",
1400                                                placeholders.join(", ")
1401                                            ));
1402                                        }
1403                                    }
1404                                }
1405                                _ => {}
1406                            }
1407                        }
1408                    } else {
1409                        // Simple equality.
1410                        where_clauses.push(format!("{quoted_key} = ?{idx}"));
1411                        values.push(json_to_sql(val));
1412                        idx += 1;
1413                    }
1414                }
1415            }
1416        }
1417
1418        let where_sql = if where_clauses.is_empty() {
1419            String::new()
1420        } else {
1421            format!(" WHERE {}", where_clauses.join(" AND "))
1422        };
1423
1424        if order_clause.is_empty() {
1425            order_clause = if fts_order {
1426                // FTS joins default-order by bm25 relevance.
1427                " ORDER BY bm25(".to_string() + &quote_ident(&format!("{}_fts", entity)) + ")"
1428            } else {
1429                format!(" ORDER BY {}.\"id\"", quote_ident(entity))
1430            };
1431        }
1432
1433        let select_prefix = format!("{}.*", quote_ident(entity));
1434        let sql = format!(
1435            "SELECT {} FROM {}{}{}{}{}",
1436            select_prefix,
1437            quote_ident(entity),
1438            join_clause,
1439            where_sql,
1440            order_clause,
1441            limit_clause
1442        );
1443        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1444            values.iter().map(|v| v.as_ref()).collect();
1445
1446        let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
1447            code: "QUERY_FAILED".into(),
1448            message: format!("Failed to prepare filtered query: {e}"),
1449        })?;
1450
1451        let rows = stmt
1452            .query_map(param_refs.as_slice(), |row| Ok(row_to_json(row, &columns)))
1453            .map_err(|e| RuntimeError {
1454                code: "QUERY_FAILED".into(),
1455                message: format!("Filtered query failed: {e}"),
1456            })?;
1457
1458        let mut result = Vec::new();
1459        for row in rows {
1460            if let Ok(val) = row {
1461                result.push(val);
1462            }
1463        }
1464        Ok(result)
1465    }
1466
1467    /// Execute a graph-style query.
1468    ///
1469    /// Input: `{ "User": { "where": { "email": "..." }, "include": { "posts": {} } } }`
1470    /// Returns nested results following relations.
1471    pub fn query_graph(
1472        &self,
1473        query: &serde_json::Value,
1474    ) -> Result<serde_json::Value, RuntimeError> {
1475        let obj = query.as_object().ok_or_else(|| RuntimeError {
1476            code: "INVALID_QUERY".into(),
1477            message: "Graph query must be a JSON object".into(),
1478        })?;
1479
1480        let mut results = serde_json::Map::new();
1481
1482        for (entity_name, query_opts) in obj {
1483            let _ent = self.require_entity(entity_name)?;
1484
1485            // Apply where clause if present.
1486            let filter = query_opts
1487                .get("where")
1488                .cloned()
1489                .unwrap_or(serde_json::json!({}));
1490            let rows = self.query_filtered(entity_name, &filter)?;
1491
1492            // Apply includes (relations) if present.
1493            let rows = if let Some(include) = query_opts.get("include").and_then(|v| v.as_object())
1494            {
1495                // Internal invariant: if query_filtered succeeded above, the
1496                // entity must exist. Previously this used .unwrap() which
1497                // would panic if the invariant broke — a panic inside the
1498                // handler path poisons the connection mutex and takes down
1499                // all subsequent reads. Fail the request cleanly instead.
1500                let ent = self.entities.get(entity_name).ok_or_else(|| RuntimeError {
1501                    code: "INVARIANT_BROKEN".into(),
1502                    message: format!(
1503                        "entity \"{entity_name}\" missing from registry during include expansion"
1504                    ),
1505                })?;
1506                rows.into_iter()
1507                    .map(|mut row| {
1508                        for (rel_name, _sub_query) in include {
1509                            if let Some(rel) = ent.relations.iter().find(|r| r.name == *rel_name) {
1510                                let fk_value = row
1511                                    .get(&rel.field)
1512                                    .and_then(|v| v.as_str())
1513                                    .map(|s| s.to_string());
1514                                if let Some(fk) = fk_value {
1515                                    if rel.many {
1516                                        // One-to-many: find rows in target where field matches id.
1517                                        let sub_filter = serde_json::json!({ &rel.field: &fk });
1518                                        if let Ok(related) =
1519                                            self.query_filtered(&rel.target, &sub_filter)
1520                                        {
1521                                            row[rel_name] = serde_json::json!(related);
1522                                        }
1523                                    } else {
1524                                        // One-to-one / many-to-one: get by id.
1525                                        if let Ok(Some(related)) = self.get_by_id(&rel.target, &fk)
1526                                        {
1527                                            row[rel_name] = related;
1528                                        }
1529                                    }
1530                                }
1531                            }
1532                        }
1533                        row
1534                    })
1535                    .collect()
1536            } else {
1537                rows
1538            };
1539
1540            // Apply limit if present.
1541            let rows = if let Some(limit) = query_opts.get("limit").and_then(|v| v.as_u64()) {
1542                rows.into_iter().take(limit as usize).collect()
1543            } else {
1544                rows
1545            };
1546
1547            results.insert(entity_name.clone(), serde_json::json!(rows));
1548        }
1549
1550        Ok(serde_json::Value::Object(results))
1551    }
1552
1553    // -----------------------------------------------------------------------
1554    // Transaction-safe variants (use a pre-held connection guard)
1555    // -----------------------------------------------------------------------
1556
1557    /// Insert using an already-locked connection (for transactions).
1558    pub fn insert_with_conn(
1559        &self,
1560        conn: &Connection,
1561        entity: &str,
1562        data: &serde_json::Value,
1563    ) -> Result<String, RuntimeError> {
1564        let ent = self.require_entity(entity)?;
1565        let id = generate_id();
1566        let obj = data.as_object().ok_or_else(|| RuntimeError {
1567            code: "INVALID_DATA".into(),
1568            message: "Insert data must be a JSON object".into(),
1569        })?;
1570
1571        let mut col_names = vec![quote_ident("id")];
1572        let mut placeholders = vec!["?1".to_string()];
1573        let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = vec![Box::new(id.clone())];
1574        let mut idx = 2;
1575        for (key, val) in obj {
1576            if key == "id" {
1577                continue;
1578            }
1579            validate_column_name(key, ent)?;
1580            col_names.push(quote_ident(key));
1581            placeholders.push(format!("?{idx}"));
1582            values.push(json_to_sql(val));
1583            idx += 1;
1584        }
1585
1586        let sql = format!(
1587            "INSERT INTO {} ({}) VALUES ({})",
1588            quote_ident(entity),
1589            col_names.join(", "),
1590            placeholders.join(", ")
1591        );
1592        let params: Vec<&dyn rusqlite::types::ToSql> = values.iter().map(|v| v.as_ref()).collect();
1593        conn.execute(&sql, params.as_slice())
1594            .map_err(|e| RuntimeError {
1595                code: "INSERT_FAILED".into(),
1596                message: format!("Insert into {entity} failed: {e}"),
1597            })?;
1598
1599        // Faceted-search maintenance in the same transaction. Skipped
1600        // for entities that don't declare `search:` in their schema.
1601        if let Some(cfg) = ent.search.as_ref() {
1602            if !cfg.is_empty() {
1603                pylon_storage::search_maintenance::apply_insert(conn, entity, &id, data, cfg)
1604                    .map_err(|e| RuntimeError {
1605                        code: "SEARCH_MAINTENANCE_FAILED".into(),
1606                        message: format!("search index update on insert {entity}: {e}"),
1607                    })?;
1608            }
1609        }
1610
1611        Ok(id)
1612    }
1613
1614    /// Update using an already-locked connection (for transactions).
1615    pub fn update_with_conn(
1616        &self,
1617        conn: &Connection,
1618        entity: &str,
1619        id: &str,
1620        data: &serde_json::Value,
1621    ) -> Result<bool, RuntimeError> {
1622        let ent = self.require_entity(entity)?;
1623        let obj = data.as_object().ok_or_else(|| RuntimeError {
1624            code: "INVALID_DATA".into(),
1625            message: "Update data must be a JSON object".into(),
1626        })?;
1627
1628        let mut set_clauses = Vec::new();
1629        let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
1630        let mut idx = 1;
1631        for (key, val) in obj {
1632            if key == "id" {
1633                continue;
1634            }
1635            validate_column_name(key, ent)?;
1636            set_clauses.push(format!("{} = ?{idx}", quote_ident(key)));
1637            values.push(json_to_sql(val));
1638            idx += 1;
1639        }
1640        if set_clauses.is_empty() {
1641            return Ok(false);
1642        }
1643
1644        // Capture the pre-UPDATE row if we need to diff facet values.
1645        // Read happens before the UPDATE so apply_update sees the OLD
1646        // state of any facet field. Cheap — single-row lookup on the
1647        // `id` primary-key index.
1648        let searchable = ent.search.as_ref().map(|c| !c.is_empty()).unwrap_or(false);
1649        let old_row = if searchable {
1650            self.get_by_id_with_conn(conn, entity, id)?
1651        } else {
1652            None
1653        };
1654
1655        values.push(Box::new(id.to_string()));
1656        let sql = format!(
1657            "UPDATE {} SET {} WHERE \"id\" = ?{idx}",
1658            quote_ident(entity),
1659            set_clauses.join(", ")
1660        );
1661        let params: Vec<&dyn rusqlite::types::ToSql> = values.iter().map(|v| v.as_ref()).collect();
1662        let affected = conn
1663            .execute(&sql, params.as_slice())
1664            .map_err(|e| RuntimeError {
1665                code: "UPDATE_FAILED".into(),
1666                message: format!("Update {entity}/{id} failed: {e}"),
1667            })?;
1668
1669        if affected > 0 && searchable {
1670            if let (Some(cfg), Some(old)) = (ent.search.as_ref(), old_row) {
1671                pylon_storage::search_maintenance::apply_update(conn, entity, id, &old, data, cfg)
1672                    .map_err(|e| RuntimeError {
1673                        code: "SEARCH_MAINTENANCE_FAILED".into(),
1674                        message: format!("search index update on update {entity}: {e}"),
1675                    })?;
1676            }
1677        }
1678
1679        Ok(affected > 0)
1680    }
1681
1682    /// Delete using an already-locked connection (for transactions).
1683    pub fn delete_with_conn(
1684        &self,
1685        conn: &Connection,
1686        entity: &str,
1687        id: &str,
1688    ) -> Result<bool, RuntimeError> {
1689        let ent = self.require_entity(entity)?;
1690
1691        // Apply search maintenance BEFORE the DELETE so we still have
1692        // the row's facet values to diff against.
1693        let searchable = ent.search.as_ref().map(|c| !c.is_empty()).unwrap_or(false);
1694        if searchable {
1695            if let (Some(cfg), Ok(Some(row))) = (
1696                ent.search.as_ref(),
1697                self.get_by_id_with_conn(conn, entity, id),
1698            ) {
1699                pylon_storage::search_maintenance::apply_delete(conn, entity, id, &row, cfg)
1700                    .map_err(|e| RuntimeError {
1701                        code: "SEARCH_MAINTENANCE_FAILED".into(),
1702                        message: format!("search index update on delete {entity}: {e}"),
1703                    })?;
1704            }
1705        }
1706
1707        let sql = format!("DELETE FROM {} WHERE \"id\" = ?1", quote_ident(entity));
1708        let affected = conn
1709            .execute(&sql, rusqlite::params![id])
1710            .map_err(|e| RuntimeError {
1711                code: "DELETE_FAILED".into(),
1712                message: format!("Delete {entity}/{id} failed: {e}"),
1713            })?;
1714        Ok(affected > 0)
1715    }
1716
1717    /// Read a row by id using a pre-held connection (for transactions).
1718    pub fn get_by_id_with_conn(
1719        &self,
1720        conn: &Connection,
1721        entity: &str,
1722        id: &str,
1723    ) -> Result<Option<serde_json::Value>, RuntimeError> {
1724        let ent = self.require_entity(entity)?;
1725        let sql = format!("SELECT * FROM {} WHERE \"id\" = ?1", quote_ident(entity));
1726        let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
1727            code: "QUERY_FAILED".into(),
1728            message: format!("Failed to prepare query: {e}"),
1729        })?;
1730        let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1731        Ok(stmt
1732            .query_row(rusqlite::params![id], |row| Ok(row_to_json(row, &columns)))
1733            .ok())
1734    }
1735
1736    /// List rows using a pre-held connection (for transactions).
1737    pub fn list_with_conn(
1738        &self,
1739        conn: &Connection,
1740        entity: &str,
1741    ) -> Result<Vec<serde_json::Value>, RuntimeError> {
1742        let ent = self.require_entity(entity)?;
1743        let sql = format!("SELECT * FROM {} ORDER BY \"id\"", quote_ident(entity));
1744        let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
1745            code: "QUERY_FAILED".into(),
1746            message: format!("Failed to prepare query: {e}"),
1747        })?;
1748        let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1749        let rows = stmt
1750            .query_map([], |row| Ok(row_to_json(row, &columns)))
1751            .map_err(|e| RuntimeError {
1752                code: "QUERY_FAILED".into(),
1753                message: format!("Query failed: {e}"),
1754            })?;
1755        Ok(rows.flatten().collect())
1756    }
1757
1758    /// List after cursor using a pre-held connection (for transactions).
1759    pub fn list_after_with_conn(
1760        &self,
1761        conn: &Connection,
1762        entity: &str,
1763        after: Option<&str>,
1764        limit: usize,
1765    ) -> Result<Vec<serde_json::Value>, RuntimeError> {
1766        let ent = self.require_entity(entity)?;
1767        let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1768        let table = quote_ident(entity);
1769        let (sql, params): (String, Vec<Box<dyn rusqlite::types::ToSql>>) = match after {
1770            Some(cursor) => (
1771                format!("SELECT * FROM {table} WHERE \"id\" > ?1 ORDER BY \"id\" LIMIT ?2"),
1772                vec![Box::new(cursor.to_string()), Box::new(limit as i64)],
1773            ),
1774            None => (
1775                format!("SELECT * FROM {table} ORDER BY \"id\" LIMIT ?1"),
1776                vec![Box::new(limit as i64)],
1777            ),
1778        };
1779        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1780            params.iter().map(|v| v.as_ref()).collect();
1781        let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
1782            code: "QUERY_FAILED".into(),
1783            message: format!("Failed to prepare: {e}"),
1784        })?;
1785        let rows = stmt
1786            .query_map(param_refs.as_slice(), |row| Ok(row_to_json(row, &columns)))
1787            .map_err(|e| RuntimeError {
1788                code: "QUERY_FAILED".into(),
1789                message: format!("Query failed: {e}"),
1790            })?;
1791        Ok(rows.flatten().collect())
1792    }
1793
1794    /// Lookup by field using a pre-held connection (for transactions).
1795    pub fn lookup_with_conn(
1796        &self,
1797        conn: &Connection,
1798        entity: &str,
1799        field: &str,
1800        value: &str,
1801    ) -> Result<Option<serde_json::Value>, RuntimeError> {
1802        let ent = self.require_entity(entity)?;
1803        validate_column_name(field, ent)?;
1804        let sql = format!(
1805            "SELECT * FROM {} WHERE {} = ?1 LIMIT 1",
1806            quote_ident(entity),
1807            quote_ident(field)
1808        );
1809        let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1810        Ok(conn.prepare_cached(&sql).ok().and_then(|mut stmt| {
1811            stmt.query_row(rusqlite::params![value], |row| {
1812                Ok(row_to_json(row, &columns))
1813            })
1814            .ok()
1815        }))
1816    }
1817
1818    /// Link relation using a pre-held connection (for transactions).
1819    pub fn link_with_conn(
1820        &self,
1821        conn: &Connection,
1822        entity: &str,
1823        id: &str,
1824        relation: &str,
1825        target_id: &str,
1826    ) -> Result<bool, RuntimeError> {
1827        let ent = self.require_entity(entity)?;
1828        let rel = ent
1829            .relations
1830            .iter()
1831            .find(|r| r.name == relation)
1832            .ok_or_else(|| RuntimeError {
1833                code: "RELATION_NOT_FOUND".into(),
1834                message: format!("Relation \"{relation}\" not found on \"{entity}\""),
1835            })?;
1836        let data = serde_json::json!({ rel.field.clone(): target_id });
1837        self.update_with_conn(conn, entity, id, &data)
1838    }
1839
1840    /// Unlink relation using a pre-held connection (for transactions).
1841    pub fn unlink_with_conn(
1842        &self,
1843        conn: &Connection,
1844        entity: &str,
1845        id: &str,
1846        relation: &str,
1847    ) -> Result<bool, RuntimeError> {
1848        let ent = self.require_entity(entity)?;
1849        let rel = ent
1850            .relations
1851            .iter()
1852            .find(|r| r.name == relation)
1853            .ok_or_else(|| RuntimeError {
1854                code: "RELATION_NOT_FOUND".into(),
1855                message: format!("Relation \"{relation}\" not found on \"{entity}\""),
1856            })?;
1857        let data = serde_json::json!({ rel.field.clone(): serde_json::Value::Null });
1858        self.update_with_conn(conn, entity, id, &data)
1859    }
1860
1861    /// Query with filters using a pre-held connection (for transactions).
1862    ///
1863    /// Shares the filter-building logic with [`query_filtered`] by executing
1864    /// against the provided connection rather than acquiring one.
1865    pub fn query_filtered_with_conn(
1866        &self,
1867        conn: &Connection,
1868        entity: &str,
1869        filter: &serde_json::Value,
1870    ) -> Result<Vec<serde_json::Value>, RuntimeError> {
1871        let ent = self.require_entity(entity)?;
1872        let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1873        let empty = serde_json::Map::new();
1874        let obj = filter.as_object().unwrap_or(&empty);
1875
1876        let mut where_clauses = Vec::new();
1877        let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
1878        let mut order_clause = String::new();
1879        let mut limit_clause = String::new();
1880        let mut idx = 1;
1881
1882        for (key, val) in obj {
1883            match key.as_str() {
1884                "$order" => {
1885                    if let Some(o) = val.as_object() {
1886                        let mut parts: Vec<String> = Vec::new();
1887                        for (col, dir) in o {
1888                            validate_column_name(col, ent)?;
1889                            let d = match dir.as_str().unwrap_or("asc") {
1890                                "desc" | "DESC" => "DESC",
1891                                _ => "ASC",
1892                            };
1893                            parts.push(format!("{} {d}", quote_ident(col)));
1894                        }
1895                        if !parts.is_empty() {
1896                            order_clause = format!(" ORDER BY {}", parts.join(", "));
1897                        }
1898                    }
1899                }
1900                "$limit" => {
1901                    if let Some(n) = val.as_u64() {
1902                        limit_clause = format!(" LIMIT {n}");
1903                    }
1904                }
1905                "$offset" => {
1906                    if let Some(n) = val.as_u64() {
1907                        if limit_clause.is_empty() {
1908                            limit_clause = " LIMIT -1".into();
1909                        }
1910                        limit_clause = format!("{limit_clause} OFFSET {n}");
1911                    }
1912                }
1913                _ => {
1914                    validate_column_name(key, ent)?;
1915                    let qk = quote_ident(key);
1916                    if let Some(op_obj) = val.as_object() {
1917                        for (op, op_val) in op_obj {
1918                            match op.as_str() {
1919                                "$not" => {
1920                                    where_clauses.push(format!("{qk} != ?{idx}"));
1921                                    values.push(json_to_sql(op_val));
1922                                    idx += 1;
1923                                }
1924                                "$gt" => {
1925                                    where_clauses.push(format!("{qk} > ?{idx}"));
1926                                    values.push(json_to_sql(op_val));
1927                                    idx += 1;
1928                                }
1929                                "$gte" => {
1930                                    where_clauses.push(format!("{qk} >= ?{idx}"));
1931                                    values.push(json_to_sql(op_val));
1932                                    idx += 1;
1933                                }
1934                                "$lt" => {
1935                                    where_clauses.push(format!("{qk} < ?{idx}"));
1936                                    values.push(json_to_sql(op_val));
1937                                    idx += 1;
1938                                }
1939                                "$lte" => {
1940                                    where_clauses.push(format!("{qk} <= ?{idx}"));
1941                                    values.push(json_to_sql(op_val));
1942                                    idx += 1;
1943                                }
1944                                "$like" => {
1945                                    where_clauses.push(format!("{qk} LIKE ?{idx}"));
1946                                    let p = format!("%{}%", op_val.as_str().unwrap_or(""));
1947                                    values.push(Box::new(p));
1948                                    idx += 1;
1949                                }
1950                                "$in" => {
1951                                    if let Some(arr) = op_val.as_array() {
1952                                        let ph: Vec<String> = arr
1953                                            .iter()
1954                                            .map(|v| {
1955                                                let p = format!("?{idx}");
1956                                                values.push(json_to_sql(v));
1957                                                idx += 1;
1958                                                p
1959                                            })
1960                                            .collect();
1961                                        if !ph.is_empty() {
1962                                            where_clauses
1963                                                .push(format!("{qk} IN ({})", ph.join(", ")));
1964                                        }
1965                                    }
1966                                }
1967                                _ => {}
1968                            }
1969                        }
1970                    } else {
1971                        where_clauses.push(format!("{qk} = ?{idx}"));
1972                        values.push(json_to_sql(val));
1973                        idx += 1;
1974                    }
1975                }
1976            }
1977        }
1978
1979        let where_sql = if where_clauses.is_empty() {
1980            String::new()
1981        } else {
1982            format!(" WHERE {}", where_clauses.join(" AND "))
1983        };
1984        if order_clause.is_empty() {
1985            order_clause = " ORDER BY \"id\"".into();
1986        }
1987
1988        let sql = format!(
1989            "SELECT * FROM {}{}{}{}",
1990            quote_ident(entity),
1991            where_sql,
1992            order_clause,
1993            limit_clause
1994        );
1995        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1996            values.iter().map(|v| v.as_ref()).collect();
1997        let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
1998            code: "QUERY_FAILED".into(),
1999            message: format!("Failed to prepare: {e}"),
2000        })?;
2001        let rows = stmt
2002            .query_map(param_refs.as_slice(), |row| Ok(row_to_json(row, &columns)))
2003            .map_err(|e| RuntimeError {
2004                code: "QUERY_FAILED".into(),
2005                message: format!("Query failed: {e}"),
2006            })?;
2007        Ok(rows.flatten().collect())
2008    }
2009
2010    /// Graph query using a pre-held connection (for transactions).
2011    pub fn query_graph_with_conn(
2012        &self,
2013        conn: &Connection,
2014        query: &serde_json::Value,
2015    ) -> Result<serde_json::Value, RuntimeError> {
2016        let obj = query.as_object().ok_or_else(|| RuntimeError {
2017            code: "INVALID_QUERY".into(),
2018            message: "Graph query must be a JSON object".into(),
2019        })?;
2020        let mut results = serde_json::Map::new();
2021        for (entity_name, query_opts) in obj {
2022            let _ent = self.require_entity(entity_name)?;
2023            let filter = query_opts
2024                .get("where")
2025                .cloned()
2026                .unwrap_or(serde_json::json!({}));
2027            let rows = self.query_filtered_with_conn(conn, entity_name, &filter)?;
2028            results.insert(entity_name.clone(), serde_json::json!(rows));
2029        }
2030        Ok(serde_json::Value::Object(results))
2031    }
2032
2033    // -----------------------------------------------------------------------
2034    // Aggregations — count, sum, avg, min, max, group by
2035    // -----------------------------------------------------------------------
2036
2037    /// Run an aggregation query. See [`pylon_http::DataStore::aggregate`]
2038    /// for the spec shape.
2039    pub fn aggregate(
2040        &self,
2041        entity: &str,
2042        spec: &serde_json::Value,
2043    ) -> Result<serde_json::Value, RuntimeError> {
2044        if let Some(pg) = self.pg_backend() {
2045            return pylon_http::DataStore::aggregate(&pg.store, entity, spec)
2046                .map_err(data_err_to_runtime);
2047        }
2048        let ent = self.require_entity(entity)?;
2049        let conn = self.lock_read_conn()?;
2050        let obj = spec.as_object().ok_or_else(|| RuntimeError {
2051            code: "INVALID_QUERY".into(),
2052            message: "aggregate spec must be an object".into(),
2053        })?;
2054
2055        // Build the SELECT list.
2056        let mut select_parts: Vec<String> = Vec::new();
2057        let mut result_fields: Vec<String> = Vec::new();
2058
2059        if let Some(count) = obj.get("count") {
2060            match count {
2061                serde_json::Value::String(s) if s == "*" => {
2062                    select_parts.push("COUNT(*) AS count".into());
2063                    result_fields.push("count".into());
2064                }
2065                serde_json::Value::String(field) => {
2066                    validate_column_name(field, ent)?;
2067                    let alias = format!("count_{field}");
2068                    select_parts.push(format!(
2069                        "COUNT({}) AS {}",
2070                        quote_ident(field),
2071                        quote_ident(&alias)
2072                    ));
2073                    result_fields.push(alias);
2074                }
2075                _ => {}
2076            }
2077        }
2078
2079        for (fn_name, alias_prefix) in [
2080            ("sum", "sum_"),
2081            ("avg", "avg_"),
2082            ("min", "min_"),
2083            ("max", "max_"),
2084        ] {
2085            if let Some(fields) = obj.get(fn_name).and_then(|v| v.as_array()) {
2086                for field in fields {
2087                    if let Some(f) = field.as_str() {
2088                        validate_column_name(f, ent)?;
2089                        let alias = format!("{alias_prefix}{f}");
2090                        let sql_fn = fn_name.to_uppercase();
2091                        select_parts.push(format!(
2092                            "{}({}) AS {}",
2093                            sql_fn,
2094                            quote_ident(f),
2095                            quote_ident(&alias)
2096                        ));
2097                        result_fields.push(alias);
2098                    }
2099                }
2100            }
2101        }
2102
2103        // countDistinct — separate handler because COUNT(DISTINCT) is a
2104        // distinct SQL form from COUNT(field). Lets dashboards ask "how
2105        // many unique customers placed orders this month" without a
2106        // client-side post-processing pass.
2107        if let Some(fields) = obj.get("countDistinct").and_then(|v| v.as_array()) {
2108            for field in fields {
2109                if let Some(f) = field.as_str() {
2110                    validate_column_name(f, ent)?;
2111                    let alias = format!("count_distinct_{f}");
2112                    select_parts.push(format!(
2113                        "COUNT(DISTINCT {}) AS {}",
2114                        quote_ident(f),
2115                        quote_ident(&alias)
2116                    ));
2117                    result_fields.push(alias);
2118                }
2119            }
2120        }
2121
2122        // Group-by fields come first in the SELECT so each row is identifiable.
2123        // Each entry is either a plain column name (string) or a date-bucket
2124        // spec — `{ field: "createdAt", bucket: "day" }`. Buckets map to
2125        // SQLite strftime patterns so aggregation keys collapse to the
2126        // bucket boundary (hour / day / week / month / year).
2127        let mut group_by: Vec<String> = Vec::new();
2128        let mut group_select: Vec<String> = Vec::new();
2129        let mut group_field_names: Vec<String> = Vec::new();
2130        if let Some(groups) = obj.get("groupBy").and_then(|v| v.as_array()) {
2131            for g in groups {
2132                if let Some(f) = g.as_str() {
2133                    validate_column_name(f, ent)?;
2134                    let quoted = quote_ident(f);
2135                    group_by.push(quoted.clone());
2136                    group_select.push(quoted);
2137                    group_field_names.push(f.to_string());
2138                } else if let Some(spec) = g.as_object() {
2139                    let field =
2140                        spec.get("field")
2141                            .and_then(|v| v.as_str())
2142                            .ok_or_else(|| RuntimeError {
2143                                code: "INVALID_QUERY".into(),
2144                                message: "groupBy object spec requires `field`".into(),
2145                            })?;
2146                    validate_column_name(field, ent)?;
2147                    let bucket = spec.get("bucket").and_then(|v| v.as_str()).unwrap_or("day");
2148                    let fmt = match bucket {
2149                        "hour" => "%Y-%m-%d %H:00:00",
2150                        "day" => "%Y-%m-%d",
2151                        "month" => "%Y-%m",
2152                        "year" => "%Y",
2153                        "week" => "%Y-W%W",
2154                        _ => {
2155                            return Err(RuntimeError {
2156                                code: "INVALID_QUERY".into(),
2157                                message: format!(
2158                                    "bucket must be one of hour/day/week/month/year, got {bucket}"
2159                                ),
2160                            });
2161                        }
2162                    };
2163                    let alias = format!("{field}_{bucket}");
2164                    let expr = format!("strftime('{}', {})", fmt, quote_ident(field));
2165                    group_by.push(expr.clone());
2166                    group_select.push(format!("{} AS {}", expr, quote_ident(&alias)));
2167                    group_field_names.push(alias);
2168                }
2169            }
2170        }
2171        let mut full_select = group_select.clone();
2172        full_select.extend(select_parts.iter().cloned());
2173        if full_select.is_empty() {
2174            return Err(RuntimeError {
2175                code: "INVALID_QUERY".into(),
2176                message: "aggregate spec must include count/sum/avg/min/max/groupBy".into(),
2177            });
2178        }
2179
2180        // WHERE clause (reuse filter syntax, but only simple equality for now).
2181        let mut where_clauses = Vec::new();
2182        let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
2183        let mut idx = 1;
2184        if let Some(where_obj) = obj.get("where").and_then(|v| v.as_object()) {
2185            for (k, v) in where_obj {
2186                validate_column_name(k, ent)?;
2187                where_clauses.push(format!("{} = ?{idx}", quote_ident(k)));
2188                values.push(json_to_sql(v));
2189                idx += 1;
2190            }
2191        }
2192        let where_sql = if where_clauses.is_empty() {
2193            String::new()
2194        } else {
2195            format!(" WHERE {}", where_clauses.join(" AND "))
2196        };
2197
2198        let group_sql = if group_by.is_empty() {
2199            String::new()
2200        } else {
2201            format!(" GROUP BY {}", group_by.join(", "))
2202        };
2203
2204        let sql = format!(
2205            "SELECT {} FROM {}{}{}",
2206            full_select.join(", "),
2207            quote_ident(entity),
2208            where_sql,
2209            group_sql
2210        );
2211
2212        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
2213            values.iter().map(|v| v.as_ref()).collect();
2214        let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
2215            code: "QUERY_FAILED".into(),
2216            message: format!("Failed to prepare aggregate: {e}"),
2217        })?;
2218
2219        let column_names: Vec<String> = {
2220            let mut v = group_field_names.clone();
2221            v.extend(result_fields.iter().cloned());
2222            v
2223        };
2224
2225        let rows = stmt
2226            .query_map(param_refs.as_slice(), |row| {
2227                let mut obj = serde_json::Map::new();
2228                for (i, name) in column_names.iter().enumerate() {
2229                    // Try int first (counts/sums), then float, then string, then null.
2230                    if let Ok(n) = row.get::<_, i64>(i) {
2231                        obj.insert(name.clone(), serde_json::Value::Number(n.into()));
2232                    } else if let Ok(f) = row.get::<_, f64>(i) {
2233                        if let Some(num) = serde_json::Number::from_f64(f) {
2234                            obj.insert(name.clone(), serde_json::Value::Number(num));
2235                        } else {
2236                            obj.insert(name.clone(), serde_json::Value::Null);
2237                        }
2238                    } else if let Ok(s) = row.get::<_, String>(i) {
2239                        obj.insert(name.clone(), serde_json::Value::String(s));
2240                    } else {
2241                        obj.insert(name.clone(), serde_json::Value::Null);
2242                    }
2243                }
2244                Ok(serde_json::Value::Object(obj))
2245            })
2246            .map_err(|e| RuntimeError {
2247                code: "QUERY_FAILED".into(),
2248                message: format!("Aggregate failed: {e}"),
2249            })?;
2250
2251        let mut result = Vec::new();
2252        for row in rows {
2253            if let Ok(val) = row {
2254                result.push(val);
2255            }
2256        }
2257        Ok(serde_json::json!({ "rows": result }))
2258    }
2259
2260    // -----------------------------------------------------------------------
2261    // Helpers
2262    // -----------------------------------------------------------------------
2263
2264    fn require_entity(&self, name: &str) -> Result<&ManifestEntity, RuntimeError> {
2265        self.entities.get(name).ok_or_else(|| RuntimeError {
2266            code: "ENTITY_NOT_FOUND".into(),
2267            message: format!("Unknown entity: \"{name}\""),
2268        })
2269    }
2270
2271    /// Acquire the write connection. Used for INSERT, UPDATE, DELETE.
2272    /// SQLite-only — Postgres callers should never reach this (each
2273    /// public CRUD method branches at the top and dispatches to
2274    /// `PostgresDataStore` first). Returns `NOT_SQLITE_BACKEND` if
2275    /// invoked on a Postgres runtime, which indicates a missing dispatch.
2276    fn lock_write_conn(&self) -> Result<std::sync::MutexGuard<'_, Connection>, RuntimeError> {
2277        let sb = self.sqlite_backend()?;
2278        sb.write_conn.lock().map_err(|e| RuntimeError {
2279            code: "LOCK_FAILED".into(),
2280            message: format!("Failed to acquire write connection lock: {e}"),
2281        })
2282    }
2283
2284    /// Acquire a read connection. Uses the read pool if available (file-backed
2285    /// databases), otherwise falls back to the write connection (in-memory).
2286    /// Connections are selected round-robin to spread load evenly. SQLite-only.
2287    fn lock_read_conn(&self) -> Result<ReadConnGuard<'_>, RuntimeError> {
2288        let sb = self.sqlite_backend()?;
2289        if !sb.read_pool.is_empty() {
2290            let idx = sb.read_counter.fetch_add(1, Ordering::Relaxed) % sb.read_pool.len();
2291            let guard = sb.read_pool[idx].lock().map_err(|e| RuntimeError {
2292                code: "LOCK_FAILED".into(),
2293                message: format!("Failed to acquire read connection: {e}"),
2294            })?;
2295            Ok(ReadConnGuard::Pooled(guard))
2296        } else {
2297            // Fall back to write connection for in-memory DBs.
2298            let guard = sb.write_conn.lock().map_err(|e| RuntimeError {
2299                code: "LOCK_FAILED".into(),
2300                message: format!("Failed to acquire connection: {e}"),
2301            })?;
2302            Ok(ReadConnGuard::Write(guard))
2303        }
2304    }
2305
2306    /// Borrow the SQLite backend, or fail with `NOT_SQLITE_BACKEND` if
2307    /// this runtime is Postgres-backed. Used by every SQLite-specific
2308    /// helper as a single point of dispatch.
2309    fn sqlite_backend(&self) -> Result<&SqliteBackend, RuntimeError> {
2310        match &self.backend {
2311            RuntimeBackend::Sqlite(sb) => Ok(sb),
2312            RuntimeBackend::Postgres(_) => Err(RuntimeError {
2313                code: "NOT_SQLITE_BACKEND".into(),
2314                message: "this operation requires a SQLite-backed Runtime".into(),
2315            }),
2316        }
2317    }
2318
2319    /// Borrow the Postgres backend, or `None` for SQLite. Used by the
2320    /// per-method dispatch at the top of each entity-CRUD function
2321    /// AND by the `DataStore` impl in `datastore.rs` to reach the
2322    /// CRDT sidecar.
2323    pub(crate) fn pg_backend(&self) -> Option<&PgBackend> {
2324        match &self.backend {
2325            RuntimeBackend::Sqlite(_) => None,
2326            RuntimeBackend::Postgres(pg) => Some(pg),
2327        }
2328    }
2329
2330    /// Borrow the underlying Postgres `DataStore` if this runtime is
2331    /// Postgres-backed. Used by the `DataStore` adapter in `datastore.rs`
2332    /// to delegate `transact`/`search` etc. without re-implementing them.
2333    /// Accessor for the underlying PostgresDataStore. Used by
2334    /// integration tests to exercise in-tx primitives directly
2335    /// without going through a TS function handler. Also useful for
2336    /// callers that need to drop down to raw PG (e.g. running an
2337    /// EXPLAIN against the live cluster from an admin tool).
2338    /// Returns None on SQLite-backed runtimes.
2339    pub fn pg_data_store_pub(&self) -> Option<&pylon_storage::pg_datastore::PostgresDataStore> {
2340        self.pg_data_store()
2341    }
2342
2343    #[doc(hidden)]
2344    pub fn pg_data_store_for_tests(&self) -> &pylon_storage::pg_datastore::PostgresDataStore {
2345        self.pg_data_store().expect("pg backend")
2346    }
2347
2348    /// Test-only: run a closure inside a PG mutation tx with the
2349    /// CRDT hook installed — same code path FnOpsImpl::call uses
2350    /// for `Mutation` handlers. Lets integration tests verify the
2351    /// hook without spinning up a Bun runtime.
2352    #[doc(hidden)]
2353    pub fn run_in_pg_mutation_tx_for_tests<F, T, E>(&self, body: F) -> Result<T, E>
2354    where
2355        F: FnOnce(&dyn pylon_http::DataStore) -> Result<T, E>,
2356        E: From<pylon_http::DataError>,
2357    {
2358        let pg_backend = self.pg_backend().expect("pg backend");
2359        let crdt_hook: std::sync::Arc<dyn pylon_storage::pg_tx_store::PgCrdtHook> =
2360            std::sync::Arc::new(crate::pg_loro_store::PgCrdtHookImpl {
2361                crdt: std::sync::Arc::clone(&pg_backend.crdt),
2362                manifest: std::sync::Arc::new(self.manifest.clone()),
2363            });
2364        pg_backend.store.with_transaction_crdt(crdt_hook, body)
2365    }
2366
2367    pub(crate) fn pg_data_store(&self) -> Option<&pylon_storage::pg_datastore::PostgresDataStore> {
2368        self.pg_backend().map(|pg| &pg.store)
2369    }
2370}
2371
2372// ---------------------------------------------------------------------------
2373// Helpers
2374// ---------------------------------------------------------------------------
2375
2376/// Generate a lex-sortable, monotonic-ish unique ID.
2377///
2378/// Same shape as `pylon_storage::postgres::generate_id` — fixed-width hex
2379/// of nanoseconds + 8-hex per-process counter (40 chars total). The fixed
2380/// width is what makes `WHERE id > $1 ORDER BY id` correct for cursor
2381/// pagination: variable-width hex sorts incorrectly at width boundaries
2382/// (e.g. `"ff"` lex-sorts after `"100"`).
2383/// Run `body` inside a SQLite transaction on `conn`. Commits on `Ok`,
2384/// rolls back on `Err` (or if `body` panics).
2385///
2386/// Used to make the multi-statement CRDT write paths (LoroDoc snapshot
2387/// upsert into `_pylon_crdt_snapshots` + the materialized entity row
2388/// INSERT/UPDATE + FTS / facet maintenance) atomic so a crash mid-write
2389/// can never leave the materialized view stale relative to the CRDT
2390/// snapshot. Uses unmanaged BEGIN/COMMIT/ROLLBACK rather than rusqlite's
2391/// `Transaction` API because the existing call sites borrow `conn`
2392/// through inner closures and the lifetime juggling for a `Transaction`
2393/// guard would force more refactoring than the explicit BEGIN/COMMIT.
2394///
2395/// `BEGIN IMMEDIATE` (vs the default `BEGIN DEFERRED`) takes the SQLite
2396/// reserved lock on entry instead of escalating later — matches the
2397/// pattern in `datastore.rs::transact` and avoids a SQLITE_BUSY race
2398/// where a concurrent reader prevents the lock upgrade mid-write.
2399fn with_write_tx<T, F>(conn: &rusqlite::Connection, body: F) -> Result<T, RuntimeError>
2400where
2401    F: FnOnce() -> Result<T, RuntimeError>,
2402{
2403    conn.execute("BEGIN IMMEDIATE", [])
2404        .map_err(|e| RuntimeError {
2405            code: "TX_BEGIN_FAILED".into(),
2406            message: format!("BEGIN: {e}"),
2407        })?;
2408    match body() {
2409        Ok(v) => {
2410            conn.execute("COMMIT", []).map_err(|e| RuntimeError {
2411                code: "TX_COMMIT_FAILED".into(),
2412                message: format!("COMMIT: {e}"),
2413            })?;
2414            Ok(v)
2415        }
2416        Err(e) => {
2417            // Best-effort rollback; if even ROLLBACK fails we surface
2418            // the *original* error since that's the more actionable one.
2419            let _ = conn.execute("ROLLBACK", []);
2420            Err(e)
2421        }
2422    }
2423}
2424
2425fn generate_id() -> String {
2426    use std::sync::atomic::{AtomicU32, Ordering};
2427    use std::time::{SystemTime, UNIX_EPOCH};
2428    static COUNTER: AtomicU32 = AtomicU32::new(0);
2429    let nanos = SystemTime::now()
2430        .duration_since(UNIX_EPOCH)
2431        .unwrap_or_default()
2432        .as_nanos();
2433    let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
2434    format!("{nanos:032x}{seq:08x}")
2435}
2436
2437/// Convert a `serde_json::Value` to a boxed `ToSql` for rusqlite.
2438fn json_to_sql(val: &serde_json::Value) -> Box<dyn rusqlite::types::ToSql> {
2439    match val {
2440        serde_json::Value::Null => Box::new(rusqlite::types::Null),
2441        serde_json::Value::Bool(b) => Box::new(*b as i32),
2442        serde_json::Value::Number(n) => {
2443            if let Some(i) = n.as_i64() {
2444                Box::new(i)
2445            } else if let Some(f) = n.as_f64() {
2446                Box::new(f)
2447            } else {
2448                Box::new(n.to_string())
2449            }
2450        }
2451        serde_json::Value::String(s) => Box::new(s.clone()),
2452        other => Box::new(other.to_string()),
2453    }
2454}
2455
2456/// Convert a rusqlite row to a JSON value.
2457///
2458/// Reads columns by NAME (via the row's actual column metadata) rather
2459/// than by positional index. The previous implementation assumed the
2460/// SQLite table column order matched the manifest field order, which
2461/// silently breaks when a new field is inserted in the middle of the
2462/// manifest: SQLite's `ALTER TABLE ADD COLUMN` always appends to the
2463/// end of the table, so existing data lands in the wrong field on
2464/// every read.
2465///
2466/// `field_names` is still passed (unused in the body, kept for API
2467/// stability with callers that compute it from the manifest) — the
2468/// name set comes from the row itself now, which always matches the
2469/// SELECT's actual column shape.
2470fn row_to_json(row: &rusqlite::Row<'_>, _field_names: &[String]) -> serde_json::Value {
2471    let mut obj = serde_json::Map::new();
2472
2473    let stmt = row.as_ref();
2474    let count = stmt.column_count();
2475    for i in 0..count {
2476        // Column names are short string slices into the prepared
2477        // statement; copy out into owned Strings before inserting into
2478        // the map (the slice borrow can't outlive the row).
2479        let name = match stmt.column_name(i) {
2480            Ok(n) => n.to_string(),
2481            Err(_) => continue,
2482        };
2483        let value = if let Ok(s) = row.get::<_, String>(i) {
2484            serde_json::Value::String(s)
2485        } else if let Ok(n) = row.get::<_, i64>(i) {
2486            serde_json::Value::Number(serde_json::Number::from(n))
2487        } else if let Ok(f) = row.get::<_, f64>(i) {
2488            serde_json::Number::from_f64(f)
2489                .map(serde_json::Value::Number)
2490                .unwrap_or(serde_json::Value::Null)
2491        } else {
2492            serde_json::Value::Null
2493        };
2494        obj.insert(name, value);
2495    }
2496
2497    serde_json::Value::Object(obj)
2498}
2499
2500#[cfg(test)]
2501mod tests {
2502    use super::*;
2503    use pylon_kernel::{ManifestField, ManifestIndex};
2504
2505    fn test_manifest() -> AppManifest {
2506        AppManifest {
2507            manifest_version: 1,
2508            name: "Test".into(),
2509            version: "0.1.0".into(),
2510            entities: vec![pylon_kernel::ManifestEntity {
2511                name: "User".into(),
2512                fields: vec![
2513                    ManifestField {
2514                        name: "email".into(),
2515                        field_type: "string".into(),
2516                        optional: false,
2517                        unique: true,
2518                        crdt: None,
2519                    },
2520                    ManifestField {
2521                        name: "displayName".into(),
2522                        field_type: "string".into(),
2523                        optional: false,
2524                        unique: false,
2525                        crdt: None,
2526                    },
2527                ],
2528                indexes: vec![ManifestIndex {
2529                    name: "user_email".into(),
2530                    fields: vec!["email".into()],
2531                    unique: true,
2532                }],
2533                relations: vec![],
2534                search: None,
2535                crdt: true,
2536            }],
2537            routes: vec![],
2538            queries: vec![],
2539            actions: vec![],
2540            policies: vec![],
2541            auth: Default::default(),
2542        }
2543    }
2544
2545    #[test]
2546    fn reset_for_tests_wipes_in_memory() {
2547        let rt = Runtime::in_memory(test_manifest()).unwrap();
2548        rt.insert(
2549            "User",
2550            &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
2551        )
2552        .unwrap();
2553        assert_eq!(rt.list("User").unwrap().len(), 1);
2554        rt.reset_for_tests().unwrap();
2555        assert_eq!(rt.list("User").unwrap().len(), 0);
2556    }
2557
2558    #[test]
2559    fn reset_for_tests_refuses_file_db() {
2560        let dir = std::env::temp_dir().join("pylon-reset-refuse");
2561        let _ = std::fs::create_dir_all(&dir);
2562        let db_path = dir.join("db.sqlite");
2563        let _ = std::fs::remove_file(&db_path);
2564        let rt = Runtime::open(db_path.to_str().unwrap(), test_manifest()).unwrap();
2565        let err = rt.reset_for_tests().unwrap_err();
2566        assert_eq!(err.code, "RESET_REFUSED");
2567        let _ = std::fs::remove_file(&db_path);
2568    }
2569
2570    #[test]
2571    fn insert_and_get() {
2572        let rt = Runtime::in_memory(test_manifest()).unwrap();
2573        let id = rt
2574            .insert(
2575                "User",
2576                &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
2577            )
2578            .unwrap();
2579        let row = rt.get_by_id("User", &id).unwrap().unwrap();
2580        assert_eq!(row["email"], "a@b.com");
2581    }
2582
2583    /// Regression: when a new field is added in the middle of a manifest,
2584    /// SQLite ALTER TABLE ADD COLUMN appends it to the end of the table.
2585    /// The previous `row_to_json` read columns by positional index in
2586    /// manifest order, so existing data shifted into the wrong fields
2587    /// on every read (createdAt's value showed up as the new field's,
2588    /// and vice versa). row_to_json now reads by column name from the
2589    /// row's own metadata, so the bug can't recur regardless of
2590    /// migration order.
2591    #[test]
2592    fn row_to_json_handles_columns_added_out_of_manifest_order() {
2593        // Manifest: id, email, displayName, avatarColor, createdAt
2594        let mut manifest = test_manifest();
2595        manifest.entities[0].fields = vec![
2596            ManifestField {
2597                name: "email".into(),
2598                field_type: "string".into(),
2599                optional: false,
2600                unique: true,
2601                crdt: None,
2602            },
2603            ManifestField {
2604                name: "displayName".into(),
2605                field_type: "string".into(),
2606                optional: false,
2607                unique: false,
2608                crdt: None,
2609            },
2610            ManifestField {
2611                name: "avatarColor".into(),
2612                field_type: "string".into(),
2613                optional: true,
2614                unique: false,
2615                crdt: None,
2616            },
2617            ManifestField {
2618                name: "createdAt".into(),
2619                field_type: "datetime".into(),
2620                optional: true,
2621                unique: false,
2622                crdt: None,
2623            },
2624        ];
2625        // Important: turn off CRDT mode for this test — CRDT mode writes
2626        // the projection back to SQLite explicitly per-field, so it
2627        // wouldn't exercise the column-order bug we're regressing
2628        // against. The bug bites the legacy path that still does
2629        // `INSERT (id, email, displayName, ...) VALUES (...)` and then
2630        // `SELECT * ... → row_to_json` to read it back.
2631        manifest.entities[0].crdt = false;
2632        let rt = Runtime::in_memory(manifest).unwrap();
2633        let id = rt
2634            .insert(
2635                "User",
2636                &serde_json::json!({
2637                    "email": "a@b.com",
2638                    "displayName": "Alice",
2639                    "avatarColor": "#abc",
2640                    "createdAt": "2026-01-01T00:00:00Z",
2641                }),
2642            )
2643            .unwrap();
2644
2645        // Simulate an ALTER TABLE ADD COLUMN that appends a new field
2646        // at the end of the SQLite table even though the manifest
2647        // places it in the middle. This is the exact shape of what
2648        // happens when a user adds a new field between existing ones
2649        // and pylon dev migrates the table forward.
2650        {
2651            let conn = rt.lock_write_conn().unwrap();
2652            conn.execute("ALTER TABLE \"User\" ADD COLUMN \"passwordHash\" TEXT", [])
2653                .unwrap();
2654            conn.execute(
2655                "UPDATE \"User\" SET \"passwordHash\" = ?1 WHERE \"id\" = ?2",
2656                rusqlite::params!["hashed-password", &id],
2657            )
2658            .unwrap();
2659        }
2660        // Update the in-memory manifest to reflect the new field
2661        // sitting between avatarColor and createdAt — this is what the
2662        // regenerated manifest would look like.
2663        // (We mutate via the storage path to mirror the actual flow.)
2664
2665        let row = rt.get_by_id("User", &id).unwrap().unwrap();
2666        // The crucial assertions: each column maps to its own value,
2667        // not the value of whichever column happens to share its
2668        // SQLite position.
2669        assert_eq!(row["email"], "a@b.com");
2670        assert_eq!(row["displayName"], "Alice");
2671        assert_eq!(row["avatarColor"], "#abc");
2672        assert_eq!(row["createdAt"], "2026-01-01T00:00:00Z");
2673        assert_eq!(row["passwordHash"], "hashed-password");
2674    }
2675
2676    /// CRDT-mode entities (the default) populate the sidecar snapshot
2677    /// table on every write — the LoroDoc is the source of truth, the
2678    /// SQLite row is the materialized projection. This proves the CRDT
2679    /// branch in `insert` actually fires.
2680    #[test]
2681    fn crdt_default_writes_through_loro_store() {
2682        let rt = Runtime::in_memory(test_manifest()).unwrap();
2683        let id = rt
2684            .insert(
2685                "User",
2686                &serde_json::json!({"email": "x@y.com", "displayName": "Eric"}),
2687            )
2688            .unwrap();
2689
2690        // Sidecar contains exactly one snapshot for the new row.
2691        let conn = rt.lock_write_conn().unwrap();
2692        let snap_count: i64 = conn
2693            .query_row(
2694                "SELECT COUNT(*) FROM _pylon_crdt_snapshots
2695                 WHERE entity = ?1 AND row_id = ?2",
2696                rusqlite::params!["User", &id],
2697                |r| r.get(0),
2698            )
2699            .unwrap();
2700        assert_eq!(snap_count, 1, "sidecar should have one row after insert");
2701
2702        // Loro doc is cached in memory after the write — proves
2703        // get_or_hydrate ran during apply_patch.
2704        assert!(rt.crdt_store().cached_rows() >= 1);
2705
2706        // SQLite materialized view has the projected row.
2707        drop(conn);
2708        let row = rt.get_by_id("User", &id).unwrap().unwrap();
2709        assert_eq!(row["email"], "x@y.com");
2710        assert_eq!(row["displayName"], "Eric");
2711    }
2712
2713    /// Updates write through the LoroDoc as well — verifies the sidecar
2714    /// snapshot grows (Loro tracks new ops) and the materialized row
2715    /// reflects the new value.
2716    #[test]
2717    fn crdt_update_persists_new_snapshot() {
2718        let rt = Runtime::in_memory(test_manifest()).unwrap();
2719        let id = rt
2720            .insert(
2721                "User",
2722                &serde_json::json!({"email": "x@y.com", "displayName": "Eric"}),
2723            )
2724            .unwrap();
2725
2726        let snap_after_insert: Vec<u8> = {
2727            let conn = rt.lock_write_conn().unwrap();
2728            conn.query_row(
2729                "SELECT snapshot FROM _pylon_crdt_snapshots
2730                 WHERE entity = 'User' AND row_id = ?1",
2731                rusqlite::params![&id],
2732                |r| r.get(0),
2733            )
2734            .unwrap()
2735        };
2736
2737        rt.update("User", &id, &serde_json::json!({"displayName": "Eric C"}))
2738            .unwrap();
2739
2740        let snap_after_update: Vec<u8> = {
2741            let conn = rt.lock_write_conn().unwrap();
2742            conn.query_row(
2743                "SELECT snapshot FROM _pylon_crdt_snapshots
2744                 WHERE entity = 'User' AND row_id = ?1",
2745                rusqlite::params![&id],
2746                |r| r.get(0),
2747            )
2748            .unwrap()
2749        };
2750
2751        assert_ne!(
2752            snap_after_insert, snap_after_update,
2753            "snapshot bytes should change after an update"
2754        );
2755
2756        let row = rt.get_by_id("User", &id).unwrap().unwrap();
2757        assert_eq!(row["displayName"], "Eric C");
2758        assert_eq!(row["email"], "x@y.com");
2759    }
2760
2761    /// Regression: when the SQL INSERT step inside Runtime::insert fails
2762    /// (UNIQUE-constraint violation here), the LoroDoc snapshot must
2763    /// also roll back — neither half lands. Previously the LoroStore
2764    /// wrote first and committed independently, so a doomed INSERT left
2765    /// a sidecar row pointing at a doc that the materialized table
2766    /// never knew about.
2767    #[test]
2768    fn crdt_insert_rolls_back_when_sql_step_fails() {
2769        let rt = Runtime::in_memory(test_manifest()).unwrap();
2770        // Seed a row.
2771        rt.insert(
2772            "User",
2773            &serde_json::json!({"email": "x@y.com", "displayName": "First"}),
2774        )
2775        .unwrap();
2776
2777        // Snapshot the sidecar row count BEFORE the failing insert.
2778        let snap_count_before: i64 = {
2779            let conn = rt.lock_write_conn().unwrap();
2780            conn.query_row(
2781                "SELECT COUNT(*) FROM _pylon_crdt_snapshots WHERE entity = 'User'",
2782                [],
2783                |r| r.get(0),
2784            )
2785            .unwrap()
2786        };
2787
2788        // Attempt a duplicate-email insert. SQL UNIQUE rejects.
2789        let err = rt
2790            .insert(
2791                "User",
2792                &serde_json::json!({"email": "x@y.com", "displayName": "Second"}),
2793            )
2794            .expect_err("duplicate email must fail");
2795        assert_eq!(err.code, "INSERT_FAILED");
2796
2797        // Sidecar row count unchanged — the LoroDoc snapshot the CRDT
2798        // path wrote was rolled back along with the failed SQL INSERT.
2799        let snap_count_after: i64 = {
2800            let conn = rt.lock_write_conn().unwrap();
2801            conn.query_row(
2802                "SELECT COUNT(*) FROM _pylon_crdt_snapshots WHERE entity = 'User'",
2803                [],
2804                |r| r.get(0),
2805            )
2806            .unwrap()
2807        };
2808        assert_eq!(
2809            snap_count_after, snap_count_before,
2810            "failed insert should not leave a sidecar snapshot behind"
2811        );
2812    }
2813
2814    /// Entities with `crdt: false` skip the LoroDoc entirely — no sidecar
2815    /// row, no Loro cache entry. Proves the opt-out actually opts out.
2816    #[test]
2817    fn crdt_false_skips_loro_store() {
2818        let mut manifest = test_manifest();
2819        // Flip the User entity to LWW-only mode.
2820        manifest.entities[0].crdt = false;
2821        let rt = Runtime::in_memory(manifest).unwrap();
2822
2823        let id = rt
2824            .insert(
2825                "User",
2826                &serde_json::json!({"email": "lww@example.com", "displayName": "Plain"}),
2827            )
2828            .unwrap();
2829
2830        let conn = rt.lock_write_conn().unwrap();
2831        let snap_count: i64 = conn
2832            .query_row(
2833                "SELECT COUNT(*) FROM _pylon_crdt_snapshots
2834                 WHERE entity = 'User' AND row_id = ?1",
2835                rusqlite::params![&id],
2836                |r| r.get(0),
2837            )
2838            .unwrap();
2839        assert_eq!(snap_count, 0, "crdt:false should not touch the sidecar");
2840        assert_eq!(
2841            rt.crdt_store().cached_rows(),
2842            0,
2843            "crdt:false should not warm the cache"
2844        );
2845
2846        // SQLite path still works — the row landed via the legacy
2847        // direct-write path.
2848        drop(conn);
2849        let row = rt.get_by_id("User", &id).unwrap().unwrap();
2850        assert_eq!(row["email"], "lww@example.com");
2851    }
2852
2853    #[test]
2854    fn list_entities() {
2855        let rt = Runtime::in_memory(test_manifest()).unwrap();
2856        rt.insert(
2857            "User",
2858            &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
2859        )
2860        .unwrap();
2861        rt.insert(
2862            "User",
2863            &serde_json::json!({"email": "b@c.com", "displayName": "B"}),
2864        )
2865        .unwrap();
2866        let rows = rt.list("User").unwrap();
2867        assert_eq!(rows.len(), 2);
2868    }
2869
2870    #[test]
2871    fn update_entity() {
2872        let rt = Runtime::in_memory(test_manifest()).unwrap();
2873        let id = rt
2874            .insert(
2875                "User",
2876                &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
2877            )
2878            .unwrap();
2879        let updated = rt
2880            .update("User", &id, &serde_json::json!({"displayName": "Updated"}))
2881            .unwrap();
2882        assert!(updated);
2883        let row = rt.get_by_id("User", &id).unwrap().unwrap();
2884        assert_eq!(row["displayName"], "Updated");
2885    }
2886
2887    #[test]
2888    fn delete_entity() {
2889        let rt = Runtime::in_memory(test_manifest()).unwrap();
2890        let id = rt
2891            .insert(
2892                "User",
2893                &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
2894            )
2895            .unwrap();
2896        let deleted = rt.delete("User", &id).unwrap();
2897        assert!(deleted);
2898        assert!(rt.get_by_id("User", &id).unwrap().is_none());
2899    }
2900
2901    #[test]
2902    fn lookup_by_field() {
2903        let rt = Runtime::in_memory(test_manifest()).unwrap();
2904        rt.insert(
2905            "User",
2906            &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
2907        )
2908        .unwrap();
2909        let row = rt.lookup("User", "email", "a@b.com").unwrap().unwrap();
2910        assert_eq!(row["displayName"], "A");
2911    }
2912
2913    #[test]
2914    fn unknown_entity_returns_error() {
2915        let rt = Runtime::in_memory(test_manifest()).unwrap();
2916        let err = rt.list("Nonexistent").unwrap_err();
2917        assert_eq!(err.code, "ENTITY_NOT_FOUND");
2918    }
2919
2920    #[test]
2921    fn insert_rejects_unknown_column() {
2922        let rt = Runtime::in_memory(test_manifest()).unwrap();
2923        let err = rt
2924            .insert(
2925                "User",
2926                &serde_json::json!({"email": "a@b.com", "displayName": "A", "evil_col": "x"}),
2927            )
2928            .unwrap_err();
2929        assert_eq!(err.code, "INVALID_COLUMN");
2930    }
2931
2932    #[test]
2933    fn update_rejects_unknown_column() {
2934        let rt = Runtime::in_memory(test_manifest()).unwrap();
2935        let id = rt
2936            .insert(
2937                "User",
2938                &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
2939            )
2940            .unwrap();
2941        let err = rt
2942            .update("User", &id, &serde_json::json!({"bad_field": "x"}))
2943            .unwrap_err();
2944        assert_eq!(err.code, "INVALID_COLUMN");
2945    }
2946
2947    #[test]
2948    fn lookup_rejects_unknown_column() {
2949        let rt = Runtime::in_memory(test_manifest()).unwrap();
2950        let err = rt.lookup("User", "nonexistent", "val").unwrap_err();
2951        assert_eq!(err.code, "INVALID_COLUMN");
2952    }
2953
2954    #[test]
2955    fn query_filtered_rejects_unknown_column() {
2956        let rt = Runtime::in_memory(test_manifest()).unwrap();
2957        let err = rt
2958            .query_filtered("User", &serde_json::json!({"bad_col": "x"}))
2959            .unwrap_err();
2960        assert_eq!(err.code, "INVALID_COLUMN");
2961    }
2962
2963    #[test]
2964    fn query_filtered_rejects_unknown_order_column() {
2965        let rt = Runtime::in_memory(test_manifest()).unwrap();
2966        let err = rt
2967            .query_filtered("User", &serde_json::json!({"$order": {"bad_col": "asc"}}))
2968            .unwrap_err();
2969        assert_eq!(err.code, "INVALID_COLUMN");
2970    }
2971
2972    #[test]
2973    fn query_filtered_sanitizes_order_direction() {
2974        let rt = Runtime::in_memory(test_manifest()).unwrap();
2975        rt.insert(
2976            "User",
2977            &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
2978        )
2979        .unwrap();
2980        // Even a malicious direction value should be normalized to ASC.
2981        let rows = rt
2982            .query_filtered(
2983                "User",
2984                &serde_json::json!({"$order": {"email": "DROP TABLE User"}}),
2985            )
2986            .unwrap();
2987        assert_eq!(rows.len(), 1);
2988    }
2989
2990    #[test]
2991    fn in_memory_has_no_read_pool() {
2992        let rt = Runtime::in_memory(test_manifest()).unwrap();
2993        assert_eq!(rt.read_pool_size(), 0);
2994    }
2995
2996    #[test]
2997    fn open_creates_read_pool() {
2998        let dir = std::env::temp_dir().join(format!("pylon_test_{}", std::process::id()));
2999        std::fs::create_dir_all(&dir).unwrap();
3000        let db_path = dir.join("test_read_pool.db");
3001
3002        let rt = Runtime::open(db_path.to_str().unwrap(), test_manifest()).unwrap();
3003        assert_eq!(rt.read_pool_size(), READ_POOL_SIZE);
3004
3005        // Write then read through the pool.
3006        let id = rt
3007            .insert(
3008                "User",
3009                &serde_json::json!({"email": "pool@test.com", "displayName": "Pool"}),
3010            )
3011            .unwrap();
3012        let row = rt.get_by_id("User", &id).unwrap().unwrap();
3013        assert_eq!(row["email"], "pool@test.com");
3014
3015        // Clean up.
3016        let _ = std::fs::remove_dir_all(&dir);
3017    }
3018
3019    #[test]
3020    fn concurrent_reads_dont_block_on_write() {
3021        use std::sync::Arc;
3022
3023        let dir = std::env::temp_dir().join(format!("pylon_conc_{}", std::process::id()));
3024        std::fs::create_dir_all(&dir).unwrap();
3025        let db_path = dir.join("test_concurrent.db");
3026
3027        let rt = Arc::new(Runtime::open(db_path.to_str().unwrap(), test_manifest()).unwrap());
3028
3029        // Seed some data so reads have something to return.
3030        rt.insert(
3031            "User",
3032            &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
3033        )
3034        .unwrap();
3035        rt.insert(
3036            "User",
3037            &serde_json::json!({"email": "b@c.com", "displayName": "B"}),
3038        )
3039        .unwrap();
3040
3041        // Hold the write lock to simulate a long write.
3042        let write_guard = rt.lock_write_conn().unwrap();
3043
3044        // Spawn reader threads that should succeed despite the held write lock.
3045        let mut handles = Vec::new();
3046        for _ in 0..4 {
3047            let rt_clone = Arc::clone(&rt);
3048            handles.push(std::thread::spawn(move || {
3049                let rows = rt_clone.list("User").unwrap();
3050                assert_eq!(rows.len(), 2);
3051            }));
3052        }
3053
3054        for h in handles {
3055            h.join().expect("reader thread panicked");
3056        }
3057
3058        // Release the write lock.
3059        drop(write_guard);
3060
3061        // Clean up.
3062        let _ = std::fs::remove_dir_all(&dir);
3063    }
3064}