Skip to main content

pylon_runtime/
lib.rs

1pub mod account_backend;
2pub mod api_key_backend;
3pub mod cache_handlers;
4pub mod cache_server;
5pub mod config;
6pub mod cron;
7pub mod datastore;
8pub mod ip_limit;
9pub mod job_store;
10pub mod jobs;
11pub mod log;
12pub mod loro_store;
13pub mod pg_loro_store;
14pub mod magic_code_backend;
15pub mod metrics;
16pub mod oauth_backend;
17pub mod org_backend;
18pub mod openapi;
19pub mod presence;
20pub mod pubsub;
21pub mod rate_limit;
22pub mod resp;
23pub mod resp_server;
24pub mod rooms;
25pub mod scheduler;
26pub mod server;
27pub mod session_backend;
28pub mod shard_ws;
29pub mod sse;
30pub mod tls;
31pub mod workflow_store;
32pub mod workflows;
33pub mod ws;
34
35use std::collections::HashMap;
36use std::sync::atomic::{AtomicUsize, Ordering};
37use std::sync::Mutex;
38
39use pylon_kernel::{AppManifest, ManifestEntity};
40use rusqlite::Connection;
41
42// ---------------------------------------------------------------------------
43// Runtime errors
44// ---------------------------------------------------------------------------
45
/// Error type returned by fallible `Runtime` operations.
///
/// `code` is a short machine-readable tag such as `"RUNTIME_OPEN_FAILED"`;
/// `message` carries the human-readable detail.
#[derive(Debug, Clone)]
pub struct RuntimeError {
    pub code: String,
    pub message: String,
}

impl std::fmt::Display for RuntimeError {
    /// Renders the error as `[CODE] message`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { code, message } = self;
        write!(f, "[{code}] {message}")
    }
}

impl std::error::Error for RuntimeError {}
59
60/// Lift a `DataError` (the cross-crate error type for PG `DataStore`
61/// operations) into a `RuntimeError`. Used by `PostgresDataStore`
62/// closure bounds (`with_client`, `with_transaction`) so callers in
63/// the runtime can propagate PG errors with their native error type.
64impl From<pylon_http::DataError> for RuntimeError {
65    fn from(e: pylon_http::DataError) -> Self {
66        RuntimeError {
67            code: e.code,
68            message: e.message,
69        }
70    }
71}
72
73// ---------------------------------------------------------------------------
74// SQL safety helpers
75// ---------------------------------------------------------------------------
76
/// Wrap `name` in double quotes so it can be spliced into SQL as an
/// identifier. Every embedded `"` is doubled (the SQL-standard escape), so the
/// result is always exactly one well-formed quoted identifier.
fn quote_ident(name: &str) -> String {
    let mut quoted = String::with_capacity(name.len() + 2);
    quoted.push('"');
    for ch in name.chars() {
        if ch == '"' {
            quoted.push_str("\"\"");
        } else {
            quoted.push(ch);
        }
    }
    quoted.push('"');
    quoted
}
82
83/// Validate that `name` is a known column on the given entity.
84/// Always allows "id" (the primary key). Returns an error listing valid
85/// columns when validation fails.
86fn validate_column_name(name: &str, entity: &ManifestEntity) -> Result<(), RuntimeError> {
87    if name == "id" {
88        return Ok(());
89    }
90    if entity.fields.iter().any(|f| f.name == name) {
91        return Ok(());
92    }
93    Err(RuntimeError {
94        code: "INVALID_COLUMN".into(),
95        message: format!(
96            "Unknown column \"{}\" -- valid columns: id, {}",
97            name,
98            entity
99                .fields
100                .iter()
101                .map(|f| f.name.as_str())
102                .collect::<Vec<_>>()
103                .join(", ")
104        ),
105    })
106}
107
108// ---------------------------------------------------------------------------
109// Connection tuning
110// ---------------------------------------------------------------------------
111
112/// Apply the production pragma set on a SQLite connection. Identical
113/// values to `pylon_storage::sqlite::tune_connection` — kept here too
114/// because the Runtime opens its own connections directly (write +
115/// read pool) without going through the storage adapter.
116///
117/// See `crates/storage/src/sqlite.rs` for the rationale on each
118/// pragma. Skipping it on writes drops throughput by 5–10×.
119fn tune_runtime_connection(conn: &Connection, in_memory: bool) {
120    let pragmas: &[(&str, &str)] = if in_memory {
121        &[
122            ("temp_store", "MEMORY"),
123            ("cache_size", "-65536"),
124            ("foreign_keys", "ON"),
125        ]
126    } else {
127        &[
128            ("journal_mode", "WAL"),
129            ("synchronous", "NORMAL"),
130            ("cache_size", "-65536"),
131            ("mmap_size", "268435456"),
132            ("temp_store", "MEMORY"),
133            ("busy_timeout", "5000"),
134            ("foreign_keys", "ON"),
135            ("wal_autocheckpoint", "1000"),
136        ]
137    };
138    for (key, value) in pragmas {
139        let _ = conn.pragma_update(None, key, value);
140    }
141}
142
143// ---------------------------------------------------------------------------
144// Read connection guard
145// ---------------------------------------------------------------------------
146
147/// A guard that dereferences to a `Connection`, abstracting over whether
148/// it came from the read pool or fell back to the write connection.
149enum ReadConnGuard<'a> {
150    Pooled(std::sync::MutexGuard<'a, Connection>),
151    Write(std::sync::MutexGuard<'a, Connection>),
152}
153
154impl<'a> std::ops::Deref for ReadConnGuard<'a> {
155    type Target = Connection;
156    fn deref(&self) -> &Connection {
157        match self {
158            ReadConnGuard::Pooled(g) => g,
159            ReadConnGuard::Write(g) => g,
160        }
161    }
162}
163
164// ---------------------------------------------------------------------------
165// Runtime — the core execution engine
166// ---------------------------------------------------------------------------
167
168/// A manifest-driven runtime that executes CRUD operations against an
169/// underlying data store. Two backends are supported:
170///
171/// - **SQLite** (default): single-process, file-or-memory, with a write
172///   mutex + read pool, FTS5 search, and per-row LoroDoc CRDT snapshots.
173/// - **Postgres**: live cluster, suitable for multi-replica deployments.
174///   Routes entity CRUD through [`pylon_storage::pg_datastore::PostgresDataStore`].
175///   CRDT mode and FTS5-shaped search are SQLite-only at this layer; the
176///   Postgres backend returns `NOT_SUPPORTED` for those paths and the router
177///   degrades to JSON change events (no binary CRDT broadcasts).
178///
179/// Pick a backend by passing a `postgres://` URL to [`Runtime::open`]; any
180/// other string is treated as a SQLite filesystem path.
181pub struct Runtime {
182    backend: RuntimeBackend,
183    manifest: AppManifest,
184    entities: HashMap<String, ManifestEntity>,
185    /// True only for the SQLite in-memory variant. Postgres mode reports false.
186    /// Gates the test-reset endpoint — a false positive here would let
187    /// `/api/__test__/reset` truncate real tables.
188    is_in_memory: bool,
189}
190
191/// Backend storage for entity CRUD. SQLite variant owns the connection
192/// pool and CRDT cache; Postgres variant wraps a `PostgresDataStore`.
193enum RuntimeBackend {
194    Sqlite(SqliteBackend),
195    Postgres(PgBackend),
196}
197
198/// SQLite-backed entity store. WAL mode allows one writer and multiple
199/// concurrent readers — the struct exploits this with a single write
200/// connection behind a mutex plus a pool of read-only connections.
201struct SqliteBackend {
202    /// Write connection — single mutex, serializes writes.
203    write_conn: Mutex<Connection>,
204    /// Read connections — pool of connections for concurrent reads.
205    /// Empty for in-memory databases where extra connections are not possible.
206    read_pool: Vec<Mutex<Connection>>,
207    /// Counter for round-robin read pool selection.
208    read_counter: AtomicUsize,
209    /// Per-row LoroDoc cache + sidecar persistence. Used for entities with
210    /// `crdt: true` (the default). Reads still hit SQLite directly via the
211    /// read pool — the LoroDoc just produces the projected JSON that gets
212    /// materialized into SQLite columns on every write.
213    crdt: crate::loro_store::LoroStore,
214}
215
216/// Postgres-backed entity store. Wraps `PostgresDataStore` from
217/// pylon-storage and delegates the `DataStore` surface directly.
218pub(crate) struct PgBackend {
219    pub(crate) store: pylon_storage::pg_datastore::PostgresDataStore,
220    /// Per-row LoroDoc snapshot store for entities with `crdt: true`.
221    /// Arc'd so the runtime layer can hand a clone to PgCrdtHookImpl
222    /// (the bridge that lets PgTxStore call back into CRDT machinery
223    /// from inside a held tx).
224    pub(crate) crdt: std::sync::Arc<crate::pg_loro_store::PgLoroStore>,
225}
226
/// How many read-only SQLite connections a file-backed runtime opens for its
/// read pool. In-memory runtimes open none.
const READ_POOL_SIZE: usize = 4;
229
/// True iff `url` is a Postgres connection URL, i.e. it starts with
/// `postgres://` or `postgresql://`. The scheme is compared ASCII
/// case-insensitively.
///
/// Everything else — including libpq keyword/value strings such as
/// `host=... dbname=...` — is interpreted by the caller as a SQLite
/// filesystem path.
fn is_postgres_url(url: &str) -> bool {
    const SCHEMES: [&str; 2] = ["postgres://", "postgresql://"];
    // Compare only the prefix rather than lowercasing the whole URL, which
    // would allocate a copy of the string on every call. `str::get` returns
    // `None` when the URL is shorter than the scheme or the cut falls inside a
    // multi-byte char; neither case can match an ASCII scheme anyway.
    SCHEMES.iter().any(|scheme| {
        matches!(url.get(..scheme.len()), Some(prefix) if prefix.eq_ignore_ascii_case(scheme))
    })
}
237
238/// Convert a `pylon_http::DataError` (returned by `PostgresDataStore`)
239/// into the runtime's error type. The codes round-trip; only the type
240/// changes.
241fn data_err_to_runtime(e: pylon_http::DataError) -> RuntimeError {
242    RuntimeError {
243        code: e.code,
244        message: e.message,
245    }
246}
247
248impl Runtime {
249    /// Open a runtime against either a SQLite file path or a Postgres URL.
250    ///
251    /// Backend selection is by URL prefix:
252    /// - `postgres://...` or `postgresql://...` → Postgres (requires the
253    ///   `postgres-live` feature on `pylon-storage`, enabled by default).
254    /// - Anything else → SQLite, treating the string as a filesystem path
255    ///   (`":memory:"` works via `Runtime::in_memory` instead).
256    pub fn open(url: &str, manifest: AppManifest) -> Result<Self, RuntimeError> {
257        if is_postgres_url(url) {
258            Self::open_postgres(url, manifest)
259        } else {
260            let conn = Connection::open(url).map_err(|e| RuntimeError {
261                code: "RUNTIME_OPEN_FAILED".into(),
262                message: format!("Failed to open database: {e}"),
263            })?;
264            Self::from_connection(conn, manifest, false)
265        }
266    }
267
268    /// Open a runtime backed by a live Postgres cluster.
269    ///
270    /// Schema must be applied separately via `pylon migrate` / the
271    /// storage adapter's plan apply path — Runtime does not auto-create
272    /// tables on Postgres (in contrast to SQLite, where `from_connection`
273    /// runs CREATE TABLE IF NOT EXISTS on every open). This matches how
274    /// production Postgres deployments are typically managed: schema is
275    /// migrated via a controlled, observable step, not as a side effect
276    /// of the server starting up.
277    pub fn open_postgres(url: &str, manifest: AppManifest) -> Result<Self, RuntimeError> {
278        let store = pylon_storage::pg_datastore::PostgresDataStore::connect(url, manifest.clone())
279            .map_err(data_err_to_runtime)?;
280        // Bootstrap the CRDT sidecar table on every open. Idempotent
281        // (`CREATE TABLE IF NOT EXISTS`); same shape as the SQLite
282        // path's `ensure_sidecar` call. Without this, the first
283        // CRDT-mode write would error because `_pylon_crdt_snapshots`
284        // doesn't exist yet on a fresh PG database.
285        store.with_client(|c| crate::pg_loro_store::ensure_sidecar(c)).map_err(|e| {
286            RuntimeError {
287                code: "CRDT_SIDECAR_BOOTSTRAP_FAILED".into(),
288                message: format!("ensure pg crdt sidecar: {e}"),
289            }
290        })?;
291        let entities: HashMap<String, ManifestEntity> = manifest
292            .entities
293            .iter()
294            .map(|e| (e.name.clone(), e.clone()))
295            .collect();
296        Ok(Self {
297            backend: RuntimeBackend::Postgres(PgBackend {
298                store,
299                crdt: std::sync::Arc::new(crate::pg_loro_store::PgLoroStore::new()),
300            }),
301            manifest,
302            entities,
303            is_in_memory: false,
304        })
305    }
306
307    /// Returns true if this runtime is backed by an in-memory SQLite DB.
308    ///
309    /// Stored at open time rather than queried via `conn.path()` because
310    /// the path-based check conflates "no filename" with "in-memory":
311    /// `Connection::open("")` yields a file-backed DB with empty path,
312    /// and would falsely pass as in-memory. Since we always know at
313    /// construction time which constructor was used, track the bit.
314    ///
315    /// Gates the test-reset endpoint — a false positive here would let
316    /// `/api/__test__/reset` truncate real tables.
317    pub fn is_in_memory(&self) -> bool {
318        self.is_in_memory
319    }
320
321    /// Filesystem path to the SQLite database, if this runtime is file-backed.
322    /// Returns `None` for in-memory runtimes AND Postgres runtimes (no local
323    /// file). Used by the server bootstrap to derive companion paths
324    /// (session store, change log persistence) without requiring the caller
325    /// to pass them in.
326    pub fn db_path(&self) -> Option<String> {
327        if self.is_in_memory {
328            return None;
329        }
330        let sb = match &self.backend {
331            RuntimeBackend::Sqlite(sb) => sb,
332            RuntimeBackend::Postgres(_) => return None,
333        };
334        let conn = sb.write_conn.lock().ok()?;
335        conn.path().filter(|p| !p.is_empty()).map(String::from)
336    }
337
338    /// Drop every row from every entity table. Intended for test harnesses
339    /// that call `/api/__test__/reset` between cases; refuses to run on
340    /// anything but an in-memory database.
341    ///
342    /// Does NOT drop the tables themselves — schema stays, indexes stay,
343    /// triggers stay. Just truncates user data + the change log.
344    pub fn reset_for_tests(&self) -> Result<(), RuntimeError> {
345        if !self.is_in_memory() {
346            return Err(RuntimeError {
347                code: "RESET_REFUSED".into(),
348                message: "reset_for_tests is only available on in-memory databases".into(),
349            });
350        }
351        let conn = self.lock_write_conn()?;
352        let entity_names: Vec<String> = self.entities.values().map(|e| e.name.clone()).collect();
353        for name in entity_names {
354            let sql = format!("DELETE FROM {}", quote_ident(&name));
355            let _ = conn.execute(&sql, []);
356            // Also clear any FTS5 shadow table if present.
357            let fts_sql = format!("DELETE FROM {}", quote_ident(&format!("{name}_fts")));
358            let _ = conn.execute(&fts_sql, []);
359        }
360        Ok(())
361    }
362
363    /// Create an in-memory SQLite-backed runtime (useful for tests and
364    /// benchmarks). For Postgres-backed equivalents, use `open_postgres`
365    /// with a test-cluster URL.
366    pub fn in_memory(manifest: AppManifest) -> Result<Self, RuntimeError> {
367        let conn = Connection::open_in_memory().map_err(|e| RuntimeError {
368            code: "RUNTIME_OPEN_FAILED".into(),
369            message: format!("Failed to open in-memory database: {e}"),
370        })?;
371        Self::from_connection(conn, manifest, true)
372    }
373
374    fn from_connection(
375        conn: Connection,
376        manifest: AppManifest,
377        is_in_memory: bool,
378    ) -> Result<Self, RuntimeError> {
379        // Apply the production pragma set on the write connection.
380        tune_runtime_connection(&conn, is_in_memory);
381
382        // Build entity lookup map.
383        let entities: HashMap<String, ManifestEntity> = manifest
384            .entities
385            .iter()
386            .map(|e| (e.name.clone(), e.clone()))
387            .collect();
388
389        // Create tables for all entities.
390        for entity in &manifest.entities {
391            let fields: Vec<String> = entity
392                .fields
393                .iter()
394                .map(|f| {
395                    let col_type = match f.field_type.as_str() {
396                        "int" => "INTEGER",
397                        "float" => "REAL",
398                        "bool" => "INTEGER",
399                        _ => "TEXT",
400                    };
401                    let not_null = if f.optional { "" } else { " NOT NULL" };
402                    let unique = if f.unique { " UNIQUE" } else { "" };
403                    format!("{} {col_type}{not_null}{unique}", quote_ident(&f.name))
404                })
405                .collect();
406
407            let mut cols = vec!["\"id\" TEXT PRIMARY KEY NOT NULL".to_string()];
408            cols.extend(fields);
409            let sql = format!(
410                "CREATE TABLE IF NOT EXISTS {} ({})",
411                quote_ident(&entity.name),
412                cols.join(", ")
413            );
414            conn.execute(&sql, []).map_err(|e| RuntimeError {
415                code: "SCHEMA_INIT_FAILED".into(),
416                message: format!("Failed to create table {}: {e}", entity.name),
417            })?;
418
419            // Create indexes.
420            for idx in &entity.indexes {
421                let unique_kw = if idx.unique { "UNIQUE " } else { "" };
422                let quoted_fields: Vec<String> =
423                    idx.fields.iter().map(|f| quote_ident(f)).collect();
424                let idx_sql = format!(
425                    "CREATE {unique_kw}INDEX IF NOT EXISTS {} ON {} ({})",
426                    quote_ident(&idx.name),
427                    quote_ident(&entity.name),
428                    quoted_fields.join(", ")
429                );
430                conn.execute(&idx_sql, []).ok();
431            }
432
433            // Create an FTS5 virtual table over all text-ish fields so clients
434            // can do full-text search via the `$search` query operator.
435            //
436            // Fields that look like "string" / "richtext" / "text" are indexed.
437            // The FTS table is a contentless external-content table pointed at
438            // the entity table, so SQLite keeps it consistent via triggers we
439            // install below.
440            let text_fields: Vec<&str> = entity
441                .fields
442                .iter()
443                .filter(|f| matches!(f.field_type.as_str(), "string" | "richtext" | "text"))
444                .map(|f| f.name.as_str())
445                .collect();
446            if !text_fields.is_empty() {
447                let fts_name = format!("{}_fts", entity.name);
448                let quoted_cols: Vec<String> = text_fields.iter().map(|f| quote_ident(f)).collect();
449                let fts_sql = format!(
450                    "CREATE VIRTUAL TABLE IF NOT EXISTS {} USING fts5({}, content={}, content_rowid='rowid')",
451                    quote_ident(&fts_name),
452                    quoted_cols.join(", "),
453                    quote_ident(&entity.name),
454                );
455                // FTS5 may not be compiled in; ignore errors so those builds
456                // still work (queries using $search will return empty).
457                let fts_ok = conn.execute(&fts_sql, []).is_ok();
458
459                if fts_ok {
460                    // Sync triggers: keep FTS index current on INSERT/UPDATE/DELETE.
461                    //
462                    // Subtle bug fixed: the trigger NAME must be built from
463                    // the raw `fts_name` + suffix and THEN quoted once.
464                    // Previously this code quoted `fts_name` first and then
465                    // appended `_ai`/`_ad`/`_au` AFTER the closing quote,
466                    // producing invalid SQL like `"foo_fts"_ai`. The
467                    // `.ok()` after execute silently ate the error, so the
468                    // triggers were never created and FTS stayed out of
469                    // sync on writes.
470                    let tbl = quote_ident(&entity.name);
471                    let ftb = quote_ident(&fts_name);
472                    let cols_list = quoted_cols.join(", ");
473                    let new_list: Vec<String> = text_fields
474                        .iter()
475                        .map(|f| format!("new.{}", quote_ident(f)))
476                        .collect();
477                    let old_list: Vec<String> = text_fields
478                        .iter()
479                        .map(|f| format!("old.{}", quote_ident(f)))
480                        .collect();
481
482                    let trigger_ai = quote_ident(&format!("{}_ai", fts_name));
483                    let trigger_ad = quote_ident(&format!("{}_ad", fts_name));
484                    let trigger_au = quote_ident(&format!("{}_au", fts_name));
485
486                    let trigger_ins = format!(
487                        "CREATE TRIGGER IF NOT EXISTS {trigger_ai} AFTER INSERT ON {tbl} BEGIN \
488                         INSERT INTO {ftb}(rowid, {cols_list}) VALUES (new.rowid, {new_vals}); END",
489                        new_vals = new_list.join(", "),
490                    );
491                    let trigger_del = format!(
492                        "CREATE TRIGGER IF NOT EXISTS {trigger_ad} AFTER DELETE ON {tbl} BEGIN \
493                         INSERT INTO {ftb}({ftb}, rowid, {cols_list}) VALUES('delete', old.rowid, {old_vals}); END",
494                        old_vals = old_list.join(", "),
495                    );
496                    let trigger_upd = format!(
497                        "CREATE TRIGGER IF NOT EXISTS {trigger_au} AFTER UPDATE ON {tbl} BEGIN \
498                         INSERT INTO {ftb}({ftb}, rowid, {cols_list}) VALUES('delete', old.rowid, {old_vals}); \
499                         INSERT INTO {ftb}(rowid, {cols_list}) VALUES (new.rowid, {new_vals}); END",
500                        new_vals = new_list.join(", "),
501                        old_vals = old_list.join(", "),
502                    );
503                    // Log failures instead of silently dropping — FTS going
504                    // stale should be visible to operators.
505                    for (label, sql) in [
506                        ("ai", &trigger_ins),
507                        ("ad", &trigger_del),
508                        ("au", &trigger_upd),
509                    ] {
510                        if let Err(e) = conn.execute(sql, []) {
511                            tracing::warn!(
512                                "[fts] failed to create {label} trigger for {}: {e}",
513                                entity.name
514                            );
515                        }
516                    }
517                }
518            }
519        }
520
521        // Open read-only connection pool for file-backed databases.
522        // In-memory databases cannot share connections, so the pool stays empty
523        // and reads fall back to the write connection.
524        let db_path = conn.path().filter(|p| !p.is_empty()).map(|p| p.to_string());
525
526        let read_pool = if let Some(ref path) = db_path {
527            let mut pool = Vec::with_capacity(READ_POOL_SIZE);
528            for _ in 0..READ_POOL_SIZE {
529                let read_conn = Connection::open_with_flags(
530                    path,
531                    rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
532                        | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX,
533                )
534                .map_err(|e| RuntimeError {
535                    code: "POOL_OPEN_FAILED".into(),
536                    message: format!("Failed to open read connection: {e}"),
537                })?;
538                tune_runtime_connection(&read_conn, false);
539                pool.push(Mutex::new(read_conn));
540            }
541            pool
542        } else {
543            // In-memory DB — no separate read connections possible.
544            Vec::new()
545        };
546
547        // Sidecar table for CRDT snapshots — created always so toggling
548        // `crdt: true` on an entity post-deploy doesn't need a migration.
549        crate::loro_store::ensure_sidecar(&conn).map_err(|e| RuntimeError {
550            code: "CRDT_SIDECAR_FAILED".into(),
551            message: format!("create CRDT sidecar table: {e}"),
552        })?;
553
554        Ok(Self {
555            backend: RuntimeBackend::Sqlite(SqliteBackend {
556                write_conn: Mutex::new(conn),
557                read_pool,
558                read_counter: AtomicUsize::new(0),
559                crdt: crate::loro_store::LoroStore::new(),
560            }),
561            manifest,
562            entities,
563            is_in_memory,
564        })
565    }
566
567    /// Create the search index tables (`_facet_bitmap`, per-entity
568    /// `_fts_<Entity>`, and a covering index for each declared
569    /// sortable field) for every searchable entity in the manifest.
570    ///
571    /// Production deployments do this via the storage adapter's
572    /// `apply_schema` / migration plan; that path also handles
573    /// adding/removing the tables when a `search:` block is added or
574    /// removed across deploys. This method is a quick path for tests
575    /// and benchmarks that build a `Runtime::in_memory(...)` directly
576    /// without going through the schema-plan pipeline.
577    pub fn ensure_search_indexes(&self) -> Result<(), RuntimeError> {
578        // Postgres: schema (FTS, facets) is owned by the storage adapter's
579        // migration plan. Tests / benchmarks against Postgres must apply
580        // the plan separately; this fast-path is a SQLite-only convenience.
581        if matches!(self.backend, RuntimeBackend::Postgres(_)) {
582            return Ok(());
583        }
584        let conn = self.lock_write_conn()?;
585        conn.execute(pylon_storage::search::create_facet_table_sql(), [])
586            .map_err(|e| RuntimeError {
587                code: "FACET_TABLE_FAILED".into(),
588                message: format!("create _facet_bitmap: {e}"),
589            })?;
590        for entity in &self.manifest.entities {
591            if let Some(cfg) = &entity.search {
592                if let Some(sql) = pylon_storage::search::create_fts_table_sql(&entity.name, cfg) {
593                    conn.execute(&sql, []).map_err(|e| RuntimeError {
594                        code: "FTS_TABLE_FAILED".into(),
595                        message: format!("create FTS table for {}: {e}", entity.name),
596                    })?;
597                }
598                for field in &cfg.sortable {
599                    let idx_sql = format!(
600                        "CREATE INDEX IF NOT EXISTS \"{}_sort_{field}\" ON \"{}\" (\"{field}\")",
601                        entity.name, entity.name,
602                    );
603                    conn.execute(&idx_sql, []).map_err(|e| RuntimeError {
604                        code: "SORT_INDEX_FAILED".into(),
605                        message: format!("create sort index for {}.{field}: {e}", entity.name),
606                    })?;
607                }
608            }
609        }
610        Ok(())
611    }
612
613    /// Return a reference to the app manifest.
614    pub fn manifest(&self) -> &AppManifest {
615        &self.manifest
616    }
617
618    /// Expose the write connection mutex for transactional operations.
619    /// SQLite-only — Postgres mode returns `NOT_SQLITE_BACKEND`. Callers
620    /// that need a transaction on Postgres should use [`Runtime::transact_ops`]
621    /// (via the `DataStore` trait), which routes to a real Postgres
622    /// transaction inside `PostgresDataStore`.
623    pub fn lock_conn_pub(&self) -> Result<std::sync::MutexGuard<'_, Connection>, RuntimeError> {
624        self.lock_write_conn()
625    }
626
627    /// Return the number of read connections in the pool. Always 0 for
628    /// in-memory SQLite (pool is empty by design) and for Postgres mode
629    /// (the pool concept doesn't apply — `PostgresDataStore` manages its
630    /// own connection internally).
631    pub fn read_pool_size(&self) -> usize {
632        match &self.backend {
633            RuntimeBackend::Sqlite(sb) => sb.read_pool.len(),
634            RuntimeBackend::Postgres(_) => 0,
635        }
636    }
637
638    /// Return true if this runtime is backed by Postgres. Useful for
639    /// SQLite-only fast-paths to early-exit cleanly.
640    pub fn is_postgres(&self) -> bool {
641        matches!(self.backend, RuntimeBackend::Postgres(_))
642    }
643
644    // -----------------------------------------------------------------------
645    // CRDT helpers
646    // -----------------------------------------------------------------------
647
648    /// Map an entity's manifest fields → the [`pylon_crdt::CrdtField`] vec
649    /// the LoroStore needs. Resolves each field's CRDT shape from the
650    /// (type, annotation) pair via `pylon_crdt::field_kind`. Caches
651    /// nothing yet — called per write, fine at our entity counts.
652    pub(crate) fn crdt_fields_for(
653        &self,
654        ent: &ManifestEntity,
655    ) -> Result<Vec<pylon_crdt::CrdtField>, RuntimeError> {
656        let mut out = Vec::with_capacity(ent.fields.len());
657        for f in &ent.fields {
658            // Skip the implicit `id` column — it's the row key, not a
659            // CRDT-managed value. SQLite's PRIMARY KEY constraint owns it.
660            if f.name == "id" {
661                continue;
662            }
663            let kind = pylon_crdt::field_kind(&f.field_type, f.crdt).map_err(|e| RuntimeError {
664                code: "INVALID_CRDT_FIELD".into(),
665                message: format!(
666                    "{}.{}: {e} (declared type={}, crdt={:?})",
667                    ent.name, f.name, f.field_type, f.crdt
668                ),
669            })?;
670            out.push(pylon_crdt::CrdtField {
671                name: f.name.clone(),
672                kind,
673            });
674        }
675        Ok(out)
676    }
677
678    /// Borrow the CRDT store. SQLite-only — Postgres mode does not yet
679    /// support per-row CRDT snapshots at the runtime layer (CRDT
680    /// broadcasts degrade to JSON change events).
681    ///
682    /// # Panics
683    /// Panics on Postgres backend. Call sites that may run under either
684    /// backend should branch on `is_postgres()` first.
685    pub fn crdt_store(&self) -> &crate::loro_store::LoroStore {
686        match &self.backend {
687            RuntimeBackend::Sqlite(sb) => &sb.crdt,
688            RuntimeBackend::Postgres(_) => {
689                panic!("crdt_store() called on Postgres-backed Runtime")
690            }
691        }
692    }
693
694    // -----------------------------------------------------------------------
695    // CRUD operations
696    // -----------------------------------------------------------------------
697
698    /// Insert a new row. Returns the generated ID.
699    ///
700    /// For entities with `crdt: true` (the default) the LoroDoc snapshot
701    /// + the SQLite materialized row are committed together in a single
702    /// SQLite transaction so a crash between the two leaves neither.
703    /// `crdt: false` entities skip the LoroDoc and use a direct write
704    /// (legacy LWW path). Both produce the same on-disk row shape, so
705    /// reads, indexes, FTS, and policies don't change between modes.
706    pub fn insert(&self, entity: &str, data: &serde_json::Value) -> Result<String, RuntimeError> {
707        if let Some(pg) = self.pg_backend() {
708            let ent = self.require_entity(entity)?;
709            // Both CRDT-mode and non-CRDT writes go through one
710            // transaction so the row, the FTS shadow, and (for CRDT)
711            // the LoroDoc snapshot either all commit or all roll back.
712            // Pre-fix this was three separate autocommits and any
713            // failure between them desynced the layers.
714            if ent.crdt {
715                let crdt_fields = self.crdt_fields_for(ent)?;
716                let id = generate_id();
717                // Inject the generated id so build_insert_sql reuses
718                // it — keeps the snapshot key and the row id aligned.
719                let mut row = data.clone();
720                if let Some(obj) = row.as_object_mut() {
721                    obj.insert("id".into(), serde_json::Value::String(id.clone()));
722                }
723                let result = pg.store.with_transaction_raw(|tx| -> Result<(), RuntimeError> {
724                    pg.crdt
725                        .apply_patch(tx, entity, &id, &crdt_fields, data)
726                        .map_err(|e| RuntimeError {
727                            code: "CRDT_APPLY_FAILED".into(),
728                            message: format!("crdt write {entity}/{id}: {e}"),
729                        })?;
730                    pylon_storage::pg_tx_store::tx_insert(tx, &self.manifest, entity, &row)
731                        .map(|_| ())
732                        .map_err(data_err_to_runtime)?;
733                    pg.crdt.cache_after_commit(tx, entity, &id);
734                    Ok(())
735                });
736                if result.is_err() {
737                    // Rollback drops the persisted snapshot, but the
738                    // in-memory LoroDoc was mutated in-place by
739                    // apply_patch. Evict it so the next access
740                    // re-hydrates from disk (which is back in the
741                    // pre-apply state). Without this, the cache would
742                    // hold a doc ahead of the materialized row.
743                    pg.crdt.evict(entity, &id);
744                }
745                result?;
746                return Ok(id);
747            }
748            // Non-CRDT path: still one tx — the typed `DataStore::insert`
749            // already wraps in `with_transaction` internally for FTS
750            // atomicity, so we can delegate straight through.
751            return pylon_http::DataStore::insert(&pg.store, entity, data)
752                .map_err(data_err_to_runtime);
753        }
754        let ent = self.require_entity(entity)?;
755        let conn = self.lock_write_conn()?;
756
757        let id = generate_id();
758
759        let obj = data.as_object().ok_or_else(|| RuntimeError {
760            code: "INVALID_DATA".into(),
761            message: "Insert data must be a JSON object".into(),
762        })?;
763
764        // Validate columns up-front so we don't even open a transaction
765        // for a patch that the SQL INSERT will reject.
766        for key in obj.keys() {
767            if key != "id" {
768                validate_column_name(key, ent)?;
769            }
770        }
771
772        // SQLite-only path past this point — Postgres dispatch happened
773        // at the top of `insert()`. Hoist the backend handle here so we
774        // can reach the LoroStore from inside the tx closure without
775        // a second runtime branch on every iteration.
776        let sb = self.sqlite_backend()?;
777
778        // Atomic block — CRDT sidecar snapshot + materialized SQL row +
779        // search-index maintenance all land together or none does. SQLite's
780        // rollback journal makes this crash-safe end-to-end.
781        with_write_tx(&conn, || {
782            if ent.crdt {
783                let crdt_fields = self.crdt_fields_for(ent)?;
784                sb.crdt
785                    .apply_patch(&conn, entity, &id, &crdt_fields, data)
786                    .map_err(|e| RuntimeError {
787                        code: "CRDT_APPLY_FAILED".into(),
788                        message: format!("crdt write {entity}/{id}: {e}"),
789                    })?;
790            }
791
792            let mut col_names = vec![quote_ident("id")];
793            let mut placeholders = vec!["?1".to_string()];
794            let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = vec![Box::new(id.clone())];
795
796            let mut idx = 2;
797            for (key, val) in obj {
798                if key == "id" {
799                    continue;
800                }
801                col_names.push(quote_ident(key));
802                placeholders.push(format!("?{idx}"));
803                values.push(json_to_sql(val));
804                idx += 1;
805            }
806
807            let sql = format!(
808                "INSERT INTO {} ({}) VALUES ({})",
809                quote_ident(entity),
810                col_names.join(", "),
811                placeholders.join(", ")
812            );
813
814            let params: Vec<&dyn rusqlite::types::ToSql> =
815                values.iter().map(|v| v.as_ref()).collect();
816            conn.execute(&sql, params.as_slice())
817                .map_err(|e| RuntimeError {
818                    code: "INSERT_FAILED".into(),
819                    message: format!("Insert into {entity} failed: {e}"),
820                })?;
821
822            // Search-index maintenance lives inside the same tx so a
823            // crash between the row insert and the FTS update can't leave
824            // the search index inconsistent with the row table.
825            if let Some(cfg) = ent.search.as_ref() {
826                if !cfg.is_empty() {
827                    pylon_storage::search_maintenance::apply_insert(&conn, entity, &id, data, cfg)
828                        .map_err(|e| RuntimeError {
829                            code: "SEARCH_MAINTENANCE_FAILED".into(),
830                            message: format!("search index update on insert {entity}: {e}"),
831                        })?;
832                }
833            }
834            Ok(())
835        })?;
836
837        Ok(id)
838    }
839
840    /// Get a single row by ID.
841    pub fn get_by_id(
842        &self,
843        entity: &str,
844        id: &str,
845    ) -> Result<Option<serde_json::Value>, RuntimeError> {
846        if let Some(pg) = self.pg_backend() {
847            return pylon_http::DataStore::get_by_id(&pg.store, entity, id)
848                .map_err(data_err_to_runtime);
849        }
850        let ent = self.require_entity(entity)?;
851        let conn = self.lock_read_conn()?;
852
853        let sql = format!("SELECT * FROM {} WHERE \"id\" = ?1", quote_ident(entity));
854        let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
855            code: "QUERY_FAILED".into(),
856            message: format!("Failed to prepare query: {e}"),
857        })?;
858
859        let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
860
861        let result = stmt
862            .query_row(rusqlite::params![id], |row| Ok(row_to_json(row, &columns)))
863            .ok();
864
865        Ok(result)
866    }
867
868    /// List all rows for an entity.
869    pub fn list(&self, entity: &str) -> Result<Vec<serde_json::Value>, RuntimeError> {
870        if let Some(pg) = self.pg_backend() {
871            return pylon_http::DataStore::list(&pg.store, entity).map_err(data_err_to_runtime);
872        }
873        let ent = self.require_entity(entity)?;
874        let conn = self.lock_read_conn()?;
875
876        let sql = format!("SELECT * FROM {} ORDER BY \"id\"", quote_ident(entity));
877        let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
878            code: "QUERY_FAILED".into(),
879            message: format!("Failed to prepare query: {e}"),
880        })?;
881
882        let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
883
884        let rows = stmt
885            .query_map([], |row| Ok(row_to_json(row, &columns)))
886            .map_err(|e| RuntimeError {
887                code: "QUERY_FAILED".into(),
888                message: format!("Query failed: {e}"),
889            })?;
890
891        let mut result = Vec::new();
892        for row in rows {
893            if let Ok(val) = row {
894                result.push(val);
895            }
896        }
897        Ok(result)
898    }
899
900    /// List rows after a cursor ID (for cursor-based pagination).
901    pub fn list_after(
902        &self,
903        entity: &str,
904        after: Option<&str>,
905        limit: usize,
906    ) -> Result<Vec<serde_json::Value>, RuntimeError> {
907        if let Some(pg) = self.pg_backend() {
908            return pylon_http::DataStore::list_after(&pg.store, entity, after, limit)
909                .map_err(data_err_to_runtime);
910        }
911        let ent = self.require_entity(entity)?;
912        let conn = self.lock_read_conn()?;
913
914        let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
915        let table = quote_ident(entity);
916
917        let (sql, params): (String, Vec<Box<dyn rusqlite::types::ToSql>>) = match after {
918            Some(cursor) => (
919                format!(
920                    "SELECT * FROM {} WHERE \"id\" > ?1 ORDER BY \"id\" LIMIT ?2",
921                    table
922                ),
923                vec![Box::new(cursor.to_string()), Box::new(limit as i64)],
924            ),
925            None => (
926                format!("SELECT * FROM {} ORDER BY \"id\" LIMIT ?1", table),
927                vec![Box::new(limit as i64)],
928            ),
929        };
930
931        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
932            params.iter().map(|v| v.as_ref()).collect();
933
934        let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
935            code: "QUERY_FAILED".into(),
936            message: format!("Failed to prepare query: {e}"),
937        })?;
938
939        let rows = stmt
940            .query_map(param_refs.as_slice(), |row| Ok(row_to_json(row, &columns)))
941            .map_err(|e| RuntimeError {
942                code: "QUERY_FAILED".into(),
943                message: format!("Query failed: {e}"),
944            })?;
945
946        let mut result = Vec::new();
947        for row in rows {
948            if let Ok(val) = row {
949                result.push(val);
950            }
951        }
952        Ok(result)
953    }
954
955    /// Update a row by ID. Returns true if a row was found and updated.
956    ///
957    /// For entities with `crdt: true` (the default) the LoroDoc receives
958    /// the patch first; the SQLite UPDATE writes the same fields so the
959    /// materialized view stays in lockstep with the doc state.
960    pub fn update(
961        &self,
962        entity: &str,
963        id: &str,
964        data: &serde_json::Value,
965    ) -> Result<bool, RuntimeError> {
966        if let Some(pg) = self.pg_backend() {
967            let ent = self.require_entity(entity)?;
968            if ent.crdt {
969                // CRDT mode: snapshot apply + materialized update +
970                // FTS shadow rebuild all share one tx. Pre-fix the
971                // snapshot landed in autocommit and the row write in
972                // a separate one — a mid-write crash desynced them.
973                //
974                // The closure also FAILS the tx if `tx_update` returns
975                // false (no row matched). Without that, the snapshot
976                // would commit alone — orphaned state pointing at a
977                // row that doesn't exist. Codex flagged this. On
978                // rollback the runtime evicts the cached LoroDoc so
979                // the next read re-hydrates from the (unchanged)
980                // sidecar.
981                let crdt_fields = self.crdt_fields_for(ent)?;
982                let result = pg.store.with_transaction_raw(|tx| -> Result<bool, RuntimeError> {
983                    pg.crdt
984                        .apply_patch(tx, entity, id, &crdt_fields, data)
985                        .map_err(|e| RuntimeError {
986                            code: "CRDT_APPLY_FAILED".into(),
987                            message: format!("crdt update {entity}/{id}: {e}"),
988                        })?;
989                    let updated = pylon_storage::pg_tx_store::tx_update(
990                        tx,
991                        &self.manifest,
992                        entity,
993                        id,
994                        data,
995                    )
996                    .map_err(data_err_to_runtime)?;
997                    if !updated {
998                        // Roll back via Err so the snapshot doesn't
999                        // commit against a missing row.
1000                        return Err(RuntimeError {
1001                            code: "ENTITY_NOT_FOUND".into(),
1002                            message: format!(
1003                                "Update on {entity}/{id} found no row — refusing to commit \
1004                                 a CRDT snapshot that would orphan."
1005                            ),
1006                        });
1007                    }
1008                    // Refresh the cache from the just-persisted
1009                    // snapshot so post-commit reads on this process
1010                    // skip the re-hydration round-trip.
1011                    pg.crdt.cache_after_commit(tx, entity, id);
1012                    Ok(updated)
1013                });
1014                if result.is_err() {
1015                    pg.crdt.evict(entity, id);
1016                    // ENTITY_NOT_FOUND from the inner closure is the
1017                    // intended return for "no such row" — translate
1018                    // into Ok(false) so callers see the same shape
1019                    // the SQLite path returns. Real errors (CRDT
1020                    // apply failed, BEGIN/COMMIT failed) propagate.
1021                    if let Err(ref e) = result {
1022                        if e.code == "ENTITY_NOT_FOUND" {
1023                            return Ok(false);
1024                        }
1025                    }
1026                }
1027                return result;
1028            }
1029            return pylon_http::DataStore::update(&pg.store, entity, id, data)
1030                .map_err(data_err_to_runtime);
1031        }
1032        let ent = self.require_entity(entity)?;
1033        let conn = self.lock_write_conn()?;
1034
1035        let obj = data.as_object().ok_or_else(|| RuntimeError {
1036            code: "INVALID_DATA".into(),
1037            message: "Update data must be a JSON object".into(),
1038        })?;
1039
1040        // Validate up-front and exit cheap if there's nothing to write.
1041        for key in obj.keys() {
1042            if key != "id" {
1043                validate_column_name(key, ent)?;
1044            }
1045        }
1046        let writable_keys: Vec<&String> = obj.keys().filter(|k| *k != "id").collect();
1047        if writable_keys.is_empty() {
1048            return Ok(false);
1049        }
1050
1051        // SQLite-only path past this point — see note in `insert()`.
1052        let sb = self.sqlite_backend()?;
1053
1054        // Atomic block — same shape as insert. CRDT snapshot, SQL UPDATE,
1055        // and FTS maintenance all commit together.
1056        let affected = with_write_tx(&conn, || -> Result<i64, RuntimeError> {
1057            if ent.crdt {
1058                let crdt_fields = self.crdt_fields_for(ent)?;
1059                sb.crdt
1060                    .apply_patch(&conn, entity, id, &crdt_fields, data)
1061                    .map_err(|e| RuntimeError {
1062                        code: "CRDT_APPLY_FAILED".into(),
1063                        message: format!("crdt write {entity}/{id}: {e}"),
1064                    })?;
1065            }
1066
1067            let mut set_clauses = Vec::new();
1068            let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
1069            let mut idx = 1;
1070            for key in &writable_keys {
1071                set_clauses.push(format!("{} = ?{idx}", quote_ident(key)));
1072                values.push(json_to_sql(&obj[key.as_str()]));
1073                idx += 1;
1074            }
1075
1076            // Capture pre-UPDATE row for search-maintenance diff INSIDE the
1077            // tx. Matches the contract of search_maintenance::apply_update
1078            // — old state must be read before the UPDATE lands.
1079            let searchable = ent.search.as_ref().map(|c| !c.is_empty()).unwrap_or(false);
1080            let old_row = if searchable {
1081                self.get_by_id_with_conn(&conn, entity, id)?
1082            } else {
1083                None
1084            };
1085
1086            values.push(Box::new(id.to_string()));
1087            let sql = format!(
1088                "UPDATE {} SET {} WHERE \"id\" = ?{idx}",
1089                quote_ident(entity),
1090                set_clauses.join(", ")
1091            );
1092
1093            let params: Vec<&dyn rusqlite::types::ToSql> =
1094                values.iter().map(|v| v.as_ref()).collect();
1095            let affected = conn
1096                .execute(&sql, params.as_slice())
1097                .map_err(|e| RuntimeError {
1098                    code: "UPDATE_FAILED".into(),
1099                    message: format!("Update {entity}/{id} failed: {e}"),
1100                })? as i64;
1101
1102            if affected > 0 && searchable {
1103                if let (Some(cfg), Some(old)) = (ent.search.as_ref(), old_row) {
1104                    pylon_storage::search_maintenance::apply_update(
1105                        &conn, entity, id, &old, data, cfg,
1106                    )
1107                    .map_err(|e| RuntimeError {
1108                        code: "SEARCH_MAINTENANCE_FAILED".into(),
1109                        message: format!("search index update on update {entity}: {e}"),
1110                    })?;
1111                }
1112            }
1113            Ok(affected)
1114        })?;
1115
1116        Ok(affected > 0)
1117    }
1118
1119    /// Delete a row by ID. Returns true if a row was actually deleted.
1120    pub fn delete(&self, entity: &str, id: &str) -> Result<bool, RuntimeError> {
1121        if let Some(pg) = self.pg_backend() {
1122            let ent = self.require_entity(entity)?;
1123            if ent.crdt {
1124                // Sidecar delete + entity delete + FTS shadow delete
1125                // share one tx. Eviction of the in-memory cache runs
1126                // AFTER commit so a rolled-back delete leaves the
1127                // cache valid (the snapshot is still on disk).
1128                let result = pg.store.with_transaction_raw(|tx| -> Result<bool, RuntimeError> {
1129                    tx.execute(
1130                        "DELETE FROM _pylon_crdt_snapshots WHERE entity = $1 AND row_id = $2",
1131                        &[&entity, &id],
1132                    )
1133                    .map_err(|e| RuntimeError {
1134                        code: "CRDT_SIDECAR_DELETE_FAILED".into(),
1135                        message: format!("delete pg crdt snapshot {entity}/{id}: {e}"),
1136                    })?;
1137                    pylon_storage::pg_tx_store::tx_delete(tx, &self.manifest, entity, id)
1138                        .map_err(data_err_to_runtime)
1139                });
1140                // Evict regardless of whether tx_delete found a row —
1141                // we issued the sidecar DELETE inside the same tx, so
1142                // any cached doc is now stale even if the entity row
1143                // was already gone (orphan sidecar case codex flagged).
1144                // Only skip eviction if the WHOLE tx rolled back.
1145                if result.is_ok() {
1146                    pg.crdt.evict(entity, id);
1147                }
1148                return result;
1149            }
1150            return pylon_http::DataStore::delete(&pg.store, entity, id)
1151                .map_err(data_err_to_runtime);
1152        }
1153        let ent = self.require_entity(entity)?;
1154        let conn = self.lock_write_conn()?;
1155
1156        // Apply search-maintenance BEFORE the DELETE — we still need
1157        // the row's facet values to clear the bitmap bits.
1158        let searchable = ent.search.as_ref().map(|c| !c.is_empty()).unwrap_or(false);
1159        if searchable {
1160            if let (Some(cfg), Ok(Some(row))) = (
1161                ent.search.as_ref(),
1162                self.get_by_id_with_conn(&conn, entity, id),
1163            ) {
1164                pylon_storage::search_maintenance::apply_delete(&conn, entity, id, &row, cfg)
1165                    .map_err(|e| RuntimeError {
1166                        code: "SEARCH_MAINTENANCE_FAILED".into(),
1167                        message: format!("search index update on delete {entity}: {e}"),
1168                    })?;
1169            }
1170        }
1171
1172        let sql = format!("DELETE FROM {} WHERE \"id\" = ?1", quote_ident(entity));
1173        let affected = conn
1174            .execute(&sql, rusqlite::params![id])
1175            .map_err(|e| RuntimeError {
1176                code: "DELETE_FAILED".into(),
1177                message: format!("Delete {entity}/{id} failed: {e}"),
1178            })?;
1179
1180        Ok(affected > 0)
1181    }
1182
1183    /// Lookup a single row by a field value (e.g., email).
1184    pub fn lookup(
1185        &self,
1186        entity: &str,
1187        field: &str,
1188        value: &str,
1189    ) -> Result<Option<serde_json::Value>, RuntimeError> {
1190        if let Some(pg) = self.pg_backend() {
1191            return pylon_http::DataStore::lookup(&pg.store, entity, field, value)
1192                .map_err(data_err_to_runtime);
1193        }
1194        let ent = self.require_entity(entity)?;
1195        validate_column_name(field, ent)?;
1196        let conn = self.lock_read_conn()?;
1197
1198        let sql = format!(
1199            "SELECT * FROM {} WHERE {} = ?1 LIMIT 1",
1200            quote_ident(entity),
1201            quote_ident(field)
1202        );
1203        let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1204
1205        let result = conn.prepare_cached(&sql).ok().and_then(|mut stmt| {
1206            stmt.query_row(rusqlite::params![value], |row| {
1207                Ok(row_to_json(row, &columns))
1208            })
1209            .ok()
1210        });
1211
1212        Ok(result)
1213    }
1214
1215    /// Link two entities by setting a foreign-key field.
1216    pub fn link(
1217        &self,
1218        entity: &str,
1219        id: &str,
1220        relation: &str,
1221        target_id: &str,
1222    ) -> Result<bool, RuntimeError> {
1223        let ent = self.require_entity(entity)?;
1224
1225        // Find the relation definition to determine which field to set.
1226        let rel = ent
1227            .relations
1228            .iter()
1229            .find(|r| r.name == relation)
1230            .ok_or_else(|| RuntimeError {
1231                code: "RELATION_NOT_FOUND".into(),
1232                message: format!("Relation \"{relation}\" not found on entity \"{entity}\""),
1233            })?;
1234
1235        let data = serde_json::json!({ rel.field.clone(): target_id });
1236        self.update(entity, id, &data)
1237    }
1238
1239    /// Unlink a relation by setting the foreign-key field to null.
1240    pub fn unlink(&self, entity: &str, id: &str, relation: &str) -> Result<bool, RuntimeError> {
1241        let ent = self.require_entity(entity)?;
1242
1243        let rel = ent
1244            .relations
1245            .iter()
1246            .find(|r| r.name == relation)
1247            .ok_or_else(|| RuntimeError {
1248                code: "RELATION_NOT_FOUND".into(),
1249                message: format!("Relation \"{relation}\" not found on entity \"{entity}\""),
1250            })?;
1251
1252        let data = serde_json::json!({ rel.field.clone(): null });
1253        self.update(entity, id, &data)
1254    }
1255
1256    /// Execute a filtered query with operators ($not, $gt, $in, $like, $order, $limit).
1257    pub fn query_filtered(
1258        &self,
1259        entity: &str,
1260        filter: &serde_json::Value,
1261    ) -> Result<Vec<serde_json::Value>, RuntimeError> {
1262        if let Some(pg) = self.pg_backend() {
1263            return pylon_http::DataStore::query_filtered(&pg.store, entity, filter)
1264                .map_err(data_err_to_runtime);
1265        }
1266        let ent = self.require_entity(entity)?;
1267        let conn = self.lock_read_conn()?;
1268
1269        let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1270        let obj = filter
1271            .as_object()
1272            .unwrap_or(&serde_json::Map::new())
1273            .clone();
1274
1275        let mut where_clauses = Vec::new();
1276        let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
1277        let mut order_clause = String::new();
1278        let mut limit_clause = String::new();
1279        let mut join_clause = String::new();
1280        let mut fts_order = false;
1281        let mut idx = 1;
1282
1283        for (key, val) in &obj {
1284            match key.as_str() {
1285                "$order" => {
1286                    if let Some(order_obj) = val.as_object() {
1287                        let mut parts: Vec<String> = Vec::new();
1288                        for (col, dir) in order_obj {
1289                            validate_column_name(col, ent)?;
1290                            let d = match dir.as_str().unwrap_or("asc") {
1291                                "desc" | "DESC" => "DESC",
1292                                _ => "ASC",
1293                            };
1294                            parts.push(format!("{} {d}", quote_ident(col)));
1295                        }
1296                        if !parts.is_empty() {
1297                            order_clause = format!(" ORDER BY {}", parts.join(", "));
1298                        }
1299                    }
1300                }
1301                "$limit" => {
1302                    if let Some(n) = val.as_u64() {
1303                        limit_clause = format!(" LIMIT {n}");
1304                    }
1305                }
1306                "$offset" => {
1307                    if let Some(n) = val.as_u64() {
1308                        // SQLite requires LIMIT before OFFSET; add a default.
1309                        if limit_clause.is_empty() {
1310                            limit_clause = " LIMIT -1".into();
1311                        }
1312                        limit_clause = format!("{limit_clause} OFFSET {n}");
1313                    }
1314                }
1315                "$search" => {
1316                    if let Some(q) = val.as_str() {
1317                        // Join against the entity's FTS5 virtual table.
1318                        let fts = format!("{}_fts", entity);
1319                        join_clause = format!(
1320                            " JOIN {fts} ON {ent}.rowid = {fts}.rowid",
1321                            fts = quote_ident(&fts),
1322                            ent = quote_ident(entity),
1323                        );
1324                        where_clauses.push(format!("{} MATCH ?{idx}", quote_ident(&fts)));
1325                        values.push(Box::new(q.to_string()));
1326                        fts_order = true;
1327                        idx += 1;
1328                    }
1329                }
1330                _ => {
1331                    validate_column_name(key, ent)?;
1332                    let quoted_key = quote_ident(key);
1333
1334                    if let Some(op_obj) = val.as_object() {
1335                        for (op, op_val) in op_obj {
1336                            match op.as_str() {
1337                                "$not" => {
1338                                    where_clauses.push(format!("{quoted_key} != ?{idx}"));
1339                                    values.push(json_to_sql(op_val));
1340                                    idx += 1;
1341                                }
1342                                "$gt" => {
1343                                    where_clauses.push(format!("{quoted_key} > ?{idx}"));
1344                                    values.push(json_to_sql(op_val));
1345                                    idx += 1;
1346                                }
1347                                "$gte" => {
1348                                    where_clauses.push(format!("{quoted_key} >= ?{idx}"));
1349                                    values.push(json_to_sql(op_val));
1350                                    idx += 1;
1351                                }
1352                                "$lt" => {
1353                                    where_clauses.push(format!("{quoted_key} < ?{idx}"));
1354                                    values.push(json_to_sql(op_val));
1355                                    idx += 1;
1356                                }
1357                                "$lte" => {
1358                                    where_clauses.push(format!("{quoted_key} <= ?{idx}"));
1359                                    values.push(json_to_sql(op_val));
1360                                    idx += 1;
1361                                }
1362                                "$like" => {
1363                                    where_clauses.push(format!("{quoted_key} LIKE ?{idx}"));
1364                                    let pattern = format!("%{}%", op_val.as_str().unwrap_or(""));
1365                                    values.push(Box::new(pattern));
1366                                    idx += 1;
1367                                }
1368                                "$in" => {
1369                                    if let Some(arr) = op_val.as_array() {
1370                                        if arr.is_empty() {
1371                                            // Empty $in matches nothing.
1372                                            // Previously SQLite SKIPPED the
1373                                            // predicate (returning ALL rows)
1374                                            // while PG short-circuited to
1375                                            // FALSE — a real cross-backend
1376                                            // drift bug codex caught. Both
1377                                            // now emit `0` (false) so empty
1378                                            // $in returns an empty set.
1379                                            where_clauses.push("0".into());
1380                                        } else {
1381                                            let placeholders: Vec<String> = arr
1382                                                .iter()
1383                                                .map(|v| {
1384                                                    let p = format!("?{idx}");
1385                                                    values.push(json_to_sql(v));
1386                                                    idx += 1;
1387                                                    p
1388                                                })
1389                                                .collect();
1390                                            where_clauses.push(format!(
1391                                                "{quoted_key} IN ({})",
1392                                                placeholders.join(", ")
1393                                            ));
1394                                        }
1395                                    }
1396                                }
1397                                _ => {}
1398                            }
1399                        }
1400                    } else {
1401                        // Simple equality.
1402                        where_clauses.push(format!("{quoted_key} = ?{idx}"));
1403                        values.push(json_to_sql(val));
1404                        idx += 1;
1405                    }
1406                }
1407            }
1408        }
1409
1410        let where_sql = if where_clauses.is_empty() {
1411            String::new()
1412        } else {
1413            format!(" WHERE {}", where_clauses.join(" AND "))
1414        };
1415
1416        if order_clause.is_empty() {
1417            order_clause = if fts_order {
1418                // FTS joins default-order by bm25 relevance.
1419                " ORDER BY bm25(".to_string() + &quote_ident(&format!("{}_fts", entity)) + ")"
1420            } else {
1421                format!(" ORDER BY {}.\"id\"", quote_ident(entity))
1422            };
1423        }
1424
1425        let select_prefix = format!("{}.*", quote_ident(entity));
1426        let sql = format!(
1427            "SELECT {} FROM {}{}{}{}{}",
1428            select_prefix,
1429            quote_ident(entity),
1430            join_clause,
1431            where_sql,
1432            order_clause,
1433            limit_clause
1434        );
1435        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1436            values.iter().map(|v| v.as_ref()).collect();
1437
1438        let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
1439            code: "QUERY_FAILED".into(),
1440            message: format!("Failed to prepare filtered query: {e}"),
1441        })?;
1442
1443        let rows = stmt
1444            .query_map(param_refs.as_slice(), |row| Ok(row_to_json(row, &columns)))
1445            .map_err(|e| RuntimeError {
1446                code: "QUERY_FAILED".into(),
1447                message: format!("Filtered query failed: {e}"),
1448            })?;
1449
1450        let mut result = Vec::new();
1451        for row in rows {
1452            if let Ok(val) = row {
1453                result.push(val);
1454            }
1455        }
1456        Ok(result)
1457    }
1458
1459    /// Execute a graph-style query.
1460    ///
1461    /// Input: `{ "User": { "where": { "email": "..." }, "include": { "posts": {} } } }`
1462    /// Returns nested results following relations.
1463    pub fn query_graph(
1464        &self,
1465        query: &serde_json::Value,
1466    ) -> Result<serde_json::Value, RuntimeError> {
1467        let obj = query.as_object().ok_or_else(|| RuntimeError {
1468            code: "INVALID_QUERY".into(),
1469            message: "Graph query must be a JSON object".into(),
1470        })?;
1471
1472        let mut results = serde_json::Map::new();
1473
1474        for (entity_name, query_opts) in obj {
1475            let _ent = self.require_entity(entity_name)?;
1476
1477            // Apply where clause if present.
1478            let filter = query_opts
1479                .get("where")
1480                .cloned()
1481                .unwrap_or(serde_json::json!({}));
1482            let rows = self.query_filtered(entity_name, &filter)?;
1483
1484            // Apply includes (relations) if present.
1485            let rows = if let Some(include) = query_opts.get("include").and_then(|v| v.as_object())
1486            {
1487                // Internal invariant: if query_filtered succeeded above, the
1488                // entity must exist. Previously this used .unwrap() which
1489                // would panic if the invariant broke — a panic inside the
1490                // handler path poisons the connection mutex and takes down
1491                // all subsequent reads. Fail the request cleanly instead.
1492                let ent = self.entities.get(entity_name).ok_or_else(|| RuntimeError {
1493                    code: "INVARIANT_BROKEN".into(),
1494                    message: format!(
1495                        "entity \"{entity_name}\" missing from registry during include expansion"
1496                    ),
1497                })?;
1498                rows.into_iter()
1499                    .map(|mut row| {
1500                        for (rel_name, _sub_query) in include {
1501                            if let Some(rel) = ent.relations.iter().find(|r| r.name == *rel_name) {
1502                                let fk_value = row
1503                                    .get(&rel.field)
1504                                    .and_then(|v| v.as_str())
1505                                    .map(|s| s.to_string());
1506                                if let Some(fk) = fk_value {
1507                                    if rel.many {
1508                                        // One-to-many: find rows in target where field matches id.
1509                                        let sub_filter = serde_json::json!({ &rel.field: &fk });
1510                                        if let Ok(related) =
1511                                            self.query_filtered(&rel.target, &sub_filter)
1512                                        {
1513                                            row[rel_name] = serde_json::json!(related);
1514                                        }
1515                                    } else {
1516                                        // One-to-one / many-to-one: get by id.
1517                                        if let Ok(Some(related)) = self.get_by_id(&rel.target, &fk)
1518                                        {
1519                                            row[rel_name] = related;
1520                                        }
1521                                    }
1522                                }
1523                            }
1524                        }
1525                        row
1526                    })
1527                    .collect()
1528            } else {
1529                rows
1530            };
1531
1532            // Apply limit if present.
1533            let rows = if let Some(limit) = query_opts.get("limit").and_then(|v| v.as_u64()) {
1534                rows.into_iter().take(limit as usize).collect()
1535            } else {
1536                rows
1537            };
1538
1539            results.insert(entity_name.clone(), serde_json::json!(rows));
1540        }
1541
1542        Ok(serde_json::Value::Object(results))
1543    }
1544
1545    // -----------------------------------------------------------------------
1546    // Transaction-safe variants (use a pre-held connection guard)
1547    // -----------------------------------------------------------------------
1548
1549    /// Insert using an already-locked connection (for transactions).
1550    pub fn insert_with_conn(
1551        &self,
1552        conn: &Connection,
1553        entity: &str,
1554        data: &serde_json::Value,
1555    ) -> Result<String, RuntimeError> {
1556        let ent = self.require_entity(entity)?;
1557        let id = generate_id();
1558        let obj = data.as_object().ok_or_else(|| RuntimeError {
1559            code: "INVALID_DATA".into(),
1560            message: "Insert data must be a JSON object".into(),
1561        })?;
1562
1563        let mut col_names = vec![quote_ident("id")];
1564        let mut placeholders = vec!["?1".to_string()];
1565        let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = vec![Box::new(id.clone())];
1566        let mut idx = 2;
1567        for (key, val) in obj {
1568            if key == "id" {
1569                continue;
1570            }
1571            validate_column_name(key, ent)?;
1572            col_names.push(quote_ident(key));
1573            placeholders.push(format!("?{idx}"));
1574            values.push(json_to_sql(val));
1575            idx += 1;
1576        }
1577
1578        let sql = format!(
1579            "INSERT INTO {} ({}) VALUES ({})",
1580            quote_ident(entity),
1581            col_names.join(", "),
1582            placeholders.join(", ")
1583        );
1584        let params: Vec<&dyn rusqlite::types::ToSql> = values.iter().map(|v| v.as_ref()).collect();
1585        conn.execute(&sql, params.as_slice())
1586            .map_err(|e| RuntimeError {
1587                code: "INSERT_FAILED".into(),
1588                message: format!("Insert into {entity} failed: {e}"),
1589            })?;
1590
1591        // Faceted-search maintenance in the same transaction. Skipped
1592        // for entities that don't declare `search:` in their schema.
1593        if let Some(cfg) = ent.search.as_ref() {
1594            if !cfg.is_empty() {
1595                pylon_storage::search_maintenance::apply_insert(conn, entity, &id, data, cfg)
1596                    .map_err(|e| RuntimeError {
1597                        code: "SEARCH_MAINTENANCE_FAILED".into(),
1598                        message: format!("search index update on insert {entity}: {e}"),
1599                    })?;
1600            }
1601        }
1602
1603        Ok(id)
1604    }
1605
1606    /// Update using an already-locked connection (for transactions).
1607    pub fn update_with_conn(
1608        &self,
1609        conn: &Connection,
1610        entity: &str,
1611        id: &str,
1612        data: &serde_json::Value,
1613    ) -> Result<bool, RuntimeError> {
1614        let ent = self.require_entity(entity)?;
1615        let obj = data.as_object().ok_or_else(|| RuntimeError {
1616            code: "INVALID_DATA".into(),
1617            message: "Update data must be a JSON object".into(),
1618        })?;
1619
1620        let mut set_clauses = Vec::new();
1621        let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
1622        let mut idx = 1;
1623        for (key, val) in obj {
1624            if key == "id" {
1625                continue;
1626            }
1627            validate_column_name(key, ent)?;
1628            set_clauses.push(format!("{} = ?{idx}", quote_ident(key)));
1629            values.push(json_to_sql(val));
1630            idx += 1;
1631        }
1632        if set_clauses.is_empty() {
1633            return Ok(false);
1634        }
1635
1636        // Capture the pre-UPDATE row if we need to diff facet values.
1637        // Read happens before the UPDATE so apply_update sees the OLD
1638        // state of any facet field. Cheap — single-row lookup on the
1639        // `id` primary-key index.
1640        let searchable = ent.search.as_ref().map(|c| !c.is_empty()).unwrap_or(false);
1641        let old_row = if searchable {
1642            self.get_by_id_with_conn(conn, entity, id)?
1643        } else {
1644            None
1645        };
1646
1647        values.push(Box::new(id.to_string()));
1648        let sql = format!(
1649            "UPDATE {} SET {} WHERE \"id\" = ?{idx}",
1650            quote_ident(entity),
1651            set_clauses.join(", ")
1652        );
1653        let params: Vec<&dyn rusqlite::types::ToSql> = values.iter().map(|v| v.as_ref()).collect();
1654        let affected = conn
1655            .execute(&sql, params.as_slice())
1656            .map_err(|e| RuntimeError {
1657                code: "UPDATE_FAILED".into(),
1658                message: format!("Update {entity}/{id} failed: {e}"),
1659            })?;
1660
1661        if affected > 0 && searchable {
1662            if let (Some(cfg), Some(old)) = (ent.search.as_ref(), old_row) {
1663                pylon_storage::search_maintenance::apply_update(conn, entity, id, &old, data, cfg)
1664                    .map_err(|e| RuntimeError {
1665                        code: "SEARCH_MAINTENANCE_FAILED".into(),
1666                        message: format!("search index update on update {entity}: {e}"),
1667                    })?;
1668            }
1669        }
1670
1671        Ok(affected > 0)
1672    }
1673
1674    /// Delete using an already-locked connection (for transactions).
1675    pub fn delete_with_conn(
1676        &self,
1677        conn: &Connection,
1678        entity: &str,
1679        id: &str,
1680    ) -> Result<bool, RuntimeError> {
1681        let ent = self.require_entity(entity)?;
1682
1683        // Apply search maintenance BEFORE the DELETE so we still have
1684        // the row's facet values to diff against.
1685        let searchable = ent.search.as_ref().map(|c| !c.is_empty()).unwrap_or(false);
1686        if searchable {
1687            if let (Some(cfg), Ok(Some(row))) = (
1688                ent.search.as_ref(),
1689                self.get_by_id_with_conn(conn, entity, id),
1690            ) {
1691                pylon_storage::search_maintenance::apply_delete(conn, entity, id, &row, cfg)
1692                    .map_err(|e| RuntimeError {
1693                        code: "SEARCH_MAINTENANCE_FAILED".into(),
1694                        message: format!("search index update on delete {entity}: {e}"),
1695                    })?;
1696            }
1697        }
1698
1699        let sql = format!("DELETE FROM {} WHERE \"id\" = ?1", quote_ident(entity));
1700        let affected = conn
1701            .execute(&sql, rusqlite::params![id])
1702            .map_err(|e| RuntimeError {
1703                code: "DELETE_FAILED".into(),
1704                message: format!("Delete {entity}/{id} failed: {e}"),
1705            })?;
1706        Ok(affected > 0)
1707    }
1708
1709    /// Read a row by id using a pre-held connection (for transactions).
1710    pub fn get_by_id_with_conn(
1711        &self,
1712        conn: &Connection,
1713        entity: &str,
1714        id: &str,
1715    ) -> Result<Option<serde_json::Value>, RuntimeError> {
1716        let ent = self.require_entity(entity)?;
1717        let sql = format!("SELECT * FROM {} WHERE \"id\" = ?1", quote_ident(entity));
1718        let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
1719            code: "QUERY_FAILED".into(),
1720            message: format!("Failed to prepare query: {e}"),
1721        })?;
1722        let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1723        Ok(stmt
1724            .query_row(rusqlite::params![id], |row| Ok(row_to_json(row, &columns)))
1725            .ok())
1726    }
1727
1728    /// List rows using a pre-held connection (for transactions).
1729    pub fn list_with_conn(
1730        &self,
1731        conn: &Connection,
1732        entity: &str,
1733    ) -> Result<Vec<serde_json::Value>, RuntimeError> {
1734        let ent = self.require_entity(entity)?;
1735        let sql = format!("SELECT * FROM {} ORDER BY \"id\"", quote_ident(entity));
1736        let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
1737            code: "QUERY_FAILED".into(),
1738            message: format!("Failed to prepare query: {e}"),
1739        })?;
1740        let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1741        let rows = stmt
1742            .query_map([], |row| Ok(row_to_json(row, &columns)))
1743            .map_err(|e| RuntimeError {
1744                code: "QUERY_FAILED".into(),
1745                message: format!("Query failed: {e}"),
1746            })?;
1747        Ok(rows.flatten().collect())
1748    }
1749
1750    /// List after cursor using a pre-held connection (for transactions).
1751    pub fn list_after_with_conn(
1752        &self,
1753        conn: &Connection,
1754        entity: &str,
1755        after: Option<&str>,
1756        limit: usize,
1757    ) -> Result<Vec<serde_json::Value>, RuntimeError> {
1758        let ent = self.require_entity(entity)?;
1759        let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1760        let table = quote_ident(entity);
1761        let (sql, params): (String, Vec<Box<dyn rusqlite::types::ToSql>>) = match after {
1762            Some(cursor) => (
1763                format!("SELECT * FROM {table} WHERE \"id\" > ?1 ORDER BY \"id\" LIMIT ?2"),
1764                vec![Box::new(cursor.to_string()), Box::new(limit as i64)],
1765            ),
1766            None => (
1767                format!("SELECT * FROM {table} ORDER BY \"id\" LIMIT ?1"),
1768                vec![Box::new(limit as i64)],
1769            ),
1770        };
1771        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1772            params.iter().map(|v| v.as_ref()).collect();
1773        let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
1774            code: "QUERY_FAILED".into(),
1775            message: format!("Failed to prepare: {e}"),
1776        })?;
1777        let rows = stmt
1778            .query_map(param_refs.as_slice(), |row| Ok(row_to_json(row, &columns)))
1779            .map_err(|e| RuntimeError {
1780                code: "QUERY_FAILED".into(),
1781                message: format!("Query failed: {e}"),
1782            })?;
1783        Ok(rows.flatten().collect())
1784    }
1785
1786    /// Lookup by field using a pre-held connection (for transactions).
1787    pub fn lookup_with_conn(
1788        &self,
1789        conn: &Connection,
1790        entity: &str,
1791        field: &str,
1792        value: &str,
1793    ) -> Result<Option<serde_json::Value>, RuntimeError> {
1794        let ent = self.require_entity(entity)?;
1795        validate_column_name(field, ent)?;
1796        let sql = format!(
1797            "SELECT * FROM {} WHERE {} = ?1 LIMIT 1",
1798            quote_ident(entity),
1799            quote_ident(field)
1800        );
1801        let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1802        Ok(conn.prepare_cached(&sql).ok().and_then(|mut stmt| {
1803            stmt.query_row(rusqlite::params![value], |row| {
1804                Ok(row_to_json(row, &columns))
1805            })
1806            .ok()
1807        }))
1808    }
1809
1810    /// Link relation using a pre-held connection (for transactions).
1811    pub fn link_with_conn(
1812        &self,
1813        conn: &Connection,
1814        entity: &str,
1815        id: &str,
1816        relation: &str,
1817        target_id: &str,
1818    ) -> Result<bool, RuntimeError> {
1819        let ent = self.require_entity(entity)?;
1820        let rel = ent
1821            .relations
1822            .iter()
1823            .find(|r| r.name == relation)
1824            .ok_or_else(|| RuntimeError {
1825                code: "RELATION_NOT_FOUND".into(),
1826                message: format!("Relation \"{relation}\" not found on \"{entity}\""),
1827            })?;
1828        let data = serde_json::json!({ rel.field.clone(): target_id });
1829        self.update_with_conn(conn, entity, id, &data)
1830    }
1831
1832    /// Unlink relation using a pre-held connection (for transactions).
1833    pub fn unlink_with_conn(
1834        &self,
1835        conn: &Connection,
1836        entity: &str,
1837        id: &str,
1838        relation: &str,
1839    ) -> Result<bool, RuntimeError> {
1840        let ent = self.require_entity(entity)?;
1841        let rel = ent
1842            .relations
1843            .iter()
1844            .find(|r| r.name == relation)
1845            .ok_or_else(|| RuntimeError {
1846                code: "RELATION_NOT_FOUND".into(),
1847                message: format!("Relation \"{relation}\" not found on \"{entity}\""),
1848            })?;
1849        let data = serde_json::json!({ rel.field.clone(): serde_json::Value::Null });
1850        self.update_with_conn(conn, entity, id, &data)
1851    }
1852
1853    /// Query with filters using a pre-held connection (for transactions).
1854    ///
1855    /// Shares the filter-building logic with [`query_filtered`] by executing
1856    /// against the provided connection rather than acquiring one.
1857    pub fn query_filtered_with_conn(
1858        &self,
1859        conn: &Connection,
1860        entity: &str,
1861        filter: &serde_json::Value,
1862    ) -> Result<Vec<serde_json::Value>, RuntimeError> {
1863        let ent = self.require_entity(entity)?;
1864        let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1865        let empty = serde_json::Map::new();
1866        let obj = filter.as_object().unwrap_or(&empty);
1867
1868        let mut where_clauses = Vec::new();
1869        let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
1870        let mut order_clause = String::new();
1871        let mut limit_clause = String::new();
1872        let mut idx = 1;
1873
1874        for (key, val) in obj {
1875            match key.as_str() {
1876                "$order" => {
1877                    if let Some(o) = val.as_object() {
1878                        let mut parts: Vec<String> = Vec::new();
1879                        for (col, dir) in o {
1880                            validate_column_name(col, ent)?;
1881                            let d = match dir.as_str().unwrap_or("asc") {
1882                                "desc" | "DESC" => "DESC",
1883                                _ => "ASC",
1884                            };
1885                            parts.push(format!("{} {d}", quote_ident(col)));
1886                        }
1887                        if !parts.is_empty() {
1888                            order_clause = format!(" ORDER BY {}", parts.join(", "));
1889                        }
1890                    }
1891                }
1892                "$limit" => {
1893                    if let Some(n) = val.as_u64() {
1894                        limit_clause = format!(" LIMIT {n}");
1895                    }
1896                }
1897                "$offset" => {
1898                    if let Some(n) = val.as_u64() {
1899                        if limit_clause.is_empty() {
1900                            limit_clause = " LIMIT -1".into();
1901                        }
1902                        limit_clause = format!("{limit_clause} OFFSET {n}");
1903                    }
1904                }
1905                _ => {
1906                    validate_column_name(key, ent)?;
1907                    let qk = quote_ident(key);
1908                    if let Some(op_obj) = val.as_object() {
1909                        for (op, op_val) in op_obj {
1910                            match op.as_str() {
1911                                "$not" => {
1912                                    where_clauses.push(format!("{qk} != ?{idx}"));
1913                                    values.push(json_to_sql(op_val));
1914                                    idx += 1;
1915                                }
1916                                "$gt" => {
1917                                    where_clauses.push(format!("{qk} > ?{idx}"));
1918                                    values.push(json_to_sql(op_val));
1919                                    idx += 1;
1920                                }
1921                                "$gte" => {
1922                                    where_clauses.push(format!("{qk} >= ?{idx}"));
1923                                    values.push(json_to_sql(op_val));
1924                                    idx += 1;
1925                                }
1926                                "$lt" => {
1927                                    where_clauses.push(format!("{qk} < ?{idx}"));
1928                                    values.push(json_to_sql(op_val));
1929                                    idx += 1;
1930                                }
1931                                "$lte" => {
1932                                    where_clauses.push(format!("{qk} <= ?{idx}"));
1933                                    values.push(json_to_sql(op_val));
1934                                    idx += 1;
1935                                }
1936                                "$like" => {
1937                                    where_clauses.push(format!("{qk} LIKE ?{idx}"));
1938                                    let p = format!("%{}%", op_val.as_str().unwrap_or(""));
1939                                    values.push(Box::new(p));
1940                                    idx += 1;
1941                                }
1942                                "$in" => {
1943                                    if let Some(arr) = op_val.as_array() {
1944                                        let ph: Vec<String> = arr
1945                                            .iter()
1946                                            .map(|v| {
1947                                                let p = format!("?{idx}");
1948                                                values.push(json_to_sql(v));
1949                                                idx += 1;
1950                                                p
1951                                            })
1952                                            .collect();
1953                                        if !ph.is_empty() {
1954                                            where_clauses
1955                                                .push(format!("{qk} IN ({})", ph.join(", ")));
1956                                        }
1957                                    }
1958                                }
1959                                _ => {}
1960                            }
1961                        }
1962                    } else {
1963                        where_clauses.push(format!("{qk} = ?{idx}"));
1964                        values.push(json_to_sql(val));
1965                        idx += 1;
1966                    }
1967                }
1968            }
1969        }
1970
1971        let where_sql = if where_clauses.is_empty() {
1972            String::new()
1973        } else {
1974            format!(" WHERE {}", where_clauses.join(" AND "))
1975        };
1976        if order_clause.is_empty() {
1977            order_clause = " ORDER BY \"id\"".into();
1978        }
1979
1980        let sql = format!(
1981            "SELECT * FROM {}{}{}{}",
1982            quote_ident(entity),
1983            where_sql,
1984            order_clause,
1985            limit_clause
1986        );
1987        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1988            values.iter().map(|v| v.as_ref()).collect();
1989        let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
1990            code: "QUERY_FAILED".into(),
1991            message: format!("Failed to prepare: {e}"),
1992        })?;
1993        let rows = stmt
1994            .query_map(param_refs.as_slice(), |row| Ok(row_to_json(row, &columns)))
1995            .map_err(|e| RuntimeError {
1996                code: "QUERY_FAILED".into(),
1997                message: format!("Query failed: {e}"),
1998            })?;
1999        Ok(rows.flatten().collect())
2000    }
2001
2002    /// Graph query using a pre-held connection (for transactions).
2003    pub fn query_graph_with_conn(
2004        &self,
2005        conn: &Connection,
2006        query: &serde_json::Value,
2007    ) -> Result<serde_json::Value, RuntimeError> {
2008        let obj = query.as_object().ok_or_else(|| RuntimeError {
2009            code: "INVALID_QUERY".into(),
2010            message: "Graph query must be a JSON object".into(),
2011        })?;
2012        let mut results = serde_json::Map::new();
2013        for (entity_name, query_opts) in obj {
2014            let _ent = self.require_entity(entity_name)?;
2015            let filter = query_opts
2016                .get("where")
2017                .cloned()
2018                .unwrap_or(serde_json::json!({}));
2019            let rows = self.query_filtered_with_conn(conn, entity_name, &filter)?;
2020            results.insert(entity_name.clone(), serde_json::json!(rows));
2021        }
2022        Ok(serde_json::Value::Object(results))
2023    }
2024
2025    // -----------------------------------------------------------------------
2026    // Aggregations — count, sum, avg, min, max, group by
2027    // -----------------------------------------------------------------------
2028
2029    /// Run an aggregation query. See [`pylon_http::DataStore::aggregate`]
2030    /// for the spec shape.
2031    pub fn aggregate(
2032        &self,
2033        entity: &str,
2034        spec: &serde_json::Value,
2035    ) -> Result<serde_json::Value, RuntimeError> {
2036        if let Some(pg) = self.pg_backend() {
2037            return pylon_http::DataStore::aggregate(&pg.store, entity, spec)
2038                .map_err(data_err_to_runtime);
2039        }
2040        let ent = self.require_entity(entity)?;
2041        let conn = self.lock_read_conn()?;
2042        let obj = spec.as_object().ok_or_else(|| RuntimeError {
2043            code: "INVALID_QUERY".into(),
2044            message: "aggregate spec must be an object".into(),
2045        })?;
2046
2047        // Build the SELECT list.
2048        let mut select_parts: Vec<String> = Vec::new();
2049        let mut result_fields: Vec<String> = Vec::new();
2050
2051        if let Some(count) = obj.get("count") {
2052            match count {
2053                serde_json::Value::String(s) if s == "*" => {
2054                    select_parts.push("COUNT(*) AS count".into());
2055                    result_fields.push("count".into());
2056                }
2057                serde_json::Value::String(field) => {
2058                    validate_column_name(field, ent)?;
2059                    let alias = format!("count_{field}");
2060                    select_parts.push(format!(
2061                        "COUNT({}) AS {}",
2062                        quote_ident(field),
2063                        quote_ident(&alias)
2064                    ));
2065                    result_fields.push(alias);
2066                }
2067                _ => {}
2068            }
2069        }
2070
2071        for (fn_name, alias_prefix) in [
2072            ("sum", "sum_"),
2073            ("avg", "avg_"),
2074            ("min", "min_"),
2075            ("max", "max_"),
2076        ] {
2077            if let Some(fields) = obj.get(fn_name).and_then(|v| v.as_array()) {
2078                for field in fields {
2079                    if let Some(f) = field.as_str() {
2080                        validate_column_name(f, ent)?;
2081                        let alias = format!("{alias_prefix}{f}");
2082                        let sql_fn = fn_name.to_uppercase();
2083                        select_parts.push(format!(
2084                            "{}({}) AS {}",
2085                            sql_fn,
2086                            quote_ident(f),
2087                            quote_ident(&alias)
2088                        ));
2089                        result_fields.push(alias);
2090                    }
2091                }
2092            }
2093        }
2094
2095        // countDistinct — separate handler because COUNT(DISTINCT) is a
2096        // distinct SQL form from COUNT(field). Lets dashboards ask "how
2097        // many unique customers placed orders this month" without a
2098        // client-side post-processing pass.
2099        if let Some(fields) = obj.get("countDistinct").and_then(|v| v.as_array()) {
2100            for field in fields {
2101                if let Some(f) = field.as_str() {
2102                    validate_column_name(f, ent)?;
2103                    let alias = format!("count_distinct_{f}");
2104                    select_parts.push(format!(
2105                        "COUNT(DISTINCT {}) AS {}",
2106                        quote_ident(f),
2107                        quote_ident(&alias)
2108                    ));
2109                    result_fields.push(alias);
2110                }
2111            }
2112        }
2113
2114        // Group-by fields come first in the SELECT so each row is identifiable.
2115        // Each entry is either a plain column name (string) or a date-bucket
2116        // spec — `{ field: "createdAt", bucket: "day" }`. Buckets map to
2117        // SQLite strftime patterns so aggregation keys collapse to the
2118        // bucket boundary (hour / day / week / month / year).
2119        let mut group_by: Vec<String> = Vec::new();
2120        let mut group_select: Vec<String> = Vec::new();
2121        let mut group_field_names: Vec<String> = Vec::new();
2122        if let Some(groups) = obj.get("groupBy").and_then(|v| v.as_array()) {
2123            for g in groups {
2124                if let Some(f) = g.as_str() {
2125                    validate_column_name(f, ent)?;
2126                    let quoted = quote_ident(f);
2127                    group_by.push(quoted.clone());
2128                    group_select.push(quoted);
2129                    group_field_names.push(f.to_string());
2130                } else if let Some(spec) = g.as_object() {
2131                    let field =
2132                        spec.get("field")
2133                            .and_then(|v| v.as_str())
2134                            .ok_or_else(|| RuntimeError {
2135                                code: "INVALID_QUERY".into(),
2136                                message: "groupBy object spec requires `field`".into(),
2137                            })?;
2138                    validate_column_name(field, ent)?;
2139                    let bucket = spec.get("bucket").and_then(|v| v.as_str()).unwrap_or("day");
2140                    let fmt = match bucket {
2141                        "hour" => "%Y-%m-%d %H:00:00",
2142                        "day" => "%Y-%m-%d",
2143                        "month" => "%Y-%m",
2144                        "year" => "%Y",
2145                        "week" => "%Y-W%W",
2146                        _ => {
2147                            return Err(RuntimeError {
2148                                code: "INVALID_QUERY".into(),
2149                                message: format!(
2150                                    "bucket must be one of hour/day/week/month/year, got {bucket}"
2151                                ),
2152                            });
2153                        }
2154                    };
2155                    let alias = format!("{field}_{bucket}");
2156                    let expr = format!("strftime('{}', {})", fmt, quote_ident(field));
2157                    group_by.push(expr.clone());
2158                    group_select.push(format!("{} AS {}", expr, quote_ident(&alias)));
2159                    group_field_names.push(alias);
2160                }
2161            }
2162        }
2163        let mut full_select = group_select.clone();
2164        full_select.extend(select_parts.iter().cloned());
2165        if full_select.is_empty() {
2166            return Err(RuntimeError {
2167                code: "INVALID_QUERY".into(),
2168                message: "aggregate spec must include count/sum/avg/min/max/groupBy".into(),
2169            });
2170        }
2171
2172        // WHERE clause (reuse filter syntax, but only simple equality for now).
2173        let mut where_clauses = Vec::new();
2174        let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
2175        let mut idx = 1;
2176        if let Some(where_obj) = obj.get("where").and_then(|v| v.as_object()) {
2177            for (k, v) in where_obj {
2178                validate_column_name(k, ent)?;
2179                where_clauses.push(format!("{} = ?{idx}", quote_ident(k)));
2180                values.push(json_to_sql(v));
2181                idx += 1;
2182            }
2183        }
2184        let where_sql = if where_clauses.is_empty() {
2185            String::new()
2186        } else {
2187            format!(" WHERE {}", where_clauses.join(" AND "))
2188        };
2189
2190        let group_sql = if group_by.is_empty() {
2191            String::new()
2192        } else {
2193            format!(" GROUP BY {}", group_by.join(", "))
2194        };
2195
2196        let sql = format!(
2197            "SELECT {} FROM {}{}{}",
2198            full_select.join(", "),
2199            quote_ident(entity),
2200            where_sql,
2201            group_sql
2202        );
2203
2204        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
2205            values.iter().map(|v| v.as_ref()).collect();
2206        let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
2207            code: "QUERY_FAILED".into(),
2208            message: format!("Failed to prepare aggregate: {e}"),
2209        })?;
2210
2211        let column_names: Vec<String> = {
2212            let mut v = group_field_names.clone();
2213            v.extend(result_fields.iter().cloned());
2214            v
2215        };
2216
2217        let rows = stmt
2218            .query_map(param_refs.as_slice(), |row| {
2219                let mut obj = serde_json::Map::new();
2220                for (i, name) in column_names.iter().enumerate() {
2221                    // Try int first (counts/sums), then float, then string, then null.
2222                    if let Ok(n) = row.get::<_, i64>(i) {
2223                        obj.insert(name.clone(), serde_json::Value::Number(n.into()));
2224                    } else if let Ok(f) = row.get::<_, f64>(i) {
2225                        if let Some(num) = serde_json::Number::from_f64(f) {
2226                            obj.insert(name.clone(), serde_json::Value::Number(num));
2227                        } else {
2228                            obj.insert(name.clone(), serde_json::Value::Null);
2229                        }
2230                    } else if let Ok(s) = row.get::<_, String>(i) {
2231                        obj.insert(name.clone(), serde_json::Value::String(s));
2232                    } else {
2233                        obj.insert(name.clone(), serde_json::Value::Null);
2234                    }
2235                }
2236                Ok(serde_json::Value::Object(obj))
2237            })
2238            .map_err(|e| RuntimeError {
2239                code: "QUERY_FAILED".into(),
2240                message: format!("Aggregate failed: {e}"),
2241            })?;
2242
2243        let mut result = Vec::new();
2244        for row in rows {
2245            if let Ok(val) = row {
2246                result.push(val);
2247            }
2248        }
2249        Ok(serde_json::json!({ "rows": result }))
2250    }
2251
2252    // -----------------------------------------------------------------------
2253    // Helpers
2254    // -----------------------------------------------------------------------
2255
2256    fn require_entity(&self, name: &str) -> Result<&ManifestEntity, RuntimeError> {
2257        self.entities.get(name).ok_or_else(|| RuntimeError {
2258            code: "ENTITY_NOT_FOUND".into(),
2259            message: format!("Unknown entity: \"{name}\""),
2260        })
2261    }
2262
2263    /// Acquire the write connection. Used for INSERT, UPDATE, DELETE.
2264    /// SQLite-only — Postgres callers should never reach this (each
2265    /// public CRUD method branches at the top and dispatches to
2266    /// `PostgresDataStore` first). Returns `NOT_SQLITE_BACKEND` if
2267    /// invoked on a Postgres runtime, which indicates a missing dispatch.
2268    fn lock_write_conn(&self) -> Result<std::sync::MutexGuard<'_, Connection>, RuntimeError> {
2269        let sb = self.sqlite_backend()?;
2270        sb.write_conn.lock().map_err(|e| RuntimeError {
2271            code: "LOCK_FAILED".into(),
2272            message: format!("Failed to acquire write connection lock: {e}"),
2273        })
2274    }
2275
2276    /// Acquire a read connection. Uses the read pool if available (file-backed
2277    /// databases), otherwise falls back to the write connection (in-memory).
2278    /// Connections are selected round-robin to spread load evenly. SQLite-only.
2279    fn lock_read_conn(&self) -> Result<ReadConnGuard<'_>, RuntimeError> {
2280        let sb = self.sqlite_backend()?;
2281        if !sb.read_pool.is_empty() {
2282            let idx = sb.read_counter.fetch_add(1, Ordering::Relaxed) % sb.read_pool.len();
2283            let guard = sb.read_pool[idx].lock().map_err(|e| RuntimeError {
2284                code: "LOCK_FAILED".into(),
2285                message: format!("Failed to acquire read connection: {e}"),
2286            })?;
2287            Ok(ReadConnGuard::Pooled(guard))
2288        } else {
2289            // Fall back to write connection for in-memory DBs.
2290            let guard = sb.write_conn.lock().map_err(|e| RuntimeError {
2291                code: "LOCK_FAILED".into(),
2292                message: format!("Failed to acquire connection: {e}"),
2293            })?;
2294            Ok(ReadConnGuard::Write(guard))
2295        }
2296    }
2297
2298    /// Borrow the SQLite backend, or fail with `NOT_SQLITE_BACKEND` if
2299    /// this runtime is Postgres-backed. Used by every SQLite-specific
2300    /// helper as a single point of dispatch.
2301    fn sqlite_backend(&self) -> Result<&SqliteBackend, RuntimeError> {
2302        match &self.backend {
2303            RuntimeBackend::Sqlite(sb) => Ok(sb),
2304            RuntimeBackend::Postgres(_) => Err(RuntimeError {
2305                code: "NOT_SQLITE_BACKEND".into(),
2306                message: "this operation requires a SQLite-backed Runtime".into(),
2307            }),
2308        }
2309    }
2310
2311    /// Borrow the Postgres backend, or `None` for SQLite. Used by the
2312    /// per-method dispatch at the top of each entity-CRUD function
2313    /// AND by the `DataStore` impl in `datastore.rs` to reach the
2314    /// CRDT sidecar.
2315    pub(crate) fn pg_backend(&self) -> Option<&PgBackend> {
2316        match &self.backend {
2317            RuntimeBackend::Sqlite(_) => None,
2318            RuntimeBackend::Postgres(pg) => Some(pg),
2319        }
2320    }
2321
2322    /// Borrow the underlying Postgres `DataStore` if this runtime is
2323    /// Postgres-backed. Used by the `DataStore` adapter in `datastore.rs`
2324    /// to delegate `transact`/`search` etc. without re-implementing them.
2325    /// Accessor for the underlying PostgresDataStore. Used by
2326    /// integration tests to exercise in-tx primitives directly
2327    /// without going through a TS function handler. Also useful for
2328    /// callers that need to drop down to raw PG (e.g. running an
2329    /// EXPLAIN against the live cluster from an admin tool).
2330    /// Returns None on SQLite-backed runtimes.
2331    pub fn pg_data_store_pub(&self) -> Option<&pylon_storage::pg_datastore::PostgresDataStore> {
2332        self.pg_data_store()
2333    }
2334
2335    #[doc(hidden)]
2336    pub fn pg_data_store_for_tests(&self) -> &pylon_storage::pg_datastore::PostgresDataStore {
2337        self.pg_data_store().expect("pg backend")
2338    }
2339
2340    /// Test-only: run a closure inside a PG mutation tx with the
2341    /// CRDT hook installed — same code path FnOpsImpl::call uses
2342    /// for `Mutation` handlers. Lets integration tests verify the
2343    /// hook without spinning up a Bun runtime.
2344    #[doc(hidden)]
2345    pub fn run_in_pg_mutation_tx_for_tests<F, T, E>(&self, body: F) -> Result<T, E>
2346    where
2347        F: FnOnce(&dyn pylon_http::DataStore) -> Result<T, E>,
2348        E: From<pylon_http::DataError>,
2349    {
2350        let pg_backend = self.pg_backend().expect("pg backend");
2351        let crdt_hook: std::sync::Arc<dyn pylon_storage::pg_tx_store::PgCrdtHook> =
2352            std::sync::Arc::new(crate::pg_loro_store::PgCrdtHookImpl {
2353                crdt: std::sync::Arc::clone(&pg_backend.crdt),
2354                manifest: std::sync::Arc::new(self.manifest.clone()),
2355            });
2356        pg_backend
2357            .store
2358            .with_transaction_crdt(crdt_hook, body)
2359    }
2360
2361    pub(crate) fn pg_data_store(&self) -> Option<&pylon_storage::pg_datastore::PostgresDataStore> {
2362        self.pg_backend().map(|pg| &pg.store)
2363    }
2364}
2365
2366// ---------------------------------------------------------------------------
2367// Helpers
2368// ---------------------------------------------------------------------------
2369
2370/// Generate a lex-sortable, monotonic-ish unique ID.
2371///
2372/// Same shape as `pylon_storage::postgres::generate_id` — fixed-width hex
2373/// of nanoseconds + 8-hex per-process counter (40 chars total). The fixed
2374/// width is what makes `WHERE id > $1 ORDER BY id` correct for cursor
2375/// pagination: variable-width hex sorts incorrectly at width boundaries
2376/// (e.g. `"ff"` lex-sorts after `"100"`).
2377/// Run `body` inside a SQLite transaction on `conn`. Commits on `Ok`,
2378/// rolls back on `Err` (or if `body` panics).
2379///
2380/// Used to make the multi-statement CRDT write paths (LoroDoc snapshot
2381/// upsert into `_pylon_crdt_snapshots` + the materialized entity row
2382/// INSERT/UPDATE + FTS / facet maintenance) atomic so a crash mid-write
2383/// can never leave the materialized view stale relative to the CRDT
2384/// snapshot. Uses unmanaged BEGIN/COMMIT/ROLLBACK rather than rusqlite's
2385/// `Transaction` API because the existing call sites borrow `conn`
2386/// through inner closures and the lifetime juggling for a `Transaction`
2387/// guard would force more refactoring than the explicit BEGIN/COMMIT.
2388///
2389/// `BEGIN IMMEDIATE` (vs the default `BEGIN DEFERRED`) takes the SQLite
2390/// reserved lock on entry instead of escalating later — matches the
2391/// pattern in `datastore.rs::transact` and avoids a SQLITE_BUSY race
2392/// where a concurrent reader prevents the lock upgrade mid-write.
2393fn with_write_tx<T, F>(conn: &rusqlite::Connection, body: F) -> Result<T, RuntimeError>
2394where
2395    F: FnOnce() -> Result<T, RuntimeError>,
2396{
2397    conn.execute("BEGIN IMMEDIATE", [])
2398        .map_err(|e| RuntimeError {
2399            code: "TX_BEGIN_FAILED".into(),
2400            message: format!("BEGIN: {e}"),
2401        })?;
2402    match body() {
2403        Ok(v) => {
2404            conn.execute("COMMIT", []).map_err(|e| RuntimeError {
2405                code: "TX_COMMIT_FAILED".into(),
2406                message: format!("COMMIT: {e}"),
2407            })?;
2408            Ok(v)
2409        }
2410        Err(e) => {
2411            // Best-effort rollback; if even ROLLBACK fails we surface
2412            // the *original* error since that's the more actionable one.
2413            let _ = conn.execute("ROLLBACK", []);
2414            Err(e)
2415        }
2416    }
2417}
2418
2419fn generate_id() -> String {
2420    use std::sync::atomic::{AtomicU32, Ordering};
2421    use std::time::{SystemTime, UNIX_EPOCH};
2422    static COUNTER: AtomicU32 = AtomicU32::new(0);
2423    let nanos = SystemTime::now()
2424        .duration_since(UNIX_EPOCH)
2425        .unwrap_or_default()
2426        .as_nanos();
2427    let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
2428    format!("{nanos:032x}{seq:08x}")
2429}
2430
2431/// Convert a `serde_json::Value` to a boxed `ToSql` for rusqlite.
2432fn json_to_sql(val: &serde_json::Value) -> Box<dyn rusqlite::types::ToSql> {
2433    match val {
2434        serde_json::Value::Null => Box::new(rusqlite::types::Null),
2435        serde_json::Value::Bool(b) => Box::new(*b as i32),
2436        serde_json::Value::Number(n) => {
2437            if let Some(i) = n.as_i64() {
2438                Box::new(i)
2439            } else if let Some(f) = n.as_f64() {
2440                Box::new(f)
2441            } else {
2442                Box::new(n.to_string())
2443            }
2444        }
2445        serde_json::Value::String(s) => Box::new(s.clone()),
2446        other => Box::new(other.to_string()),
2447    }
2448}
2449
2450/// Convert a rusqlite row to a JSON value.
2451///
2452/// Reads columns by NAME (via the row's actual column metadata) rather
2453/// than by positional index. The previous implementation assumed the
2454/// SQLite table column order matched the manifest field order, which
2455/// silently breaks when a new field is inserted in the middle of the
2456/// manifest: SQLite's `ALTER TABLE ADD COLUMN` always appends to the
2457/// end of the table, so existing data lands in the wrong field on
2458/// every read.
2459///
2460/// `field_names` is still passed (unused in the body, kept for API
2461/// stability with callers that compute it from the manifest) — the
2462/// name set comes from the row itself now, which always matches the
2463/// SELECT's actual column shape.
2464fn row_to_json(row: &rusqlite::Row<'_>, _field_names: &[String]) -> serde_json::Value {
2465    let mut obj = serde_json::Map::new();
2466
2467    let stmt = row.as_ref();
2468    let count = stmt.column_count();
2469    for i in 0..count {
2470        // Column names are short string slices into the prepared
2471        // statement; copy out into owned Strings before inserting into
2472        // the map (the slice borrow can't outlive the row).
2473        let name = match stmt.column_name(i) {
2474            Ok(n) => n.to_string(),
2475            Err(_) => continue,
2476        };
2477        let value = if let Ok(s) = row.get::<_, String>(i) {
2478            serde_json::Value::String(s)
2479        } else if let Ok(n) = row.get::<_, i64>(i) {
2480            serde_json::Value::Number(serde_json::Number::from(n))
2481        } else if let Ok(f) = row.get::<_, f64>(i) {
2482            serde_json::Number::from_f64(f)
2483                .map(serde_json::Value::Number)
2484                .unwrap_or(serde_json::Value::Null)
2485        } else {
2486            serde_json::Value::Null
2487        };
2488        obj.insert(name, value);
2489    }
2490
2491    serde_json::Value::Object(obj)
2492}
2493
2494#[cfg(test)]
2495mod tests {
2496    use super::*;
2497    use pylon_kernel::{ManifestField, ManifestIndex};
2498
2499    fn test_manifest() -> AppManifest {
2500        AppManifest {
2501            manifest_version: 1,
2502            name: "Test".into(),
2503            version: "0.1.0".into(),
2504            entities: vec![pylon_kernel::ManifestEntity {
2505                name: "User".into(),
2506                fields: vec![
2507                    ManifestField {
2508                        name: "email".into(),
2509                        field_type: "string".into(),
2510                        optional: false,
2511                        unique: true,
2512                        crdt: None,
2513                    },
2514                    ManifestField {
2515                        name: "displayName".into(),
2516                        field_type: "string".into(),
2517                        optional: false,
2518                        unique: false,
2519                        crdt: None,
2520                    },
2521                ],
2522                indexes: vec![ManifestIndex {
2523                    name: "user_email".into(),
2524                    fields: vec!["email".into()],
2525                    unique: true,
2526                }],
2527                relations: vec![],
2528                search: None,
2529                crdt: true,
2530            }],
2531            routes: vec![],
2532            queries: vec![],
2533            actions: vec![],
2534            policies: vec![],
2535            auth: Default::default(),
2536        }
2537    }
2538
2539    #[test]
2540    fn reset_for_tests_wipes_in_memory() {
2541        let rt = Runtime::in_memory(test_manifest()).unwrap();
2542        rt.insert(
2543            "User",
2544            &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
2545        )
2546        .unwrap();
2547        assert_eq!(rt.list("User").unwrap().len(), 1);
2548        rt.reset_for_tests().unwrap();
2549        assert_eq!(rt.list("User").unwrap().len(), 0);
2550    }
2551
2552    #[test]
2553    fn reset_for_tests_refuses_file_db() {
2554        let dir = std::env::temp_dir().join("pylon-reset-refuse");
2555        let _ = std::fs::create_dir_all(&dir);
2556        let db_path = dir.join("db.sqlite");
2557        let _ = std::fs::remove_file(&db_path);
2558        let rt = Runtime::open(db_path.to_str().unwrap(), test_manifest()).unwrap();
2559        let err = rt.reset_for_tests().unwrap_err();
2560        assert_eq!(err.code, "RESET_REFUSED");
2561        let _ = std::fs::remove_file(&db_path);
2562    }
2563
2564    #[test]
2565    fn insert_and_get() {
2566        let rt = Runtime::in_memory(test_manifest()).unwrap();
2567        let id = rt
2568            .insert(
2569                "User",
2570                &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
2571            )
2572            .unwrap();
2573        let row = rt.get_by_id("User", &id).unwrap().unwrap();
2574        assert_eq!(row["email"], "a@b.com");
2575    }
2576
2577    /// Regression: when a new field is added in the middle of a manifest,
2578    /// SQLite ALTER TABLE ADD COLUMN appends it to the end of the table.
2579    /// The previous `row_to_json` read columns by positional index in
2580    /// manifest order, so existing data shifted into the wrong fields
2581    /// on every read (createdAt's value showed up as the new field's,
2582    /// and vice versa). row_to_json now reads by column name from the
2583    /// row's own metadata, so the bug can't recur regardless of
2584    /// migration order.
2585    #[test]
2586    fn row_to_json_handles_columns_added_out_of_manifest_order() {
2587        // Manifest: id, email, displayName, avatarColor, createdAt
2588        let mut manifest = test_manifest();
2589        manifest.entities[0].fields = vec![
2590            ManifestField {
2591                name: "email".into(),
2592                field_type: "string".into(),
2593                optional: false,
2594                unique: true,
2595                crdt: None,
2596            },
2597            ManifestField {
2598                name: "displayName".into(),
2599                field_type: "string".into(),
2600                optional: false,
2601                unique: false,
2602                crdt: None,
2603            },
2604            ManifestField {
2605                name: "avatarColor".into(),
2606                field_type: "string".into(),
2607                optional: true,
2608                unique: false,
2609                crdt: None,
2610            },
2611            ManifestField {
2612                name: "createdAt".into(),
2613                field_type: "datetime".into(),
2614                optional: true,
2615                unique: false,
2616                crdt: None,
2617            },
2618        ];
2619        // Important: turn off CRDT mode for this test — CRDT mode writes
2620        // the projection back to SQLite explicitly per-field, so it
2621        // wouldn't exercise the column-order bug we're regressing
2622        // against. The bug bites the legacy path that still does
2623        // `INSERT (id, email, displayName, ...) VALUES (...)` and then
2624        // `SELECT * ... → row_to_json` to read it back.
2625        manifest.entities[0].crdt = false;
2626        let rt = Runtime::in_memory(manifest).unwrap();
2627        let id = rt
2628            .insert(
2629                "User",
2630                &serde_json::json!({
2631                    "email": "a@b.com",
2632                    "displayName": "Alice",
2633                    "avatarColor": "#abc",
2634                    "createdAt": "2026-01-01T00:00:00Z",
2635                }),
2636            )
2637            .unwrap();
2638
2639        // Simulate an ALTER TABLE ADD COLUMN that appends a new field
2640        // at the end of the SQLite table even though the manifest
2641        // places it in the middle. This is the exact shape of what
2642        // happens when a user adds a new field between existing ones
2643        // and pylon dev migrates the table forward.
2644        {
2645            let conn = rt.lock_write_conn().unwrap();
2646            conn.execute("ALTER TABLE \"User\" ADD COLUMN \"passwordHash\" TEXT", [])
2647                .unwrap();
2648            conn.execute(
2649                "UPDATE \"User\" SET \"passwordHash\" = ?1 WHERE \"id\" = ?2",
2650                rusqlite::params!["hashed-password", &id],
2651            )
2652            .unwrap();
2653        }
2654        // Update the in-memory manifest to reflect the new field
2655        // sitting between avatarColor and createdAt — this is what the
2656        // regenerated manifest would look like.
2657        // (We mutate via the storage path to mirror the actual flow.)
2658
2659        let row = rt.get_by_id("User", &id).unwrap().unwrap();
2660        // The crucial assertions: each column maps to its own value,
2661        // not the value of whichever column happens to share its
2662        // SQLite position.
2663        assert_eq!(row["email"], "a@b.com");
2664        assert_eq!(row["displayName"], "Alice");
2665        assert_eq!(row["avatarColor"], "#abc");
2666        assert_eq!(row["createdAt"], "2026-01-01T00:00:00Z");
2667        assert_eq!(row["passwordHash"], "hashed-password");
2668    }
2669
2670    /// CRDT-mode entities (the default) populate the sidecar snapshot
2671    /// table on every write — the LoroDoc is the source of truth, the
2672    /// SQLite row is the materialized projection. This proves the CRDT
2673    /// branch in `insert` actually fires.
2674    #[test]
2675    fn crdt_default_writes_through_loro_store() {
2676        let rt = Runtime::in_memory(test_manifest()).unwrap();
2677        let id = rt
2678            .insert(
2679                "User",
2680                &serde_json::json!({"email": "x@y.com", "displayName": "Eric"}),
2681            )
2682            .unwrap();
2683
2684        // Sidecar contains exactly one snapshot for the new row.
2685        let conn = rt.lock_write_conn().unwrap();
2686        let snap_count: i64 = conn
2687            .query_row(
2688                "SELECT COUNT(*) FROM _pylon_crdt_snapshots
2689                 WHERE entity = ?1 AND row_id = ?2",
2690                rusqlite::params!["User", &id],
2691                |r| r.get(0),
2692            )
2693            .unwrap();
2694        assert_eq!(snap_count, 1, "sidecar should have one row after insert");
2695
2696        // Loro doc is cached in memory after the write — proves
2697        // get_or_hydrate ran during apply_patch.
2698        assert!(rt.crdt_store().cached_rows() >= 1);
2699
2700        // SQLite materialized view has the projected row.
2701        drop(conn);
2702        let row = rt.get_by_id("User", &id).unwrap().unwrap();
2703        assert_eq!(row["email"], "x@y.com");
2704        assert_eq!(row["displayName"], "Eric");
2705    }
2706
2707    /// Updates write through the LoroDoc as well — verifies the sidecar
2708    /// snapshot grows (Loro tracks new ops) and the materialized row
2709    /// reflects the new value.
2710    #[test]
2711    fn crdt_update_persists_new_snapshot() {
2712        let rt = Runtime::in_memory(test_manifest()).unwrap();
2713        let id = rt
2714            .insert(
2715                "User",
2716                &serde_json::json!({"email": "x@y.com", "displayName": "Eric"}),
2717            )
2718            .unwrap();
2719
2720        let snap_after_insert: Vec<u8> = {
2721            let conn = rt.lock_write_conn().unwrap();
2722            conn.query_row(
2723                "SELECT snapshot FROM _pylon_crdt_snapshots
2724                 WHERE entity = 'User' AND row_id = ?1",
2725                rusqlite::params![&id],
2726                |r| r.get(0),
2727            )
2728            .unwrap()
2729        };
2730
2731        rt.update("User", &id, &serde_json::json!({"displayName": "Eric C"}))
2732            .unwrap();
2733
2734        let snap_after_update: Vec<u8> = {
2735            let conn = rt.lock_write_conn().unwrap();
2736            conn.query_row(
2737                "SELECT snapshot FROM _pylon_crdt_snapshots
2738                 WHERE entity = 'User' AND row_id = ?1",
2739                rusqlite::params![&id],
2740                |r| r.get(0),
2741            )
2742            .unwrap()
2743        };
2744
2745        assert_ne!(
2746            snap_after_insert, snap_after_update,
2747            "snapshot bytes should change after an update"
2748        );
2749
2750        let row = rt.get_by_id("User", &id).unwrap().unwrap();
2751        assert_eq!(row["displayName"], "Eric C");
2752        assert_eq!(row["email"], "x@y.com");
2753    }
2754
2755    /// Regression: when the SQL INSERT step inside Runtime::insert fails
2756    /// (UNIQUE-constraint violation here), the LoroDoc snapshot must
2757    /// also roll back — neither half lands. Previously the LoroStore
2758    /// wrote first and committed independently, so a doomed INSERT left
2759    /// a sidecar row pointing at a doc that the materialized table
2760    /// never knew about.
2761    #[test]
2762    fn crdt_insert_rolls_back_when_sql_step_fails() {
2763        let rt = Runtime::in_memory(test_manifest()).unwrap();
2764        // Seed a row.
2765        rt.insert(
2766            "User",
2767            &serde_json::json!({"email": "x@y.com", "displayName": "First"}),
2768        )
2769        .unwrap();
2770
2771        // Snapshot the sidecar row count BEFORE the failing insert.
2772        let snap_count_before: i64 = {
2773            let conn = rt.lock_write_conn().unwrap();
2774            conn.query_row(
2775                "SELECT COUNT(*) FROM _pylon_crdt_snapshots WHERE entity = 'User'",
2776                [],
2777                |r| r.get(0),
2778            )
2779            .unwrap()
2780        };
2781
2782        // Attempt a duplicate-email insert. SQL UNIQUE rejects.
2783        let err = rt
2784            .insert(
2785                "User",
2786                &serde_json::json!({"email": "x@y.com", "displayName": "Second"}),
2787            )
2788            .expect_err("duplicate email must fail");
2789        assert_eq!(err.code, "INSERT_FAILED");
2790
2791        // Sidecar row count unchanged — the LoroDoc snapshot the CRDT
2792        // path wrote was rolled back along with the failed SQL INSERT.
2793        let snap_count_after: i64 = {
2794            let conn = rt.lock_write_conn().unwrap();
2795            conn.query_row(
2796                "SELECT COUNT(*) FROM _pylon_crdt_snapshots WHERE entity = 'User'",
2797                [],
2798                |r| r.get(0),
2799            )
2800            .unwrap()
2801        };
2802        assert_eq!(
2803            snap_count_after, snap_count_before,
2804            "failed insert should not leave a sidecar snapshot behind"
2805        );
2806    }
2807
2808    /// Entities with `crdt: false` skip the LoroDoc entirely — no sidecar
2809    /// row, no Loro cache entry. Proves the opt-out actually opts out.
2810    #[test]
2811    fn crdt_false_skips_loro_store() {
2812        let mut manifest = test_manifest();
2813        // Flip the User entity to LWW-only mode.
2814        manifest.entities[0].crdt = false;
2815        let rt = Runtime::in_memory(manifest).unwrap();
2816
2817        let id = rt
2818            .insert(
2819                "User",
2820                &serde_json::json!({"email": "lww@example.com", "displayName": "Plain"}),
2821            )
2822            .unwrap();
2823
2824        let conn = rt.lock_write_conn().unwrap();
2825        let snap_count: i64 = conn
2826            .query_row(
2827                "SELECT COUNT(*) FROM _pylon_crdt_snapshots
2828                 WHERE entity = 'User' AND row_id = ?1",
2829                rusqlite::params![&id],
2830                |r| r.get(0),
2831            )
2832            .unwrap();
2833        assert_eq!(snap_count, 0, "crdt:false should not touch the sidecar");
2834        assert_eq!(
2835            rt.crdt_store().cached_rows(),
2836            0,
2837            "crdt:false should not warm the cache"
2838        );
2839
2840        // SQLite path still works — the row landed via the legacy
2841        // direct-write path.
2842        drop(conn);
2843        let row = rt.get_by_id("User", &id).unwrap().unwrap();
2844        assert_eq!(row["email"], "lww@example.com");
2845    }
2846
2847    #[test]
2848    fn list_entities() {
2849        let rt = Runtime::in_memory(test_manifest()).unwrap();
2850        rt.insert(
2851            "User",
2852            &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
2853        )
2854        .unwrap();
2855        rt.insert(
2856            "User",
2857            &serde_json::json!({"email": "b@c.com", "displayName": "B"}),
2858        )
2859        .unwrap();
2860        let rows = rt.list("User").unwrap();
2861        assert_eq!(rows.len(), 2);
2862    }
2863
2864    #[test]
2865    fn update_entity() {
2866        let rt = Runtime::in_memory(test_manifest()).unwrap();
2867        let id = rt
2868            .insert(
2869                "User",
2870                &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
2871            )
2872            .unwrap();
2873        let updated = rt
2874            .update("User", &id, &serde_json::json!({"displayName": "Updated"}))
2875            .unwrap();
2876        assert!(updated);
2877        let row = rt.get_by_id("User", &id).unwrap().unwrap();
2878        assert_eq!(row["displayName"], "Updated");
2879    }
2880
2881    #[test]
2882    fn delete_entity() {
2883        let rt = Runtime::in_memory(test_manifest()).unwrap();
2884        let id = rt
2885            .insert(
2886                "User",
2887                &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
2888            )
2889            .unwrap();
2890        let deleted = rt.delete("User", &id).unwrap();
2891        assert!(deleted);
2892        assert!(rt.get_by_id("User", &id).unwrap().is_none());
2893    }
2894
2895    #[test]
2896    fn lookup_by_field() {
2897        let rt = Runtime::in_memory(test_manifest()).unwrap();
2898        rt.insert(
2899            "User",
2900            &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
2901        )
2902        .unwrap();
2903        let row = rt.lookup("User", "email", "a@b.com").unwrap().unwrap();
2904        assert_eq!(row["displayName"], "A");
2905    }
2906
2907    #[test]
2908    fn unknown_entity_returns_error() {
2909        let rt = Runtime::in_memory(test_manifest()).unwrap();
2910        let err = rt.list("Nonexistent").unwrap_err();
2911        assert_eq!(err.code, "ENTITY_NOT_FOUND");
2912    }
2913
2914    #[test]
2915    fn insert_rejects_unknown_column() {
2916        let rt = Runtime::in_memory(test_manifest()).unwrap();
2917        let err = rt
2918            .insert(
2919                "User",
2920                &serde_json::json!({"email": "a@b.com", "displayName": "A", "evil_col": "x"}),
2921            )
2922            .unwrap_err();
2923        assert_eq!(err.code, "INVALID_COLUMN");
2924    }
2925
2926    #[test]
2927    fn update_rejects_unknown_column() {
2928        let rt = Runtime::in_memory(test_manifest()).unwrap();
2929        let id = rt
2930            .insert(
2931                "User",
2932                &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
2933            )
2934            .unwrap();
2935        let err = rt
2936            .update("User", &id, &serde_json::json!({"bad_field": "x"}))
2937            .unwrap_err();
2938        assert_eq!(err.code, "INVALID_COLUMN");
2939    }
2940
2941    #[test]
2942    fn lookup_rejects_unknown_column() {
2943        let rt = Runtime::in_memory(test_manifest()).unwrap();
2944        let err = rt.lookup("User", "nonexistent", "val").unwrap_err();
2945        assert_eq!(err.code, "INVALID_COLUMN");
2946    }
2947
2948    #[test]
2949    fn query_filtered_rejects_unknown_column() {
2950        let rt = Runtime::in_memory(test_manifest()).unwrap();
2951        let err = rt
2952            .query_filtered("User", &serde_json::json!({"bad_col": "x"}))
2953            .unwrap_err();
2954        assert_eq!(err.code, "INVALID_COLUMN");
2955    }
2956
2957    #[test]
2958    fn query_filtered_rejects_unknown_order_column() {
2959        let rt = Runtime::in_memory(test_manifest()).unwrap();
2960        let err = rt
2961            .query_filtered("User", &serde_json::json!({"$order": {"bad_col": "asc"}}))
2962            .unwrap_err();
2963        assert_eq!(err.code, "INVALID_COLUMN");
2964    }
2965
2966    #[test]
2967    fn query_filtered_sanitizes_order_direction() {
2968        let rt = Runtime::in_memory(test_manifest()).unwrap();
2969        rt.insert(
2970            "User",
2971            &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
2972        )
2973        .unwrap();
2974        // Even a malicious direction value should be normalized to ASC.
2975        let rows = rt
2976            .query_filtered(
2977                "User",
2978                &serde_json::json!({"$order": {"email": "DROP TABLE User"}}),
2979            )
2980            .unwrap();
2981        assert_eq!(rows.len(), 1);
2982    }
2983
2984    #[test]
2985    fn in_memory_has_no_read_pool() {
2986        let rt = Runtime::in_memory(test_manifest()).unwrap();
2987        assert_eq!(rt.read_pool_size(), 0);
2988    }
2989
2990    #[test]
2991    fn open_creates_read_pool() {
2992        let dir = std::env::temp_dir().join(format!("pylon_test_{}", std::process::id()));
2993        std::fs::create_dir_all(&dir).unwrap();
2994        let db_path = dir.join("test_read_pool.db");
2995
2996        let rt = Runtime::open(db_path.to_str().unwrap(), test_manifest()).unwrap();
2997        assert_eq!(rt.read_pool_size(), READ_POOL_SIZE);
2998
2999        // Write then read through the pool.
3000        let id = rt
3001            .insert(
3002                "User",
3003                &serde_json::json!({"email": "pool@test.com", "displayName": "Pool"}),
3004            )
3005            .unwrap();
3006        let row = rt.get_by_id("User", &id).unwrap().unwrap();
3007        assert_eq!(row["email"], "pool@test.com");
3008
3009        // Clean up.
3010        let _ = std::fs::remove_dir_all(&dir);
3011    }
3012
3013    #[test]
3014    fn concurrent_reads_dont_block_on_write() {
3015        use std::sync::Arc;
3016
3017        let dir = std::env::temp_dir().join(format!("pylon_conc_{}", std::process::id()));
3018        std::fs::create_dir_all(&dir).unwrap();
3019        let db_path = dir.join("test_concurrent.db");
3020
3021        let rt = Arc::new(Runtime::open(db_path.to_str().unwrap(), test_manifest()).unwrap());
3022
3023        // Seed some data so reads have something to return.
3024        rt.insert(
3025            "User",
3026            &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
3027        )
3028        .unwrap();
3029        rt.insert(
3030            "User",
3031            &serde_json::json!({"email": "b@c.com", "displayName": "B"}),
3032        )
3033        .unwrap();
3034
3035        // Hold the write lock to simulate a long write.
3036        let write_guard = rt.lock_write_conn().unwrap();
3037
3038        // Spawn reader threads that should succeed despite the held write lock.
3039        let mut handles = Vec::new();
3040        for _ in 0..4 {
3041            let rt_clone = Arc::clone(&rt);
3042            handles.push(std::thread::spawn(move || {
3043                let rows = rt_clone.list("User").unwrap();
3044                assert_eq!(rows.len(), 2);
3045            }));
3046        }
3047
3048        for h in handles {
3049            h.join().expect("reader thread panicked");
3050        }
3051
3052        // Release the write lock.
3053        drop(write_guard);
3054
3055        // Clean up.
3056        let _ = std::fs::remove_dir_all(&dir);
3057    }
3058}