Skip to main content

pylon_runtime/
lib.rs

1pub mod account_backend;
2pub mod api_key_backend;
3pub mod cache_handlers;
4pub mod cache_server;
5pub mod config;
6pub mod cron;
7pub mod datastore;
8pub mod ip_limit;
9pub mod job_store;
10pub mod jobs;
11pub mod log;
12pub mod loro_store;
13pub mod pg_loro_store;
14pub mod magic_code_backend;
15pub mod metrics;
16pub mod oauth_backend;
17pub mod org_backend;
18pub mod verification_backend;
19pub mod openapi;
20pub mod presence;
21pub mod pubsub;
22pub mod rate_limit;
23pub mod resp;
24pub mod resp_server;
25pub mod rooms;
26pub mod scheduler;
27pub mod server;
28pub mod session_backend;
29pub mod shard_ws;
30pub mod sse;
31pub mod tls;
32pub mod workflow_store;
33pub mod workflows;
34pub mod ws;
35
36use std::collections::HashMap;
37use std::sync::atomic::{AtomicUsize, Ordering};
38use std::sync::Mutex;
39
40use pylon_kernel::{AppManifest, ManifestEntity};
41use rusqlite::Connection;
42
// ---------------------------------------------------------------------------
// Runtime errors
// ---------------------------------------------------------------------------

/// Error returned by every fallible `Runtime` operation.
///
/// `code` is a short machine-readable tag (for example `"INVALID_COLUMN"`
/// or `"SCHEMA_INIT_FAILED"`); `message` carries the human-readable detail.
#[derive(Debug, Clone)]
pub struct RuntimeError {
    /// Machine-readable error tag.
    pub code: String,
    /// Human-readable description of what went wrong.
    pub message: String,
}

impl std::fmt::Display for RuntimeError {
    /// Renders the error as `[CODE] message`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let RuntimeError { code, message } = self;
        write!(f, "[{code}] {message}")
    }
}

impl std::error::Error for RuntimeError {}
60
61/// Lift a `DataError` (the cross-crate error type for PG `DataStore`
62/// operations) into a `RuntimeError`. Used by `PostgresDataStore`
63/// closure bounds (`with_client`, `with_transaction`) so callers in
64/// the runtime can propagate PG errors with their native error type.
65impl From<pylon_http::DataError> for RuntimeError {
66    fn from(e: pylon_http::DataError) -> Self {
67        RuntimeError {
68            code: e.code,
69            message: e.message,
70        }
71    }
72}
73
// ---------------------------------------------------------------------------
// SQL safety helpers
// ---------------------------------------------------------------------------

/// Wrap `name` in double quotes so it can be spliced into SQL as an
/// identifier. Every embedded `"` is doubled (the SQL-standard escape), so
/// the result cannot terminate the quoted identifier early.
fn quote_ident(name: &str) -> String {
    let mut quoted = String::with_capacity(name.len() + 2);
    quoted.push('"');
    for ch in name.chars() {
        if ch == '"' {
            // Doubling is the only escape SQL recognises inside "…".
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
83
84/// Validate that `name` is a known column on the given entity.
85/// Always allows "id" (the primary key). Returns an error listing valid
86/// columns when validation fails.
87fn validate_column_name(name: &str, entity: &ManifestEntity) -> Result<(), RuntimeError> {
88    if name == "id" {
89        return Ok(());
90    }
91    if entity.fields.iter().any(|f| f.name == name) {
92        return Ok(());
93    }
94    Err(RuntimeError {
95        code: "INVALID_COLUMN".into(),
96        message: format!(
97            "Unknown column \"{}\" -- valid columns: id, {}",
98            name,
99            entity
100                .fields
101                .iter()
102                .map(|f| f.name.as_str())
103                .collect::<Vec<_>>()
104                .join(", ")
105        ),
106    })
107}
108
109// ---------------------------------------------------------------------------
110// Connection tuning
111// ---------------------------------------------------------------------------
112
113/// Apply the production pragma set on a SQLite connection. Identical
114/// values to `pylon_storage::sqlite::tune_connection` — kept here too
115/// because the Runtime opens its own connections directly (write +
116/// read pool) without going through the storage adapter.
117///
118/// See `crates/storage/src/sqlite.rs` for the rationale on each
119/// pragma. Skipping it on writes drops throughput by 5–10×.
120fn tune_runtime_connection(conn: &Connection, in_memory: bool) {
121    let pragmas: &[(&str, &str)] = if in_memory {
122        &[
123            ("temp_store", "MEMORY"),
124            ("cache_size", "-65536"),
125            ("foreign_keys", "ON"),
126        ]
127    } else {
128        &[
129            ("journal_mode", "WAL"),
130            ("synchronous", "NORMAL"),
131            ("cache_size", "-65536"),
132            ("mmap_size", "268435456"),
133            ("temp_store", "MEMORY"),
134            ("busy_timeout", "5000"),
135            ("foreign_keys", "ON"),
136            ("wal_autocheckpoint", "1000"),
137        ]
138    };
139    for (key, value) in pragmas {
140        let _ = conn.pragma_update(None, key, value);
141    }
142}
143
144// ---------------------------------------------------------------------------
145// Read connection guard
146// ---------------------------------------------------------------------------
147
148/// A guard that dereferences to a `Connection`, abstracting over whether
149/// it came from the read pool or fell back to the write connection.
150enum ReadConnGuard<'a> {
151    Pooled(std::sync::MutexGuard<'a, Connection>),
152    Write(std::sync::MutexGuard<'a, Connection>),
153}
154
155impl<'a> std::ops::Deref for ReadConnGuard<'a> {
156    type Target = Connection;
157    fn deref(&self) -> &Connection {
158        match self {
159            ReadConnGuard::Pooled(g) => g,
160            ReadConnGuard::Write(g) => g,
161        }
162    }
163}
164
165// ---------------------------------------------------------------------------
166// Runtime — the core execution engine
167// ---------------------------------------------------------------------------
168
169/// A manifest-driven runtime that executes CRUD operations against an
170/// underlying data store. Two backends are supported:
171///
172/// - **SQLite** (default): single-process, file-or-memory, with a write
173///   mutex + read pool, FTS5 search, and per-row LoroDoc CRDT snapshots.
174/// - **Postgres**: live cluster, suitable for multi-replica deployments.
175///   Routes entity CRUD through [`pylon_storage::pg_datastore::PostgresDataStore`].
176///   CRDT mode and FTS5-shaped search are SQLite-only at this layer; the
177///   Postgres backend returns `NOT_SUPPORTED` for those paths and the router
178///   degrades to JSON change events (no binary CRDT broadcasts).
179///
180/// Pick a backend by passing a `postgres://` URL to [`Runtime::open`]; any
181/// other string is treated as a SQLite filesystem path.
182pub struct Runtime {
183    backend: RuntimeBackend,
184    manifest: AppManifest,
185    entities: HashMap<String, ManifestEntity>,
186    /// True only for the SQLite in-memory variant. Postgres mode reports false.
187    /// Gates the test-reset endpoint — a false positive here would let
188    /// `/api/__test__/reset` truncate real tables.
189    is_in_memory: bool,
190}
191
192/// Backend storage for entity CRUD. SQLite variant owns the connection
193/// pool and CRDT cache; Postgres variant wraps a `PostgresDataStore`.
194enum RuntimeBackend {
195    Sqlite(SqliteBackend),
196    Postgres(PgBackend),
197}
198
199/// SQLite-backed entity store. WAL mode allows one writer and multiple
200/// concurrent readers — the struct exploits this with a single write
201/// connection behind a mutex plus a pool of read-only connections.
202struct SqliteBackend {
203    /// Write connection — single mutex, serializes writes.
204    write_conn: Mutex<Connection>,
205    /// Read connections — pool of connections for concurrent reads.
206    /// Empty for in-memory databases where extra connections are not possible.
207    read_pool: Vec<Mutex<Connection>>,
208    /// Counter for round-robin read pool selection.
209    read_counter: AtomicUsize,
210    /// Per-row LoroDoc cache + sidecar persistence. Used for entities with
211    /// `crdt: true` (the default). Reads still hit SQLite directly via the
212    /// read pool — the LoroDoc just produces the projected JSON that gets
213    /// materialized into SQLite columns on every write.
214    crdt: crate::loro_store::LoroStore,
215}
216
217/// Postgres-backed entity store. Wraps `PostgresDataStore` from
218/// pylon-storage and delegates the `DataStore` surface directly.
219pub(crate) struct PgBackend {
220    pub(crate) store: pylon_storage::pg_datastore::PostgresDataStore,
221    /// Per-row LoroDoc snapshot store for entities with `crdt: true`.
222    /// Arc'd so the runtime layer can hand a clone to PgCrdtHookImpl
223    /// (the bridge that lets PgTxStore call back into CRDT machinery
224    /// from inside a held tx).
225    pub(crate) crdt: std::sync::Arc<crate::pg_loro_store::PgLoroStore>,
226}
227
228/// Number of read-only connections to open in the pool.
229const READ_POOL_SIZE: usize = 4;
230
/// True iff `url` begins with the `postgres://` or `postgresql://` scheme,
/// compared ASCII-case-insensitively.
///
/// Everything else is interpreted as a SQLite filesystem path. This
/// includes other libpq connection forms such as `host=... dbname=...`
/// key/value strings.
fn is_postgres_url(url: &str) -> bool {
    const SCHEMES: [&str; 2] = ["postgres://", "postgresql://"];
    let bytes = url.as_bytes();
    // Compare only the prefix bytes instead of lowercasing the whole URL:
    // no allocation, and the result is identical for ASCII schemes.
    SCHEMES.iter().any(|scheme| {
        matches!(
            bytes.get(..scheme.len()),
            Some(head) if head.eq_ignore_ascii_case(scheme.as_bytes())
        )
    })
}
238
239/// Convert a `pylon_http::DataError` (returned by `PostgresDataStore`)
240/// into the runtime's error type. The codes round-trip; only the type
241/// changes.
242fn data_err_to_runtime(e: pylon_http::DataError) -> RuntimeError {
243    RuntimeError {
244        code: e.code,
245        message: e.message,
246    }
247}
248
249impl Runtime {
250    /// Open a runtime against either a SQLite file path or a Postgres URL.
251    ///
252    /// Backend selection is by URL prefix:
253    /// - `postgres://...` or `postgresql://...` → Postgres (requires the
254    ///   `postgres-live` feature on `pylon-storage`, enabled by default).
255    /// - Anything else → SQLite, treating the string as a filesystem path
256    ///   (`":memory:"` works via `Runtime::in_memory` instead).
257    pub fn open(url: &str, manifest: AppManifest) -> Result<Self, RuntimeError> {
258        if is_postgres_url(url) {
259            Self::open_postgres(url, manifest)
260        } else {
261            let conn = Connection::open(url).map_err(|e| RuntimeError {
262                code: "RUNTIME_OPEN_FAILED".into(),
263                message: format!("Failed to open database: {e}"),
264            })?;
265            Self::from_connection(conn, manifest, false)
266        }
267    }
268
269    /// Open a runtime backed by a live Postgres cluster.
270    ///
271    /// Schema must be applied separately via `pylon migrate` / the
272    /// storage adapter's plan apply path — Runtime does not auto-create
273    /// tables on Postgres (in contrast to SQLite, where `from_connection`
274    /// runs CREATE TABLE IF NOT EXISTS on every open). This matches how
275    /// production Postgres deployments are typically managed: schema is
276    /// migrated via a controlled, observable step, not as a side effect
277    /// of the server starting up.
278    pub fn open_postgres(url: &str, manifest: AppManifest) -> Result<Self, RuntimeError> {
279        let store = pylon_storage::pg_datastore::PostgresDataStore::connect(url, manifest.clone())
280            .map_err(data_err_to_runtime)?;
281        // Bootstrap the CRDT sidecar table on every open. Idempotent
282        // (`CREATE TABLE IF NOT EXISTS`); same shape as the SQLite
283        // path's `ensure_sidecar` call. Without this, the first
284        // CRDT-mode write would error because `_pylon_crdt_snapshots`
285        // doesn't exist yet on a fresh PG database.
286        store.with_client(|c| crate::pg_loro_store::ensure_sidecar(c)).map_err(|e| {
287            RuntimeError {
288                code: "CRDT_SIDECAR_BOOTSTRAP_FAILED".into(),
289                message: format!("ensure pg crdt sidecar: {e}"),
290            }
291        })?;
292        let entities: HashMap<String, ManifestEntity> = manifest
293            .entities
294            .iter()
295            .map(|e| (e.name.clone(), e.clone()))
296            .collect();
297        Ok(Self {
298            backend: RuntimeBackend::Postgres(PgBackend {
299                store,
300                crdt: std::sync::Arc::new(crate::pg_loro_store::PgLoroStore::new()),
301            }),
302            manifest,
303            entities,
304            is_in_memory: false,
305        })
306    }
307
308    /// Returns true if this runtime is backed by an in-memory SQLite DB.
309    ///
310    /// Stored at open time rather than queried via `conn.path()` because
311    /// the path-based check conflates "no filename" with "in-memory":
312    /// `Connection::open("")` yields a file-backed DB with empty path,
313    /// and would falsely pass as in-memory. Since we always know at
314    /// construction time which constructor was used, track the bit.
315    ///
316    /// Gates the test-reset endpoint — a false positive here would let
317    /// `/api/__test__/reset` truncate real tables.
318    pub fn is_in_memory(&self) -> bool {
319        self.is_in_memory
320    }
321
322    /// Filesystem path to the SQLite database, if this runtime is file-backed.
323    /// Returns `None` for in-memory runtimes AND Postgres runtimes (no local
324    /// file). Used by the server bootstrap to derive companion paths
325    /// (session store, change log persistence) without requiring the caller
326    /// to pass them in.
327    pub fn db_path(&self) -> Option<String> {
328        if self.is_in_memory {
329            return None;
330        }
331        let sb = match &self.backend {
332            RuntimeBackend::Sqlite(sb) => sb,
333            RuntimeBackend::Postgres(_) => return None,
334        };
335        let conn = sb.write_conn.lock().ok()?;
336        conn.path().filter(|p| !p.is_empty()).map(String::from)
337    }
338
339    /// Drop every row from every entity table. Intended for test harnesses
340    /// that call `/api/__test__/reset` between cases; refuses to run on
341    /// anything but an in-memory database.
342    ///
343    /// Does NOT drop the tables themselves — schema stays, indexes stay,
344    /// triggers stay. Just truncates user data + the change log.
345    pub fn reset_for_tests(&self) -> Result<(), RuntimeError> {
346        if !self.is_in_memory() {
347            return Err(RuntimeError {
348                code: "RESET_REFUSED".into(),
349                message: "reset_for_tests is only available on in-memory databases".into(),
350            });
351        }
352        let conn = self.lock_write_conn()?;
353        let entity_names: Vec<String> = self.entities.values().map(|e| e.name.clone()).collect();
354        for name in entity_names {
355            let sql = format!("DELETE FROM {}", quote_ident(&name));
356            let _ = conn.execute(&sql, []);
357            // Also clear any FTS5 shadow table if present.
358            let fts_sql = format!("DELETE FROM {}", quote_ident(&format!("{name}_fts")));
359            let _ = conn.execute(&fts_sql, []);
360        }
361        Ok(())
362    }
363
364    /// Create an in-memory SQLite-backed runtime (useful for tests and
365    /// benchmarks). For Postgres-backed equivalents, use `open_postgres`
366    /// with a test-cluster URL.
367    pub fn in_memory(manifest: AppManifest) -> Result<Self, RuntimeError> {
368        let conn = Connection::open_in_memory().map_err(|e| RuntimeError {
369            code: "RUNTIME_OPEN_FAILED".into(),
370            message: format!("Failed to open in-memory database: {e}"),
371        })?;
372        Self::from_connection(conn, manifest, true)
373    }
374
375    fn from_connection(
376        conn: Connection,
377        manifest: AppManifest,
378        is_in_memory: bool,
379    ) -> Result<Self, RuntimeError> {
380        // Apply the production pragma set on the write connection.
381        tune_runtime_connection(&conn, is_in_memory);
382
383        // Build entity lookup map.
384        let entities: HashMap<String, ManifestEntity> = manifest
385            .entities
386            .iter()
387            .map(|e| (e.name.clone(), e.clone()))
388            .collect();
389
390        // Create tables for all entities.
391        for entity in &manifest.entities {
392            let fields: Vec<String> = entity
393                .fields
394                .iter()
395                .map(|f| {
396                    let col_type = match f.field_type.as_str() {
397                        "int" => "INTEGER",
398                        "float" => "REAL",
399                        "bool" => "INTEGER",
400                        _ => "TEXT",
401                    };
402                    let not_null = if f.optional { "" } else { " NOT NULL" };
403                    let unique = if f.unique { " UNIQUE" } else { "" };
404                    format!("{} {col_type}{not_null}{unique}", quote_ident(&f.name))
405                })
406                .collect();
407
408            let mut cols = vec!["\"id\" TEXT PRIMARY KEY NOT NULL".to_string()];
409            cols.extend(fields);
410            let sql = format!(
411                "CREATE TABLE IF NOT EXISTS {} ({})",
412                quote_ident(&entity.name),
413                cols.join(", ")
414            );
415            conn.execute(&sql, []).map_err(|e| RuntimeError {
416                code: "SCHEMA_INIT_FAILED".into(),
417                message: format!("Failed to create table {}: {e}", entity.name),
418            })?;
419
420            // Create indexes.
421            for idx in &entity.indexes {
422                let unique_kw = if idx.unique { "UNIQUE " } else { "" };
423                let quoted_fields: Vec<String> =
424                    idx.fields.iter().map(|f| quote_ident(f)).collect();
425                let idx_sql = format!(
426                    "CREATE {unique_kw}INDEX IF NOT EXISTS {} ON {} ({})",
427                    quote_ident(&idx.name),
428                    quote_ident(&entity.name),
429                    quoted_fields.join(", ")
430                );
431                conn.execute(&idx_sql, []).ok();
432            }
433
434            // Create an FTS5 virtual table over all text-ish fields so clients
435            // can do full-text search via the `$search` query operator.
436            //
437            // Fields that look like "string" / "richtext" / "text" are indexed.
438            // The FTS table is a contentless external-content table pointed at
439            // the entity table, so SQLite keeps it consistent via triggers we
440            // install below.
441            let text_fields: Vec<&str> = entity
442                .fields
443                .iter()
444                .filter(|f| matches!(f.field_type.as_str(), "string" | "richtext" | "text"))
445                .map(|f| f.name.as_str())
446                .collect();
447            if !text_fields.is_empty() {
448                let fts_name = format!("{}_fts", entity.name);
449                let quoted_cols: Vec<String> = text_fields.iter().map(|f| quote_ident(f)).collect();
450                let fts_sql = format!(
451                    "CREATE VIRTUAL TABLE IF NOT EXISTS {} USING fts5({}, content={}, content_rowid='rowid')",
452                    quote_ident(&fts_name),
453                    quoted_cols.join(", "),
454                    quote_ident(&entity.name),
455                );
456                // FTS5 may not be compiled in; ignore errors so those builds
457                // still work (queries using $search will return empty).
458                let fts_ok = conn.execute(&fts_sql, []).is_ok();
459
460                if fts_ok {
461                    // Sync triggers: keep FTS index current on INSERT/UPDATE/DELETE.
462                    //
463                    // Subtle bug fixed: the trigger NAME must be built from
464                    // the raw `fts_name` + suffix and THEN quoted once.
465                    // Previously this code quoted `fts_name` first and then
466                    // appended `_ai`/`_ad`/`_au` AFTER the closing quote,
467                    // producing invalid SQL like `"foo_fts"_ai`. The
468                    // `.ok()` after execute silently ate the error, so the
469                    // triggers were never created and FTS stayed out of
470                    // sync on writes.
471                    let tbl = quote_ident(&entity.name);
472                    let ftb = quote_ident(&fts_name);
473                    let cols_list = quoted_cols.join(", ");
474                    let new_list: Vec<String> = text_fields
475                        .iter()
476                        .map(|f| format!("new.{}", quote_ident(f)))
477                        .collect();
478                    let old_list: Vec<String> = text_fields
479                        .iter()
480                        .map(|f| format!("old.{}", quote_ident(f)))
481                        .collect();
482
483                    let trigger_ai = quote_ident(&format!("{}_ai", fts_name));
484                    let trigger_ad = quote_ident(&format!("{}_ad", fts_name));
485                    let trigger_au = quote_ident(&format!("{}_au", fts_name));
486
487                    let trigger_ins = format!(
488                        "CREATE TRIGGER IF NOT EXISTS {trigger_ai} AFTER INSERT ON {tbl} BEGIN \
489                         INSERT INTO {ftb}(rowid, {cols_list}) VALUES (new.rowid, {new_vals}); END",
490                        new_vals = new_list.join(", "),
491                    );
492                    let trigger_del = format!(
493                        "CREATE TRIGGER IF NOT EXISTS {trigger_ad} AFTER DELETE ON {tbl} BEGIN \
494                         INSERT INTO {ftb}({ftb}, rowid, {cols_list}) VALUES('delete', old.rowid, {old_vals}); END",
495                        old_vals = old_list.join(", "),
496                    );
497                    let trigger_upd = format!(
498                        "CREATE TRIGGER IF NOT EXISTS {trigger_au} AFTER UPDATE ON {tbl} BEGIN \
499                         INSERT INTO {ftb}({ftb}, rowid, {cols_list}) VALUES('delete', old.rowid, {old_vals}); \
500                         INSERT INTO {ftb}(rowid, {cols_list}) VALUES (new.rowid, {new_vals}); END",
501                        new_vals = new_list.join(", "),
502                        old_vals = old_list.join(", "),
503                    );
504                    // Log failures instead of silently dropping — FTS going
505                    // stale should be visible to operators.
506                    for (label, sql) in [
507                        ("ai", &trigger_ins),
508                        ("ad", &trigger_del),
509                        ("au", &trigger_upd),
510                    ] {
511                        if let Err(e) = conn.execute(sql, []) {
512                            tracing::warn!(
513                                "[fts] failed to create {label} trigger for {}: {e}",
514                                entity.name
515                            );
516                        }
517                    }
518                }
519            }
520        }
521
522        // Open read-only connection pool for file-backed databases.
523        // In-memory databases cannot share connections, so the pool stays empty
524        // and reads fall back to the write connection.
525        let db_path = conn.path().filter(|p| !p.is_empty()).map(|p| p.to_string());
526
527        let read_pool = if let Some(ref path) = db_path {
528            let mut pool = Vec::with_capacity(READ_POOL_SIZE);
529            for _ in 0..READ_POOL_SIZE {
530                let read_conn = Connection::open_with_flags(
531                    path,
532                    rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
533                        | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX,
534                )
535                .map_err(|e| RuntimeError {
536                    code: "POOL_OPEN_FAILED".into(),
537                    message: format!("Failed to open read connection: {e}"),
538                })?;
539                tune_runtime_connection(&read_conn, false);
540                pool.push(Mutex::new(read_conn));
541            }
542            pool
543        } else {
544            // In-memory DB — no separate read connections possible.
545            Vec::new()
546        };
547
548        // Sidecar table for CRDT snapshots — created always so toggling
549        // `crdt: true` on an entity post-deploy doesn't need a migration.
550        crate::loro_store::ensure_sidecar(&conn).map_err(|e| RuntimeError {
551            code: "CRDT_SIDECAR_FAILED".into(),
552            message: format!("create CRDT sidecar table: {e}"),
553        })?;
554
555        Ok(Self {
556            backend: RuntimeBackend::Sqlite(SqliteBackend {
557                write_conn: Mutex::new(conn),
558                read_pool,
559                read_counter: AtomicUsize::new(0),
560                crdt: crate::loro_store::LoroStore::new(),
561            }),
562            manifest,
563            entities,
564            is_in_memory,
565        })
566    }
567
568    /// Create the search index tables (`_facet_bitmap`, per-entity
569    /// `_fts_<Entity>`, and a covering index for each declared
570    /// sortable field) for every searchable entity in the manifest.
571    ///
572    /// Production deployments do this via the storage adapter's
573    /// `apply_schema` / migration plan; that path also handles
574    /// adding/removing the tables when a `search:` block is added or
575    /// removed across deploys. This method is a quick path for tests
576    /// and benchmarks that build a `Runtime::in_memory(...)` directly
577    /// without going through the schema-plan pipeline.
578    pub fn ensure_search_indexes(&self) -> Result<(), RuntimeError> {
579        // Postgres: schema (FTS, facets) is owned by the storage adapter's
580        // migration plan. Tests / benchmarks against Postgres must apply
581        // the plan separately; this fast-path is a SQLite-only convenience.
582        if matches!(self.backend, RuntimeBackend::Postgres(_)) {
583            return Ok(());
584        }
585        let conn = self.lock_write_conn()?;
586        conn.execute(pylon_storage::search::create_facet_table_sql(), [])
587            .map_err(|e| RuntimeError {
588                code: "FACET_TABLE_FAILED".into(),
589                message: format!("create _facet_bitmap: {e}"),
590            })?;
591        for entity in &self.manifest.entities {
592            if let Some(cfg) = &entity.search {
593                if let Some(sql) = pylon_storage::search::create_fts_table_sql(&entity.name, cfg) {
594                    conn.execute(&sql, []).map_err(|e| RuntimeError {
595                        code: "FTS_TABLE_FAILED".into(),
596                        message: format!("create FTS table for {}: {e}", entity.name),
597                    })?;
598                }
599                for field in &cfg.sortable {
600                    let idx_sql = format!(
601                        "CREATE INDEX IF NOT EXISTS \"{}_sort_{field}\" ON \"{}\" (\"{field}\")",
602                        entity.name, entity.name,
603                    );
604                    conn.execute(&idx_sql, []).map_err(|e| RuntimeError {
605                        code: "SORT_INDEX_FAILED".into(),
606                        message: format!("create sort index for {}.{field}: {e}", entity.name),
607                    })?;
608                }
609            }
610        }
611        Ok(())
612    }
613
614    /// Return a reference to the app manifest.
615    pub fn manifest(&self) -> &AppManifest {
616        &self.manifest
617    }
618
619    /// Expose the write connection mutex for transactional operations.
620    /// SQLite-only — Postgres mode returns `NOT_SQLITE_BACKEND`. Callers
621    /// that need a transaction on Postgres should use [`Runtime::transact_ops`]
622    /// (via the `DataStore` trait), which routes to a real Postgres
623    /// transaction inside `PostgresDataStore`.
624    pub fn lock_conn_pub(&self) -> Result<std::sync::MutexGuard<'_, Connection>, RuntimeError> {
625        self.lock_write_conn()
626    }
627
628    /// Return the number of read connections in the pool. Always 0 for
629    /// in-memory SQLite (pool is empty by design) and for Postgres mode
630    /// (the pool concept doesn't apply — `PostgresDataStore` manages its
631    /// own connection internally).
632    pub fn read_pool_size(&self) -> usize {
633        match &self.backend {
634            RuntimeBackend::Sqlite(sb) => sb.read_pool.len(),
635            RuntimeBackend::Postgres(_) => 0,
636        }
637    }
638
639    /// Return true if this runtime is backed by Postgres. Useful for
640    /// SQLite-only fast-paths to early-exit cleanly.
641    pub fn is_postgres(&self) -> bool {
642        matches!(self.backend, RuntimeBackend::Postgres(_))
643    }
644
645    // -----------------------------------------------------------------------
646    // CRDT helpers
647    // -----------------------------------------------------------------------
648
649    /// Map an entity's manifest fields → the [`pylon_crdt::CrdtField`] vec
650    /// the LoroStore needs. Resolves each field's CRDT shape from the
651    /// (type, annotation) pair via `pylon_crdt::field_kind`. Caches
652    /// nothing yet — called per write, fine at our entity counts.
653    pub(crate) fn crdt_fields_for(
654        &self,
655        ent: &ManifestEntity,
656    ) -> Result<Vec<pylon_crdt::CrdtField>, RuntimeError> {
657        let mut out = Vec::with_capacity(ent.fields.len());
658        for f in &ent.fields {
659            // Skip the implicit `id` column — it's the row key, not a
660            // CRDT-managed value. SQLite's PRIMARY KEY constraint owns it.
661            if f.name == "id" {
662                continue;
663            }
664            let kind = pylon_crdt::field_kind(&f.field_type, f.crdt).map_err(|e| RuntimeError {
665                code: "INVALID_CRDT_FIELD".into(),
666                message: format!(
667                    "{}.{}: {e} (declared type={}, crdt={:?})",
668                    ent.name, f.name, f.field_type, f.crdt
669                ),
670            })?;
671            out.push(pylon_crdt::CrdtField {
672                name: f.name.clone(),
673                kind,
674            });
675        }
676        Ok(out)
677    }
678
679    /// Borrow the CRDT store. SQLite-only — Postgres mode does not yet
680    /// support per-row CRDT snapshots at the runtime layer (CRDT
681    /// broadcasts degrade to JSON change events).
682    ///
683    /// # Panics
684    /// Panics on Postgres backend. Call sites that may run under either
685    /// backend should branch on `is_postgres()` first.
686    pub fn crdt_store(&self) -> &crate::loro_store::LoroStore {
687        match &self.backend {
688            RuntimeBackend::Sqlite(sb) => &sb.crdt,
689            RuntimeBackend::Postgres(_) => {
690                panic!("crdt_store() called on Postgres-backed Runtime")
691            }
692        }
693    }
694
695    // -----------------------------------------------------------------------
696    // CRUD operations
697    // -----------------------------------------------------------------------
698
699    /// Insert a new row. Returns the generated ID.
700    ///
701    /// For entities with `crdt: true` (the default) the LoroDoc snapshot
702    /// + the SQLite materialized row are committed together in a single
703    /// SQLite transaction so a crash between the two leaves neither.
704    /// `crdt: false` entities skip the LoroDoc and use a direct write
705    /// (legacy LWW path). Both produce the same on-disk row shape, so
706    /// reads, indexes, FTS, and policies don't change between modes.
707    pub fn insert(&self, entity: &str, data: &serde_json::Value) -> Result<String, RuntimeError> {
708        if let Some(pg) = self.pg_backend() {
709            let ent = self.require_entity(entity)?;
710            // Both CRDT-mode and non-CRDT writes go through one
711            // transaction so the row, the FTS shadow, and (for CRDT)
712            // the LoroDoc snapshot either all commit or all roll back.
713            // Pre-fix this was three separate autocommits and any
714            // failure between them desynced the layers.
715            if ent.crdt {
716                let crdt_fields = self.crdt_fields_for(ent)?;
717                let id = generate_id();
718                // Inject the generated id so build_insert_sql reuses
719                // it — keeps the snapshot key and the row id aligned.
720                let mut row = data.clone();
721                if let Some(obj) = row.as_object_mut() {
722                    obj.insert("id".into(), serde_json::Value::String(id.clone()));
723                }
724                let result = pg.store.with_transaction_raw(|tx| -> Result<(), RuntimeError> {
725                    pg.crdt
726                        .apply_patch(tx, entity, &id, &crdt_fields, data)
727                        .map_err(|e| RuntimeError {
728                            code: "CRDT_APPLY_FAILED".into(),
729                            message: format!("crdt write {entity}/{id}: {e}"),
730                        })?;
731                    pylon_storage::pg_tx_store::tx_insert(tx, &self.manifest, entity, &row)
732                        .map(|_| ())
733                        .map_err(data_err_to_runtime)?;
734                    pg.crdt.cache_after_commit(tx, entity, &id);
735                    Ok(())
736                });
737                if result.is_err() {
738                    // Rollback drops the persisted snapshot, but the
739                    // in-memory LoroDoc was mutated in-place by
740                    // apply_patch. Evict it so the next access
741                    // re-hydrates from disk (which is back in the
742                    // pre-apply state). Without this, the cache would
743                    // hold a doc ahead of the materialized row.
744                    pg.crdt.evict(entity, &id);
745                }
746                result?;
747                return Ok(id);
748            }
749            // Non-CRDT path: still one tx — the typed `DataStore::insert`
750            // already wraps in `with_transaction` internally for FTS
751            // atomicity, so we can delegate straight through.
752            return pylon_http::DataStore::insert(&pg.store, entity, data)
753                .map_err(data_err_to_runtime);
754        }
755        let ent = self.require_entity(entity)?;
756        let conn = self.lock_write_conn()?;
757
758        let id = generate_id();
759
760        let obj = data.as_object().ok_or_else(|| RuntimeError {
761            code: "INVALID_DATA".into(),
762            message: "Insert data must be a JSON object".into(),
763        })?;
764
765        // Validate columns up-front so we don't even open a transaction
766        // for a patch that the SQL INSERT will reject.
767        for key in obj.keys() {
768            if key != "id" {
769                validate_column_name(key, ent)?;
770            }
771        }
772
773        // SQLite-only path past this point — Postgres dispatch happened
774        // at the top of `insert()`. Hoist the backend handle here so we
775        // can reach the LoroStore from inside the tx closure without
776        // a second runtime branch on every iteration.
777        let sb = self.sqlite_backend()?;
778
779        // Atomic block — CRDT sidecar snapshot + materialized SQL row +
780        // search-index maintenance all land together or none does. SQLite's
781        // rollback journal makes this crash-safe end-to-end.
782        with_write_tx(&conn, || {
783            if ent.crdt {
784                let crdt_fields = self.crdt_fields_for(ent)?;
785                sb.crdt
786                    .apply_patch(&conn, entity, &id, &crdt_fields, data)
787                    .map_err(|e| RuntimeError {
788                        code: "CRDT_APPLY_FAILED".into(),
789                        message: format!("crdt write {entity}/{id}: {e}"),
790                    })?;
791            }
792
793            let mut col_names = vec![quote_ident("id")];
794            let mut placeholders = vec!["?1".to_string()];
795            let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = vec![Box::new(id.clone())];
796
797            let mut idx = 2;
798            for (key, val) in obj {
799                if key == "id" {
800                    continue;
801                }
802                col_names.push(quote_ident(key));
803                placeholders.push(format!("?{idx}"));
804                values.push(json_to_sql(val));
805                idx += 1;
806            }
807
808            let sql = format!(
809                "INSERT INTO {} ({}) VALUES ({})",
810                quote_ident(entity),
811                col_names.join(", "),
812                placeholders.join(", ")
813            );
814
815            let params: Vec<&dyn rusqlite::types::ToSql> =
816                values.iter().map(|v| v.as_ref()).collect();
817            conn.execute(&sql, params.as_slice())
818                .map_err(|e| RuntimeError {
819                    code: "INSERT_FAILED".into(),
820                    message: format!("Insert into {entity} failed: {e}"),
821                })?;
822
823            // Search-index maintenance lives inside the same tx so a
824            // crash between the row insert and the FTS update can't leave
825            // the search index inconsistent with the row table.
826            if let Some(cfg) = ent.search.as_ref() {
827                if !cfg.is_empty() {
828                    pylon_storage::search_maintenance::apply_insert(&conn, entity, &id, data, cfg)
829                        .map_err(|e| RuntimeError {
830                            code: "SEARCH_MAINTENANCE_FAILED".into(),
831                            message: format!("search index update on insert {entity}: {e}"),
832                        })?;
833                }
834            }
835            Ok(())
836        })?;
837
838        Ok(id)
839    }
840
841    /// Get a single row by ID.
842    pub fn get_by_id(
843        &self,
844        entity: &str,
845        id: &str,
846    ) -> Result<Option<serde_json::Value>, RuntimeError> {
847        if let Some(pg) = self.pg_backend() {
848            return pylon_http::DataStore::get_by_id(&pg.store, entity, id)
849                .map_err(data_err_to_runtime);
850        }
851        let ent = self.require_entity(entity)?;
852        let conn = self.lock_read_conn()?;
853
854        let sql = format!("SELECT * FROM {} WHERE \"id\" = ?1", quote_ident(entity));
855        let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
856            code: "QUERY_FAILED".into(),
857            message: format!("Failed to prepare query: {e}"),
858        })?;
859
860        let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
861
862        let result = stmt
863            .query_row(rusqlite::params![id], |row| Ok(row_to_json(row, &columns)))
864            .ok();
865
866        Ok(result)
867    }
868
869    /// List all rows for an entity.
870    pub fn list(&self, entity: &str) -> Result<Vec<serde_json::Value>, RuntimeError> {
871        if let Some(pg) = self.pg_backend() {
872            return pylon_http::DataStore::list(&pg.store, entity).map_err(data_err_to_runtime);
873        }
874        let ent = self.require_entity(entity)?;
875        let conn = self.lock_read_conn()?;
876
877        let sql = format!("SELECT * FROM {} ORDER BY \"id\"", quote_ident(entity));
878        let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
879            code: "QUERY_FAILED".into(),
880            message: format!("Failed to prepare query: {e}"),
881        })?;
882
883        let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
884
885        let rows = stmt
886            .query_map([], |row| Ok(row_to_json(row, &columns)))
887            .map_err(|e| RuntimeError {
888                code: "QUERY_FAILED".into(),
889                message: format!("Query failed: {e}"),
890            })?;
891
892        let mut result = Vec::new();
893        for row in rows {
894            if let Ok(val) = row {
895                result.push(val);
896            }
897        }
898        Ok(result)
899    }
900
901    /// List rows after a cursor ID (for cursor-based pagination).
902    pub fn list_after(
903        &self,
904        entity: &str,
905        after: Option<&str>,
906        limit: usize,
907    ) -> Result<Vec<serde_json::Value>, RuntimeError> {
908        if let Some(pg) = self.pg_backend() {
909            return pylon_http::DataStore::list_after(&pg.store, entity, after, limit)
910                .map_err(data_err_to_runtime);
911        }
912        let ent = self.require_entity(entity)?;
913        let conn = self.lock_read_conn()?;
914
915        let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
916        let table = quote_ident(entity);
917
918        let (sql, params): (String, Vec<Box<dyn rusqlite::types::ToSql>>) = match after {
919            Some(cursor) => (
920                format!(
921                    "SELECT * FROM {} WHERE \"id\" > ?1 ORDER BY \"id\" LIMIT ?2",
922                    table
923                ),
924                vec![Box::new(cursor.to_string()), Box::new(limit as i64)],
925            ),
926            None => (
927                format!("SELECT * FROM {} ORDER BY \"id\" LIMIT ?1", table),
928                vec![Box::new(limit as i64)],
929            ),
930        };
931
932        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
933            params.iter().map(|v| v.as_ref()).collect();
934
935        let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
936            code: "QUERY_FAILED".into(),
937            message: format!("Failed to prepare query: {e}"),
938        })?;
939
940        let rows = stmt
941            .query_map(param_refs.as_slice(), |row| Ok(row_to_json(row, &columns)))
942            .map_err(|e| RuntimeError {
943                code: "QUERY_FAILED".into(),
944                message: format!("Query failed: {e}"),
945            })?;
946
947        let mut result = Vec::new();
948        for row in rows {
949            if let Ok(val) = row {
950                result.push(val);
951            }
952        }
953        Ok(result)
954    }
955
956    /// Update a row by ID. Returns true if a row was found and updated.
957    ///
958    /// For entities with `crdt: true` (the default) the LoroDoc receives
959    /// the patch first; the SQLite UPDATE writes the same fields so the
960    /// materialized view stays in lockstep with the doc state.
961    pub fn update(
962        &self,
963        entity: &str,
964        id: &str,
965        data: &serde_json::Value,
966    ) -> Result<bool, RuntimeError> {
967        if let Some(pg) = self.pg_backend() {
968            let ent = self.require_entity(entity)?;
969            if ent.crdt {
970                // CRDT mode: snapshot apply + materialized update +
971                // FTS shadow rebuild all share one tx. Pre-fix the
972                // snapshot landed in autocommit and the row write in
973                // a separate one — a mid-write crash desynced them.
974                //
975                // The closure also FAILS the tx if `tx_update` returns
976                // false (no row matched). Without that, the snapshot
977                // would commit alone — orphaned state pointing at a
978                // row that doesn't exist. Codex flagged this. On
979                // rollback the runtime evicts the cached LoroDoc so
980                // the next read re-hydrates from the (unchanged)
981                // sidecar.
982                let crdt_fields = self.crdt_fields_for(ent)?;
983                let result = pg.store.with_transaction_raw(|tx| -> Result<bool, RuntimeError> {
984                    pg.crdt
985                        .apply_patch(tx, entity, id, &crdt_fields, data)
986                        .map_err(|e| RuntimeError {
987                            code: "CRDT_APPLY_FAILED".into(),
988                            message: format!("crdt update {entity}/{id}: {e}"),
989                        })?;
990                    let updated = pylon_storage::pg_tx_store::tx_update(
991                        tx,
992                        &self.manifest,
993                        entity,
994                        id,
995                        data,
996                    )
997                    .map_err(data_err_to_runtime)?;
998                    if !updated {
999                        // Roll back via Err so the snapshot doesn't
1000                        // commit against a missing row.
1001                        return Err(RuntimeError {
1002                            code: "ENTITY_NOT_FOUND".into(),
1003                            message: format!(
1004                                "Update on {entity}/{id} found no row — refusing to commit \
1005                                 a CRDT snapshot that would orphan."
1006                            ),
1007                        });
1008                    }
1009                    // Refresh the cache from the just-persisted
1010                    // snapshot so post-commit reads on this process
1011                    // skip the re-hydration round-trip.
1012                    pg.crdt.cache_after_commit(tx, entity, id);
1013                    Ok(updated)
1014                });
1015                if result.is_err() {
1016                    pg.crdt.evict(entity, id);
1017                    // ENTITY_NOT_FOUND from the inner closure is the
1018                    // intended return for "no such row" — translate
1019                    // into Ok(false) so callers see the same shape
1020                    // the SQLite path returns. Real errors (CRDT
1021                    // apply failed, BEGIN/COMMIT failed) propagate.
1022                    if let Err(ref e) = result {
1023                        if e.code == "ENTITY_NOT_FOUND" {
1024                            return Ok(false);
1025                        }
1026                    }
1027                }
1028                return result;
1029            }
1030            return pylon_http::DataStore::update(&pg.store, entity, id, data)
1031                .map_err(data_err_to_runtime);
1032        }
1033        let ent = self.require_entity(entity)?;
1034        let conn = self.lock_write_conn()?;
1035
1036        let obj = data.as_object().ok_or_else(|| RuntimeError {
1037            code: "INVALID_DATA".into(),
1038            message: "Update data must be a JSON object".into(),
1039        })?;
1040
1041        // Validate up-front and exit cheap if there's nothing to write.
1042        for key in obj.keys() {
1043            if key != "id" {
1044                validate_column_name(key, ent)?;
1045            }
1046        }
1047        let writable_keys: Vec<&String> = obj.keys().filter(|k| *k != "id").collect();
1048        if writable_keys.is_empty() {
1049            return Ok(false);
1050        }
1051
1052        // SQLite-only path past this point — see note in `insert()`.
1053        let sb = self.sqlite_backend()?;
1054
1055        // Atomic block — same shape as insert. CRDT snapshot, SQL UPDATE,
1056        // and FTS maintenance all commit together.
1057        let affected = with_write_tx(&conn, || -> Result<i64, RuntimeError> {
1058            if ent.crdt {
1059                let crdt_fields = self.crdt_fields_for(ent)?;
1060                sb.crdt
1061                    .apply_patch(&conn, entity, id, &crdt_fields, data)
1062                    .map_err(|e| RuntimeError {
1063                        code: "CRDT_APPLY_FAILED".into(),
1064                        message: format!("crdt write {entity}/{id}: {e}"),
1065                    })?;
1066            }
1067
1068            let mut set_clauses = Vec::new();
1069            let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
1070            let mut idx = 1;
1071            for key in &writable_keys {
1072                set_clauses.push(format!("{} = ?{idx}", quote_ident(key)));
1073                values.push(json_to_sql(&obj[key.as_str()]));
1074                idx += 1;
1075            }
1076
1077            // Capture pre-UPDATE row for search-maintenance diff INSIDE the
1078            // tx. Matches the contract of search_maintenance::apply_update
1079            // — old state must be read before the UPDATE lands.
1080            let searchable = ent.search.as_ref().map(|c| !c.is_empty()).unwrap_or(false);
1081            let old_row = if searchable {
1082                self.get_by_id_with_conn(&conn, entity, id)?
1083            } else {
1084                None
1085            };
1086
1087            values.push(Box::new(id.to_string()));
1088            let sql = format!(
1089                "UPDATE {} SET {} WHERE \"id\" = ?{idx}",
1090                quote_ident(entity),
1091                set_clauses.join(", ")
1092            );
1093
1094            let params: Vec<&dyn rusqlite::types::ToSql> =
1095                values.iter().map(|v| v.as_ref()).collect();
1096            let affected = conn
1097                .execute(&sql, params.as_slice())
1098                .map_err(|e| RuntimeError {
1099                    code: "UPDATE_FAILED".into(),
1100                    message: format!("Update {entity}/{id} failed: {e}"),
1101                })? as i64;
1102
1103            if affected > 0 && searchable {
1104                if let (Some(cfg), Some(old)) = (ent.search.as_ref(), old_row) {
1105                    pylon_storage::search_maintenance::apply_update(
1106                        &conn, entity, id, &old, data, cfg,
1107                    )
1108                    .map_err(|e| RuntimeError {
1109                        code: "SEARCH_MAINTENANCE_FAILED".into(),
1110                        message: format!("search index update on update {entity}: {e}"),
1111                    })?;
1112                }
1113            }
1114            Ok(affected)
1115        })?;
1116
1117        Ok(affected > 0)
1118    }
1119
1120    /// Delete a row by ID. Returns true if a row was actually deleted.
1121    pub fn delete(&self, entity: &str, id: &str) -> Result<bool, RuntimeError> {
1122        if let Some(pg) = self.pg_backend() {
1123            let ent = self.require_entity(entity)?;
1124            if ent.crdt {
1125                // Sidecar delete + entity delete + FTS shadow delete
1126                // share one tx. Eviction of the in-memory cache runs
1127                // AFTER commit so a rolled-back delete leaves the
1128                // cache valid (the snapshot is still on disk).
1129                let result = pg.store.with_transaction_raw(|tx| -> Result<bool, RuntimeError> {
1130                    tx.execute(
1131                        "DELETE FROM _pylon_crdt_snapshots WHERE entity = $1 AND row_id = $2",
1132                        &[&entity, &id],
1133                    )
1134                    .map_err(|e| RuntimeError {
1135                        code: "CRDT_SIDECAR_DELETE_FAILED".into(),
1136                        message: format!("delete pg crdt snapshot {entity}/{id}: {e}"),
1137                    })?;
1138                    pylon_storage::pg_tx_store::tx_delete(tx, &self.manifest, entity, id)
1139                        .map_err(data_err_to_runtime)
1140                });
1141                // Evict regardless of whether tx_delete found a row —
1142                // we issued the sidecar DELETE inside the same tx, so
1143                // any cached doc is now stale even if the entity row
1144                // was already gone (orphan sidecar case codex flagged).
1145                // Only skip eviction if the WHOLE tx rolled back.
1146                if result.is_ok() {
1147                    pg.crdt.evict(entity, id);
1148                }
1149                return result;
1150            }
1151            return pylon_http::DataStore::delete(&pg.store, entity, id)
1152                .map_err(data_err_to_runtime);
1153        }
1154        let ent = self.require_entity(entity)?;
1155        let conn = self.lock_write_conn()?;
1156
1157        // Apply search-maintenance BEFORE the DELETE — we still need
1158        // the row's facet values to clear the bitmap bits.
1159        let searchable = ent.search.as_ref().map(|c| !c.is_empty()).unwrap_or(false);
1160        if searchable {
1161            if let (Some(cfg), Ok(Some(row))) = (
1162                ent.search.as_ref(),
1163                self.get_by_id_with_conn(&conn, entity, id),
1164            ) {
1165                pylon_storage::search_maintenance::apply_delete(&conn, entity, id, &row, cfg)
1166                    .map_err(|e| RuntimeError {
1167                        code: "SEARCH_MAINTENANCE_FAILED".into(),
1168                        message: format!("search index update on delete {entity}: {e}"),
1169                    })?;
1170            }
1171        }
1172
1173        let sql = format!("DELETE FROM {} WHERE \"id\" = ?1", quote_ident(entity));
1174        let affected = conn
1175            .execute(&sql, rusqlite::params![id])
1176            .map_err(|e| RuntimeError {
1177                code: "DELETE_FAILED".into(),
1178                message: format!("Delete {entity}/{id} failed: {e}"),
1179            })?;
1180
1181        Ok(affected > 0)
1182    }
1183
1184    /// Lookup a single row by a field value (e.g., email).
1185    pub fn lookup(
1186        &self,
1187        entity: &str,
1188        field: &str,
1189        value: &str,
1190    ) -> Result<Option<serde_json::Value>, RuntimeError> {
1191        if let Some(pg) = self.pg_backend() {
1192            return pylon_http::DataStore::lookup(&pg.store, entity, field, value)
1193                .map_err(data_err_to_runtime);
1194        }
1195        let ent = self.require_entity(entity)?;
1196        validate_column_name(field, ent)?;
1197        let conn = self.lock_read_conn()?;
1198
1199        let sql = format!(
1200            "SELECT * FROM {} WHERE {} = ?1 LIMIT 1",
1201            quote_ident(entity),
1202            quote_ident(field)
1203        );
1204        let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1205
1206        let result = conn.prepare_cached(&sql).ok().and_then(|mut stmt| {
1207            stmt.query_row(rusqlite::params![value], |row| {
1208                Ok(row_to_json(row, &columns))
1209            })
1210            .ok()
1211        });
1212
1213        Ok(result)
1214    }
1215
1216    /// Link two entities by setting a foreign-key field.
1217    pub fn link(
1218        &self,
1219        entity: &str,
1220        id: &str,
1221        relation: &str,
1222        target_id: &str,
1223    ) -> Result<bool, RuntimeError> {
1224        let ent = self.require_entity(entity)?;
1225
1226        // Find the relation definition to determine which field to set.
1227        let rel = ent
1228            .relations
1229            .iter()
1230            .find(|r| r.name == relation)
1231            .ok_or_else(|| RuntimeError {
1232                code: "RELATION_NOT_FOUND".into(),
1233                message: format!("Relation \"{relation}\" not found on entity \"{entity}\""),
1234            })?;
1235
1236        let data = serde_json::json!({ rel.field.clone(): target_id });
1237        self.update(entity, id, &data)
1238    }
1239
1240    /// Unlink a relation by setting the foreign-key field to null.
1241    pub fn unlink(&self, entity: &str, id: &str, relation: &str) -> Result<bool, RuntimeError> {
1242        let ent = self.require_entity(entity)?;
1243
1244        let rel = ent
1245            .relations
1246            .iter()
1247            .find(|r| r.name == relation)
1248            .ok_or_else(|| RuntimeError {
1249                code: "RELATION_NOT_FOUND".into(),
1250                message: format!("Relation \"{relation}\" not found on entity \"{entity}\""),
1251            })?;
1252
1253        let data = serde_json::json!({ rel.field.clone(): null });
1254        self.update(entity, id, &data)
1255    }
1256
1257    /// Execute a filtered query with operators ($not, $gt, $in, $like, $order, $limit).
1258    pub fn query_filtered(
1259        &self,
1260        entity: &str,
1261        filter: &serde_json::Value,
1262    ) -> Result<Vec<serde_json::Value>, RuntimeError> {
1263        if let Some(pg) = self.pg_backend() {
1264            return pylon_http::DataStore::query_filtered(&pg.store, entity, filter)
1265                .map_err(data_err_to_runtime);
1266        }
1267        let ent = self.require_entity(entity)?;
1268        let conn = self.lock_read_conn()?;
1269
1270        let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1271        let obj = filter
1272            .as_object()
1273            .unwrap_or(&serde_json::Map::new())
1274            .clone();
1275
1276        let mut where_clauses = Vec::new();
1277        let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
1278        let mut order_clause = String::new();
1279        let mut limit_clause = String::new();
1280        let mut join_clause = String::new();
1281        let mut fts_order = false;
1282        let mut idx = 1;
1283
1284        for (key, val) in &obj {
1285            match key.as_str() {
1286                "$order" => {
1287                    if let Some(order_obj) = val.as_object() {
1288                        let mut parts: Vec<String> = Vec::new();
1289                        for (col, dir) in order_obj {
1290                            validate_column_name(col, ent)?;
1291                            let d = match dir.as_str().unwrap_or("asc") {
1292                                "desc" | "DESC" => "DESC",
1293                                _ => "ASC",
1294                            };
1295                            parts.push(format!("{} {d}", quote_ident(col)));
1296                        }
1297                        if !parts.is_empty() {
1298                            order_clause = format!(" ORDER BY {}", parts.join(", "));
1299                        }
1300                    }
1301                }
1302                "$limit" => {
1303                    if let Some(n) = val.as_u64() {
1304                        limit_clause = format!(" LIMIT {n}");
1305                    }
1306                }
1307                "$offset" => {
1308                    if let Some(n) = val.as_u64() {
1309                        // SQLite requires LIMIT before OFFSET; add a default.
1310                        if limit_clause.is_empty() {
1311                            limit_clause = " LIMIT -1".into();
1312                        }
1313                        limit_clause = format!("{limit_clause} OFFSET {n}");
1314                    }
1315                }
1316                "$search" => {
1317                    if let Some(q) = val.as_str() {
1318                        // Join against the entity's FTS5 virtual table.
1319                        let fts = format!("{}_fts", entity);
1320                        join_clause = format!(
1321                            " JOIN {fts} ON {ent}.rowid = {fts}.rowid",
1322                            fts = quote_ident(&fts),
1323                            ent = quote_ident(entity),
1324                        );
1325                        where_clauses.push(format!("{} MATCH ?{idx}", quote_ident(&fts)));
1326                        values.push(Box::new(q.to_string()));
1327                        fts_order = true;
1328                        idx += 1;
1329                    }
1330                }
1331                _ => {
1332                    validate_column_name(key, ent)?;
1333                    let quoted_key = quote_ident(key);
1334
1335                    if let Some(op_obj) = val.as_object() {
1336                        for (op, op_val) in op_obj {
1337                            match op.as_str() {
1338                                "$not" => {
1339                                    where_clauses.push(format!("{quoted_key} != ?{idx}"));
1340                                    values.push(json_to_sql(op_val));
1341                                    idx += 1;
1342                                }
1343                                "$gt" => {
1344                                    where_clauses.push(format!("{quoted_key} > ?{idx}"));
1345                                    values.push(json_to_sql(op_val));
1346                                    idx += 1;
1347                                }
1348                                "$gte" => {
1349                                    where_clauses.push(format!("{quoted_key} >= ?{idx}"));
1350                                    values.push(json_to_sql(op_val));
1351                                    idx += 1;
1352                                }
1353                                "$lt" => {
1354                                    where_clauses.push(format!("{quoted_key} < ?{idx}"));
1355                                    values.push(json_to_sql(op_val));
1356                                    idx += 1;
1357                                }
1358                                "$lte" => {
1359                                    where_clauses.push(format!("{quoted_key} <= ?{idx}"));
1360                                    values.push(json_to_sql(op_val));
1361                                    idx += 1;
1362                                }
1363                                "$like" => {
1364                                    where_clauses.push(format!("{quoted_key} LIKE ?{idx}"));
1365                                    let pattern = format!("%{}%", op_val.as_str().unwrap_or(""));
1366                                    values.push(Box::new(pattern));
1367                                    idx += 1;
1368                                }
1369                                "$in" => {
1370                                    if let Some(arr) = op_val.as_array() {
1371                                        if arr.is_empty() {
1372                                            // Empty $in matches nothing.
1373                                            // Previously SQLite SKIPPED the
1374                                            // predicate (returning ALL rows)
1375                                            // while PG short-circuited to
1376                                            // FALSE — a real cross-backend
1377                                            // drift bug codex caught. Both
1378                                            // now emit `0` (false) so empty
1379                                            // $in returns an empty set.
1380                                            where_clauses.push("0".into());
1381                                        } else {
1382                                            let placeholders: Vec<String> = arr
1383                                                .iter()
1384                                                .map(|v| {
1385                                                    let p = format!("?{idx}");
1386                                                    values.push(json_to_sql(v));
1387                                                    idx += 1;
1388                                                    p
1389                                                })
1390                                                .collect();
1391                                            where_clauses.push(format!(
1392                                                "{quoted_key} IN ({})",
1393                                                placeholders.join(", ")
1394                                            ));
1395                                        }
1396                                    }
1397                                }
1398                                _ => {}
1399                            }
1400                        }
1401                    } else {
1402                        // Simple equality.
1403                        where_clauses.push(format!("{quoted_key} = ?{idx}"));
1404                        values.push(json_to_sql(val));
1405                        idx += 1;
1406                    }
1407                }
1408            }
1409        }
1410
1411        let where_sql = if where_clauses.is_empty() {
1412            String::new()
1413        } else {
1414            format!(" WHERE {}", where_clauses.join(" AND "))
1415        };
1416
1417        if order_clause.is_empty() {
1418            order_clause = if fts_order {
1419                // FTS joins default-order by bm25 relevance.
1420                " ORDER BY bm25(".to_string() + &quote_ident(&format!("{}_fts", entity)) + ")"
1421            } else {
1422                format!(" ORDER BY {}.\"id\"", quote_ident(entity))
1423            };
1424        }
1425
1426        let select_prefix = format!("{}.*", quote_ident(entity));
1427        let sql = format!(
1428            "SELECT {} FROM {}{}{}{}{}",
1429            select_prefix,
1430            quote_ident(entity),
1431            join_clause,
1432            where_sql,
1433            order_clause,
1434            limit_clause
1435        );
1436        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1437            values.iter().map(|v| v.as_ref()).collect();
1438
1439        let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
1440            code: "QUERY_FAILED".into(),
1441            message: format!("Failed to prepare filtered query: {e}"),
1442        })?;
1443
1444        let rows = stmt
1445            .query_map(param_refs.as_slice(), |row| Ok(row_to_json(row, &columns)))
1446            .map_err(|e| RuntimeError {
1447                code: "QUERY_FAILED".into(),
1448                message: format!("Filtered query failed: {e}"),
1449            })?;
1450
1451        let mut result = Vec::new();
1452        for row in rows {
1453            if let Ok(val) = row {
1454                result.push(val);
1455            }
1456        }
1457        Ok(result)
1458    }
1459
1460    /// Execute a graph-style query.
1461    ///
1462    /// Input: `{ "User": { "where": { "email": "..." }, "include": { "posts": {} } } }`
1463    /// Returns nested results following relations.
1464    pub fn query_graph(
1465        &self,
1466        query: &serde_json::Value,
1467    ) -> Result<serde_json::Value, RuntimeError> {
1468        let obj = query.as_object().ok_or_else(|| RuntimeError {
1469            code: "INVALID_QUERY".into(),
1470            message: "Graph query must be a JSON object".into(),
1471        })?;
1472
1473        let mut results = serde_json::Map::new();
1474
1475        for (entity_name, query_opts) in obj {
1476            let _ent = self.require_entity(entity_name)?;
1477
1478            // Apply where clause if present.
1479            let filter = query_opts
1480                .get("where")
1481                .cloned()
1482                .unwrap_or(serde_json::json!({}));
1483            let rows = self.query_filtered(entity_name, &filter)?;
1484
1485            // Apply includes (relations) if present.
1486            let rows = if let Some(include) = query_opts.get("include").and_then(|v| v.as_object())
1487            {
1488                // Internal invariant: if query_filtered succeeded above, the
1489                // entity must exist. Previously this used .unwrap() which
1490                // would panic if the invariant broke — a panic inside the
1491                // handler path poisons the connection mutex and takes down
1492                // all subsequent reads. Fail the request cleanly instead.
1493                let ent = self.entities.get(entity_name).ok_or_else(|| RuntimeError {
1494                    code: "INVARIANT_BROKEN".into(),
1495                    message: format!(
1496                        "entity \"{entity_name}\" missing from registry during include expansion"
1497                    ),
1498                })?;
1499                rows.into_iter()
1500                    .map(|mut row| {
1501                        for (rel_name, _sub_query) in include {
1502                            if let Some(rel) = ent.relations.iter().find(|r| r.name == *rel_name) {
1503                                let fk_value = row
1504                                    .get(&rel.field)
1505                                    .and_then(|v| v.as_str())
1506                                    .map(|s| s.to_string());
1507                                if let Some(fk) = fk_value {
1508                                    if rel.many {
1509                                        // One-to-many: find rows in target where field matches id.
1510                                        let sub_filter = serde_json::json!({ &rel.field: &fk });
1511                                        if let Ok(related) =
1512                                            self.query_filtered(&rel.target, &sub_filter)
1513                                        {
1514                                            row[rel_name] = serde_json::json!(related);
1515                                        }
1516                                    } else {
1517                                        // One-to-one / many-to-one: get by id.
1518                                        if let Ok(Some(related)) = self.get_by_id(&rel.target, &fk)
1519                                        {
1520                                            row[rel_name] = related;
1521                                        }
1522                                    }
1523                                }
1524                            }
1525                        }
1526                        row
1527                    })
1528                    .collect()
1529            } else {
1530                rows
1531            };
1532
1533            // Apply limit if present.
1534            let rows = if let Some(limit) = query_opts.get("limit").and_then(|v| v.as_u64()) {
1535                rows.into_iter().take(limit as usize).collect()
1536            } else {
1537                rows
1538            };
1539
1540            results.insert(entity_name.clone(), serde_json::json!(rows));
1541        }
1542
1543        Ok(serde_json::Value::Object(results))
1544    }
1545
1546    // -----------------------------------------------------------------------
1547    // Transaction-safe variants (use a pre-held connection guard)
1548    // -----------------------------------------------------------------------
1549
1550    /// Insert using an already-locked connection (for transactions).
1551    pub fn insert_with_conn(
1552        &self,
1553        conn: &Connection,
1554        entity: &str,
1555        data: &serde_json::Value,
1556    ) -> Result<String, RuntimeError> {
1557        let ent = self.require_entity(entity)?;
1558        let id = generate_id();
1559        let obj = data.as_object().ok_or_else(|| RuntimeError {
1560            code: "INVALID_DATA".into(),
1561            message: "Insert data must be a JSON object".into(),
1562        })?;
1563
1564        let mut col_names = vec![quote_ident("id")];
1565        let mut placeholders = vec!["?1".to_string()];
1566        let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = vec![Box::new(id.clone())];
1567        let mut idx = 2;
1568        for (key, val) in obj {
1569            if key == "id" {
1570                continue;
1571            }
1572            validate_column_name(key, ent)?;
1573            col_names.push(quote_ident(key));
1574            placeholders.push(format!("?{idx}"));
1575            values.push(json_to_sql(val));
1576            idx += 1;
1577        }
1578
1579        let sql = format!(
1580            "INSERT INTO {} ({}) VALUES ({})",
1581            quote_ident(entity),
1582            col_names.join(", "),
1583            placeholders.join(", ")
1584        );
1585        let params: Vec<&dyn rusqlite::types::ToSql> = values.iter().map(|v| v.as_ref()).collect();
1586        conn.execute(&sql, params.as_slice())
1587            .map_err(|e| RuntimeError {
1588                code: "INSERT_FAILED".into(),
1589                message: format!("Insert into {entity} failed: {e}"),
1590            })?;
1591
1592        // Faceted-search maintenance in the same transaction. Skipped
1593        // for entities that don't declare `search:` in their schema.
1594        if let Some(cfg) = ent.search.as_ref() {
1595            if !cfg.is_empty() {
1596                pylon_storage::search_maintenance::apply_insert(conn, entity, &id, data, cfg)
1597                    .map_err(|e| RuntimeError {
1598                        code: "SEARCH_MAINTENANCE_FAILED".into(),
1599                        message: format!("search index update on insert {entity}: {e}"),
1600                    })?;
1601            }
1602        }
1603
1604        Ok(id)
1605    }
1606
1607    /// Update using an already-locked connection (for transactions).
1608    pub fn update_with_conn(
1609        &self,
1610        conn: &Connection,
1611        entity: &str,
1612        id: &str,
1613        data: &serde_json::Value,
1614    ) -> Result<bool, RuntimeError> {
1615        let ent = self.require_entity(entity)?;
1616        let obj = data.as_object().ok_or_else(|| RuntimeError {
1617            code: "INVALID_DATA".into(),
1618            message: "Update data must be a JSON object".into(),
1619        })?;
1620
1621        let mut set_clauses = Vec::new();
1622        let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
1623        let mut idx = 1;
1624        for (key, val) in obj {
1625            if key == "id" {
1626                continue;
1627            }
1628            validate_column_name(key, ent)?;
1629            set_clauses.push(format!("{} = ?{idx}", quote_ident(key)));
1630            values.push(json_to_sql(val));
1631            idx += 1;
1632        }
1633        if set_clauses.is_empty() {
1634            return Ok(false);
1635        }
1636
1637        // Capture the pre-UPDATE row if we need to diff facet values.
1638        // Read happens before the UPDATE so apply_update sees the OLD
1639        // state of any facet field. Cheap — single-row lookup on the
1640        // `id` primary-key index.
1641        let searchable = ent.search.as_ref().map(|c| !c.is_empty()).unwrap_or(false);
1642        let old_row = if searchable {
1643            self.get_by_id_with_conn(conn, entity, id)?
1644        } else {
1645            None
1646        };
1647
1648        values.push(Box::new(id.to_string()));
1649        let sql = format!(
1650            "UPDATE {} SET {} WHERE \"id\" = ?{idx}",
1651            quote_ident(entity),
1652            set_clauses.join(", ")
1653        );
1654        let params: Vec<&dyn rusqlite::types::ToSql> = values.iter().map(|v| v.as_ref()).collect();
1655        let affected = conn
1656            .execute(&sql, params.as_slice())
1657            .map_err(|e| RuntimeError {
1658                code: "UPDATE_FAILED".into(),
1659                message: format!("Update {entity}/{id} failed: {e}"),
1660            })?;
1661
1662        if affected > 0 && searchable {
1663            if let (Some(cfg), Some(old)) = (ent.search.as_ref(), old_row) {
1664                pylon_storage::search_maintenance::apply_update(conn, entity, id, &old, data, cfg)
1665                    .map_err(|e| RuntimeError {
1666                        code: "SEARCH_MAINTENANCE_FAILED".into(),
1667                        message: format!("search index update on update {entity}: {e}"),
1668                    })?;
1669            }
1670        }
1671
1672        Ok(affected > 0)
1673    }
1674
1675    /// Delete using an already-locked connection (for transactions).
1676    pub fn delete_with_conn(
1677        &self,
1678        conn: &Connection,
1679        entity: &str,
1680        id: &str,
1681    ) -> Result<bool, RuntimeError> {
1682        let ent = self.require_entity(entity)?;
1683
1684        // Apply search maintenance BEFORE the DELETE so we still have
1685        // the row's facet values to diff against.
1686        let searchable = ent.search.as_ref().map(|c| !c.is_empty()).unwrap_or(false);
1687        if searchable {
1688            if let (Some(cfg), Ok(Some(row))) = (
1689                ent.search.as_ref(),
1690                self.get_by_id_with_conn(conn, entity, id),
1691            ) {
1692                pylon_storage::search_maintenance::apply_delete(conn, entity, id, &row, cfg)
1693                    .map_err(|e| RuntimeError {
1694                        code: "SEARCH_MAINTENANCE_FAILED".into(),
1695                        message: format!("search index update on delete {entity}: {e}"),
1696                    })?;
1697            }
1698        }
1699
1700        let sql = format!("DELETE FROM {} WHERE \"id\" = ?1", quote_ident(entity));
1701        let affected = conn
1702            .execute(&sql, rusqlite::params![id])
1703            .map_err(|e| RuntimeError {
1704                code: "DELETE_FAILED".into(),
1705                message: format!("Delete {entity}/{id} failed: {e}"),
1706            })?;
1707        Ok(affected > 0)
1708    }
1709
1710    /// Read a row by id using a pre-held connection (for transactions).
1711    pub fn get_by_id_with_conn(
1712        &self,
1713        conn: &Connection,
1714        entity: &str,
1715        id: &str,
1716    ) -> Result<Option<serde_json::Value>, RuntimeError> {
1717        let ent = self.require_entity(entity)?;
1718        let sql = format!("SELECT * FROM {} WHERE \"id\" = ?1", quote_ident(entity));
1719        let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
1720            code: "QUERY_FAILED".into(),
1721            message: format!("Failed to prepare query: {e}"),
1722        })?;
1723        let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1724        Ok(stmt
1725            .query_row(rusqlite::params![id], |row| Ok(row_to_json(row, &columns)))
1726            .ok())
1727    }
1728
1729    /// List rows using a pre-held connection (for transactions).
1730    pub fn list_with_conn(
1731        &self,
1732        conn: &Connection,
1733        entity: &str,
1734    ) -> Result<Vec<serde_json::Value>, RuntimeError> {
1735        let ent = self.require_entity(entity)?;
1736        let sql = format!("SELECT * FROM {} ORDER BY \"id\"", quote_ident(entity));
1737        let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
1738            code: "QUERY_FAILED".into(),
1739            message: format!("Failed to prepare query: {e}"),
1740        })?;
1741        let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1742        let rows = stmt
1743            .query_map([], |row| Ok(row_to_json(row, &columns)))
1744            .map_err(|e| RuntimeError {
1745                code: "QUERY_FAILED".into(),
1746                message: format!("Query failed: {e}"),
1747            })?;
1748        Ok(rows.flatten().collect())
1749    }
1750
1751    /// List after cursor using a pre-held connection (for transactions).
1752    pub fn list_after_with_conn(
1753        &self,
1754        conn: &Connection,
1755        entity: &str,
1756        after: Option<&str>,
1757        limit: usize,
1758    ) -> Result<Vec<serde_json::Value>, RuntimeError> {
1759        let ent = self.require_entity(entity)?;
1760        let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1761        let table = quote_ident(entity);
1762        let (sql, params): (String, Vec<Box<dyn rusqlite::types::ToSql>>) = match after {
1763            Some(cursor) => (
1764                format!("SELECT * FROM {table} WHERE \"id\" > ?1 ORDER BY \"id\" LIMIT ?2"),
1765                vec![Box::new(cursor.to_string()), Box::new(limit as i64)],
1766            ),
1767            None => (
1768                format!("SELECT * FROM {table} ORDER BY \"id\" LIMIT ?1"),
1769                vec![Box::new(limit as i64)],
1770            ),
1771        };
1772        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1773            params.iter().map(|v| v.as_ref()).collect();
1774        let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
1775            code: "QUERY_FAILED".into(),
1776            message: format!("Failed to prepare: {e}"),
1777        })?;
1778        let rows = stmt
1779            .query_map(param_refs.as_slice(), |row| Ok(row_to_json(row, &columns)))
1780            .map_err(|e| RuntimeError {
1781                code: "QUERY_FAILED".into(),
1782                message: format!("Query failed: {e}"),
1783            })?;
1784        Ok(rows.flatten().collect())
1785    }
1786
1787    /// Lookup by field using a pre-held connection (for transactions).
1788    pub fn lookup_with_conn(
1789        &self,
1790        conn: &Connection,
1791        entity: &str,
1792        field: &str,
1793        value: &str,
1794    ) -> Result<Option<serde_json::Value>, RuntimeError> {
1795        let ent = self.require_entity(entity)?;
1796        validate_column_name(field, ent)?;
1797        let sql = format!(
1798            "SELECT * FROM {} WHERE {} = ?1 LIMIT 1",
1799            quote_ident(entity),
1800            quote_ident(field)
1801        );
1802        let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1803        Ok(conn.prepare_cached(&sql).ok().and_then(|mut stmt| {
1804            stmt.query_row(rusqlite::params![value], |row| {
1805                Ok(row_to_json(row, &columns))
1806            })
1807            .ok()
1808        }))
1809    }
1810
1811    /// Link relation using a pre-held connection (for transactions).
1812    pub fn link_with_conn(
1813        &self,
1814        conn: &Connection,
1815        entity: &str,
1816        id: &str,
1817        relation: &str,
1818        target_id: &str,
1819    ) -> Result<bool, RuntimeError> {
1820        let ent = self.require_entity(entity)?;
1821        let rel = ent
1822            .relations
1823            .iter()
1824            .find(|r| r.name == relation)
1825            .ok_or_else(|| RuntimeError {
1826                code: "RELATION_NOT_FOUND".into(),
1827                message: format!("Relation \"{relation}\" not found on \"{entity}\""),
1828            })?;
1829        let data = serde_json::json!({ rel.field.clone(): target_id });
1830        self.update_with_conn(conn, entity, id, &data)
1831    }
1832
1833    /// Unlink relation using a pre-held connection (for transactions).
1834    pub fn unlink_with_conn(
1835        &self,
1836        conn: &Connection,
1837        entity: &str,
1838        id: &str,
1839        relation: &str,
1840    ) -> Result<bool, RuntimeError> {
1841        let ent = self.require_entity(entity)?;
1842        let rel = ent
1843            .relations
1844            .iter()
1845            .find(|r| r.name == relation)
1846            .ok_or_else(|| RuntimeError {
1847                code: "RELATION_NOT_FOUND".into(),
1848                message: format!("Relation \"{relation}\" not found on \"{entity}\""),
1849            })?;
1850        let data = serde_json::json!({ rel.field.clone(): serde_json::Value::Null });
1851        self.update_with_conn(conn, entity, id, &data)
1852    }
1853
1854    /// Query with filters using a pre-held connection (for transactions).
1855    ///
1856    /// Shares the filter-building logic with [`query_filtered`] by executing
1857    /// against the provided connection rather than acquiring one.
1858    pub fn query_filtered_with_conn(
1859        &self,
1860        conn: &Connection,
1861        entity: &str,
1862        filter: &serde_json::Value,
1863    ) -> Result<Vec<serde_json::Value>, RuntimeError> {
1864        let ent = self.require_entity(entity)?;
1865        let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1866        let empty = serde_json::Map::new();
1867        let obj = filter.as_object().unwrap_or(&empty);
1868
1869        let mut where_clauses = Vec::new();
1870        let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
1871        let mut order_clause = String::new();
1872        let mut limit_clause = String::new();
1873        let mut idx = 1;
1874
1875        for (key, val) in obj {
1876            match key.as_str() {
1877                "$order" => {
1878                    if let Some(o) = val.as_object() {
1879                        let mut parts: Vec<String> = Vec::new();
1880                        for (col, dir) in o {
1881                            validate_column_name(col, ent)?;
1882                            let d = match dir.as_str().unwrap_or("asc") {
1883                                "desc" | "DESC" => "DESC",
1884                                _ => "ASC",
1885                            };
1886                            parts.push(format!("{} {d}", quote_ident(col)));
1887                        }
1888                        if !parts.is_empty() {
1889                            order_clause = format!(" ORDER BY {}", parts.join(", "));
1890                        }
1891                    }
1892                }
1893                "$limit" => {
1894                    if let Some(n) = val.as_u64() {
1895                        limit_clause = format!(" LIMIT {n}");
1896                    }
1897                }
1898                "$offset" => {
1899                    if let Some(n) = val.as_u64() {
1900                        if limit_clause.is_empty() {
1901                            limit_clause = " LIMIT -1".into();
1902                        }
1903                        limit_clause = format!("{limit_clause} OFFSET {n}");
1904                    }
1905                }
1906                _ => {
1907                    validate_column_name(key, ent)?;
1908                    let qk = quote_ident(key);
1909                    if let Some(op_obj) = val.as_object() {
1910                        for (op, op_val) in op_obj {
1911                            match op.as_str() {
1912                                "$not" => {
1913                                    where_clauses.push(format!("{qk} != ?{idx}"));
1914                                    values.push(json_to_sql(op_val));
1915                                    idx += 1;
1916                                }
1917                                "$gt" => {
1918                                    where_clauses.push(format!("{qk} > ?{idx}"));
1919                                    values.push(json_to_sql(op_val));
1920                                    idx += 1;
1921                                }
1922                                "$gte" => {
1923                                    where_clauses.push(format!("{qk} >= ?{idx}"));
1924                                    values.push(json_to_sql(op_val));
1925                                    idx += 1;
1926                                }
1927                                "$lt" => {
1928                                    where_clauses.push(format!("{qk} < ?{idx}"));
1929                                    values.push(json_to_sql(op_val));
1930                                    idx += 1;
1931                                }
1932                                "$lte" => {
1933                                    where_clauses.push(format!("{qk} <= ?{idx}"));
1934                                    values.push(json_to_sql(op_val));
1935                                    idx += 1;
1936                                }
1937                                "$like" => {
1938                                    where_clauses.push(format!("{qk} LIKE ?{idx}"));
1939                                    let p = format!("%{}%", op_val.as_str().unwrap_or(""));
1940                                    values.push(Box::new(p));
1941                                    idx += 1;
1942                                }
1943                                "$in" => {
1944                                    if let Some(arr) = op_val.as_array() {
1945                                        let ph: Vec<String> = arr
1946                                            .iter()
1947                                            .map(|v| {
1948                                                let p = format!("?{idx}");
1949                                                values.push(json_to_sql(v));
1950                                                idx += 1;
1951                                                p
1952                                            })
1953                                            .collect();
1954                                        if !ph.is_empty() {
1955                                            where_clauses
1956                                                .push(format!("{qk} IN ({})", ph.join(", ")));
1957                                        }
1958                                    }
1959                                }
1960                                _ => {}
1961                            }
1962                        }
1963                    } else {
1964                        where_clauses.push(format!("{qk} = ?{idx}"));
1965                        values.push(json_to_sql(val));
1966                        idx += 1;
1967                    }
1968                }
1969            }
1970        }
1971
1972        let where_sql = if where_clauses.is_empty() {
1973            String::new()
1974        } else {
1975            format!(" WHERE {}", where_clauses.join(" AND "))
1976        };
1977        if order_clause.is_empty() {
1978            order_clause = " ORDER BY \"id\"".into();
1979        }
1980
1981        let sql = format!(
1982            "SELECT * FROM {}{}{}{}",
1983            quote_ident(entity),
1984            where_sql,
1985            order_clause,
1986            limit_clause
1987        );
1988        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1989            values.iter().map(|v| v.as_ref()).collect();
1990        let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
1991            code: "QUERY_FAILED".into(),
1992            message: format!("Failed to prepare: {e}"),
1993        })?;
1994        let rows = stmt
1995            .query_map(param_refs.as_slice(), |row| Ok(row_to_json(row, &columns)))
1996            .map_err(|e| RuntimeError {
1997                code: "QUERY_FAILED".into(),
1998                message: format!("Query failed: {e}"),
1999            })?;
2000        Ok(rows.flatten().collect())
2001    }
2002
2003    /// Graph query using a pre-held connection (for transactions).
2004    pub fn query_graph_with_conn(
2005        &self,
2006        conn: &Connection,
2007        query: &serde_json::Value,
2008    ) -> Result<serde_json::Value, RuntimeError> {
2009        let obj = query.as_object().ok_or_else(|| RuntimeError {
2010            code: "INVALID_QUERY".into(),
2011            message: "Graph query must be a JSON object".into(),
2012        })?;
2013        let mut results = serde_json::Map::new();
2014        for (entity_name, query_opts) in obj {
2015            let _ent = self.require_entity(entity_name)?;
2016            let filter = query_opts
2017                .get("where")
2018                .cloned()
2019                .unwrap_or(serde_json::json!({}));
2020            let rows = self.query_filtered_with_conn(conn, entity_name, &filter)?;
2021            results.insert(entity_name.clone(), serde_json::json!(rows));
2022        }
2023        Ok(serde_json::Value::Object(results))
2024    }
2025
2026    // -----------------------------------------------------------------------
2027    // Aggregations — count, sum, avg, min, max, group by
2028    // -----------------------------------------------------------------------
2029
2030    /// Run an aggregation query. See [`pylon_http::DataStore::aggregate`]
2031    /// for the spec shape.
2032    pub fn aggregate(
2033        &self,
2034        entity: &str,
2035        spec: &serde_json::Value,
2036    ) -> Result<serde_json::Value, RuntimeError> {
2037        if let Some(pg) = self.pg_backend() {
2038            return pylon_http::DataStore::aggregate(&pg.store, entity, spec)
2039                .map_err(data_err_to_runtime);
2040        }
2041        let ent = self.require_entity(entity)?;
2042        let conn = self.lock_read_conn()?;
2043        let obj = spec.as_object().ok_or_else(|| RuntimeError {
2044            code: "INVALID_QUERY".into(),
2045            message: "aggregate spec must be an object".into(),
2046        })?;
2047
2048        // Build the SELECT list.
2049        let mut select_parts: Vec<String> = Vec::new();
2050        let mut result_fields: Vec<String> = Vec::new();
2051
2052        if let Some(count) = obj.get("count") {
2053            match count {
2054                serde_json::Value::String(s) if s == "*" => {
2055                    select_parts.push("COUNT(*) AS count".into());
2056                    result_fields.push("count".into());
2057                }
2058                serde_json::Value::String(field) => {
2059                    validate_column_name(field, ent)?;
2060                    let alias = format!("count_{field}");
2061                    select_parts.push(format!(
2062                        "COUNT({}) AS {}",
2063                        quote_ident(field),
2064                        quote_ident(&alias)
2065                    ));
2066                    result_fields.push(alias);
2067                }
2068                _ => {}
2069            }
2070        }
2071
2072        for (fn_name, alias_prefix) in [
2073            ("sum", "sum_"),
2074            ("avg", "avg_"),
2075            ("min", "min_"),
2076            ("max", "max_"),
2077        ] {
2078            if let Some(fields) = obj.get(fn_name).and_then(|v| v.as_array()) {
2079                for field in fields {
2080                    if let Some(f) = field.as_str() {
2081                        validate_column_name(f, ent)?;
2082                        let alias = format!("{alias_prefix}{f}");
2083                        let sql_fn = fn_name.to_uppercase();
2084                        select_parts.push(format!(
2085                            "{}({}) AS {}",
2086                            sql_fn,
2087                            quote_ident(f),
2088                            quote_ident(&alias)
2089                        ));
2090                        result_fields.push(alias);
2091                    }
2092                }
2093            }
2094        }
2095
2096        // countDistinct — separate handler because COUNT(DISTINCT) is a
2097        // distinct SQL form from COUNT(field). Lets dashboards ask "how
2098        // many unique customers placed orders this month" without a
2099        // client-side post-processing pass.
2100        if let Some(fields) = obj.get("countDistinct").and_then(|v| v.as_array()) {
2101            for field in fields {
2102                if let Some(f) = field.as_str() {
2103                    validate_column_name(f, ent)?;
2104                    let alias = format!("count_distinct_{f}");
2105                    select_parts.push(format!(
2106                        "COUNT(DISTINCT {}) AS {}",
2107                        quote_ident(f),
2108                        quote_ident(&alias)
2109                    ));
2110                    result_fields.push(alias);
2111                }
2112            }
2113        }
2114
2115        // Group-by fields come first in the SELECT so each row is identifiable.
2116        // Each entry is either a plain column name (string) or a date-bucket
2117        // spec — `{ field: "createdAt", bucket: "day" }`. Buckets map to
2118        // SQLite strftime patterns so aggregation keys collapse to the
2119        // bucket boundary (hour / day / week / month / year).
2120        let mut group_by: Vec<String> = Vec::new();
2121        let mut group_select: Vec<String> = Vec::new();
2122        let mut group_field_names: Vec<String> = Vec::new();
2123        if let Some(groups) = obj.get("groupBy").and_then(|v| v.as_array()) {
2124            for g in groups {
2125                if let Some(f) = g.as_str() {
2126                    validate_column_name(f, ent)?;
2127                    let quoted = quote_ident(f);
2128                    group_by.push(quoted.clone());
2129                    group_select.push(quoted);
2130                    group_field_names.push(f.to_string());
2131                } else if let Some(spec) = g.as_object() {
2132                    let field =
2133                        spec.get("field")
2134                            .and_then(|v| v.as_str())
2135                            .ok_or_else(|| RuntimeError {
2136                                code: "INVALID_QUERY".into(),
2137                                message: "groupBy object spec requires `field`".into(),
2138                            })?;
2139                    validate_column_name(field, ent)?;
2140                    let bucket = spec.get("bucket").and_then(|v| v.as_str()).unwrap_or("day");
2141                    let fmt = match bucket {
2142                        "hour" => "%Y-%m-%d %H:00:00",
2143                        "day" => "%Y-%m-%d",
2144                        "month" => "%Y-%m",
2145                        "year" => "%Y",
2146                        "week" => "%Y-W%W",
2147                        _ => {
2148                            return Err(RuntimeError {
2149                                code: "INVALID_QUERY".into(),
2150                                message: format!(
2151                                    "bucket must be one of hour/day/week/month/year, got {bucket}"
2152                                ),
2153                            });
2154                        }
2155                    };
2156                    let alias = format!("{field}_{bucket}");
2157                    let expr = format!("strftime('{}', {})", fmt, quote_ident(field));
2158                    group_by.push(expr.clone());
2159                    group_select.push(format!("{} AS {}", expr, quote_ident(&alias)));
2160                    group_field_names.push(alias);
2161                }
2162            }
2163        }
2164        let mut full_select = group_select.clone();
2165        full_select.extend(select_parts.iter().cloned());
2166        if full_select.is_empty() {
2167            return Err(RuntimeError {
2168                code: "INVALID_QUERY".into(),
2169                message: "aggregate spec must include count/sum/avg/min/max/groupBy".into(),
2170            });
2171        }
2172
2173        // WHERE clause (reuse filter syntax, but only simple equality for now).
2174        let mut where_clauses = Vec::new();
2175        let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
2176        let mut idx = 1;
2177        if let Some(where_obj) = obj.get("where").and_then(|v| v.as_object()) {
2178            for (k, v) in where_obj {
2179                validate_column_name(k, ent)?;
2180                where_clauses.push(format!("{} = ?{idx}", quote_ident(k)));
2181                values.push(json_to_sql(v));
2182                idx += 1;
2183            }
2184        }
2185        let where_sql = if where_clauses.is_empty() {
2186            String::new()
2187        } else {
2188            format!(" WHERE {}", where_clauses.join(" AND "))
2189        };
2190
2191        let group_sql = if group_by.is_empty() {
2192            String::new()
2193        } else {
2194            format!(" GROUP BY {}", group_by.join(", "))
2195        };
2196
2197        let sql = format!(
2198            "SELECT {} FROM {}{}{}",
2199            full_select.join(", "),
2200            quote_ident(entity),
2201            where_sql,
2202            group_sql
2203        );
2204
2205        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
2206            values.iter().map(|v| v.as_ref()).collect();
2207        let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
2208            code: "QUERY_FAILED".into(),
2209            message: format!("Failed to prepare aggregate: {e}"),
2210        })?;
2211
2212        let column_names: Vec<String> = {
2213            let mut v = group_field_names.clone();
2214            v.extend(result_fields.iter().cloned());
2215            v
2216        };
2217
2218        let rows = stmt
2219            .query_map(param_refs.as_slice(), |row| {
2220                let mut obj = serde_json::Map::new();
2221                for (i, name) in column_names.iter().enumerate() {
2222                    // Try int first (counts/sums), then float, then string, then null.
2223                    if let Ok(n) = row.get::<_, i64>(i) {
2224                        obj.insert(name.clone(), serde_json::Value::Number(n.into()));
2225                    } else if let Ok(f) = row.get::<_, f64>(i) {
2226                        if let Some(num) = serde_json::Number::from_f64(f) {
2227                            obj.insert(name.clone(), serde_json::Value::Number(num));
2228                        } else {
2229                            obj.insert(name.clone(), serde_json::Value::Null);
2230                        }
2231                    } else if let Ok(s) = row.get::<_, String>(i) {
2232                        obj.insert(name.clone(), serde_json::Value::String(s));
2233                    } else {
2234                        obj.insert(name.clone(), serde_json::Value::Null);
2235                    }
2236                }
2237                Ok(serde_json::Value::Object(obj))
2238            })
2239            .map_err(|e| RuntimeError {
2240                code: "QUERY_FAILED".into(),
2241                message: format!("Aggregate failed: {e}"),
2242            })?;
2243
2244        let mut result = Vec::new();
2245        for row in rows {
2246            if let Ok(val) = row {
2247                result.push(val);
2248            }
2249        }
2250        Ok(serde_json::json!({ "rows": result }))
2251    }
2252
2253    // -----------------------------------------------------------------------
2254    // Helpers
2255    // -----------------------------------------------------------------------
2256
2257    fn require_entity(&self, name: &str) -> Result<&ManifestEntity, RuntimeError> {
2258        self.entities.get(name).ok_or_else(|| RuntimeError {
2259            code: "ENTITY_NOT_FOUND".into(),
2260            message: format!("Unknown entity: \"{name}\""),
2261        })
2262    }
2263
2264    /// Acquire the write connection. Used for INSERT, UPDATE, DELETE.
2265    /// SQLite-only — Postgres callers should never reach this (each
2266    /// public CRUD method branches at the top and dispatches to
2267    /// `PostgresDataStore` first). Returns `NOT_SQLITE_BACKEND` if
2268    /// invoked on a Postgres runtime, which indicates a missing dispatch.
2269    fn lock_write_conn(&self) -> Result<std::sync::MutexGuard<'_, Connection>, RuntimeError> {
2270        let sb = self.sqlite_backend()?;
2271        sb.write_conn.lock().map_err(|e| RuntimeError {
2272            code: "LOCK_FAILED".into(),
2273            message: format!("Failed to acquire write connection lock: {e}"),
2274        })
2275    }
2276
2277    /// Acquire a read connection. Uses the read pool if available (file-backed
2278    /// databases), otherwise falls back to the write connection (in-memory).
2279    /// Connections are selected round-robin to spread load evenly. SQLite-only.
2280    fn lock_read_conn(&self) -> Result<ReadConnGuard<'_>, RuntimeError> {
2281        let sb = self.sqlite_backend()?;
2282        if !sb.read_pool.is_empty() {
2283            let idx = sb.read_counter.fetch_add(1, Ordering::Relaxed) % sb.read_pool.len();
2284            let guard = sb.read_pool[idx].lock().map_err(|e| RuntimeError {
2285                code: "LOCK_FAILED".into(),
2286                message: format!("Failed to acquire read connection: {e}"),
2287            })?;
2288            Ok(ReadConnGuard::Pooled(guard))
2289        } else {
2290            // Fall back to write connection for in-memory DBs.
2291            let guard = sb.write_conn.lock().map_err(|e| RuntimeError {
2292                code: "LOCK_FAILED".into(),
2293                message: format!("Failed to acquire connection: {e}"),
2294            })?;
2295            Ok(ReadConnGuard::Write(guard))
2296        }
2297    }
2298
2299    /// Borrow the SQLite backend, or fail with `NOT_SQLITE_BACKEND` if
2300    /// this runtime is Postgres-backed. Used by every SQLite-specific
2301    /// helper as a single point of dispatch.
2302    fn sqlite_backend(&self) -> Result<&SqliteBackend, RuntimeError> {
2303        match &self.backend {
2304            RuntimeBackend::Sqlite(sb) => Ok(sb),
2305            RuntimeBackend::Postgres(_) => Err(RuntimeError {
2306                code: "NOT_SQLITE_BACKEND".into(),
2307                message: "this operation requires a SQLite-backed Runtime".into(),
2308            }),
2309        }
2310    }
2311
2312    /// Borrow the Postgres backend, or `None` for SQLite. Used by the
2313    /// per-method dispatch at the top of each entity-CRUD function
2314    /// AND by the `DataStore` impl in `datastore.rs` to reach the
2315    /// CRDT sidecar.
2316    pub(crate) fn pg_backend(&self) -> Option<&PgBackend> {
2317        match &self.backend {
2318            RuntimeBackend::Sqlite(_) => None,
2319            RuntimeBackend::Postgres(pg) => Some(pg),
2320        }
2321    }
2322
2323    /// Borrow the underlying Postgres `DataStore` if this runtime is
2324    /// Postgres-backed. Used by the `DataStore` adapter in `datastore.rs`
2325    /// to delegate `transact`/`search` etc. without re-implementing them.
2326    /// Accessor for the underlying PostgresDataStore. Used by
2327    /// integration tests to exercise in-tx primitives directly
2328    /// without going through a TS function handler. Also useful for
2329    /// callers that need to drop down to raw PG (e.g. running an
2330    /// EXPLAIN against the live cluster from an admin tool).
2331    /// Returns None on SQLite-backed runtimes.
2332    pub fn pg_data_store_pub(&self) -> Option<&pylon_storage::pg_datastore::PostgresDataStore> {
2333        self.pg_data_store()
2334    }
2335
2336    #[doc(hidden)]
2337    pub fn pg_data_store_for_tests(&self) -> &pylon_storage::pg_datastore::PostgresDataStore {
2338        self.pg_data_store().expect("pg backend")
2339    }
2340
2341    /// Test-only: run a closure inside a PG mutation tx with the
2342    /// CRDT hook installed — same code path FnOpsImpl::call uses
2343    /// for `Mutation` handlers. Lets integration tests verify the
2344    /// hook without spinning up a Bun runtime.
2345    #[doc(hidden)]
2346    pub fn run_in_pg_mutation_tx_for_tests<F, T, E>(&self, body: F) -> Result<T, E>
2347    where
2348        F: FnOnce(&dyn pylon_http::DataStore) -> Result<T, E>,
2349        E: From<pylon_http::DataError>,
2350    {
2351        let pg_backend = self.pg_backend().expect("pg backend");
2352        let crdt_hook: std::sync::Arc<dyn pylon_storage::pg_tx_store::PgCrdtHook> =
2353            std::sync::Arc::new(crate::pg_loro_store::PgCrdtHookImpl {
2354                crdt: std::sync::Arc::clone(&pg_backend.crdt),
2355                manifest: std::sync::Arc::new(self.manifest.clone()),
2356            });
2357        pg_backend
2358            .store
2359            .with_transaction_crdt(crdt_hook, body)
2360    }
2361
2362    pub(crate) fn pg_data_store(&self) -> Option<&pylon_storage::pg_datastore::PostgresDataStore> {
2363        self.pg_backend().map(|pg| &pg.store)
2364    }
2365}
2366
2367// ---------------------------------------------------------------------------
2368// Helpers
2369// ---------------------------------------------------------------------------
2370
2371/// Generate a lex-sortable, monotonic-ish unique ID.
2372///
2373/// Same shape as `pylon_storage::postgres::generate_id` — fixed-width hex
2374/// of nanoseconds + 8-hex per-process counter (40 chars total). The fixed
2375/// width is what makes `WHERE id > $1 ORDER BY id` correct for cursor
2376/// pagination: variable-width hex sorts incorrectly at width boundaries
2377/// (e.g. `"ff"` lex-sorts after `"100"`).
2378/// Run `body` inside a SQLite transaction on `conn`. Commits on `Ok`,
2379/// rolls back on `Err` (or if `body` panics).
2380///
2381/// Used to make the multi-statement CRDT write paths (LoroDoc snapshot
2382/// upsert into `_pylon_crdt_snapshots` + the materialized entity row
2383/// INSERT/UPDATE + FTS / facet maintenance) atomic so a crash mid-write
2384/// can never leave the materialized view stale relative to the CRDT
2385/// snapshot. Uses unmanaged BEGIN/COMMIT/ROLLBACK rather than rusqlite's
2386/// `Transaction` API because the existing call sites borrow `conn`
2387/// through inner closures and the lifetime juggling for a `Transaction`
2388/// guard would force more refactoring than the explicit BEGIN/COMMIT.
2389///
2390/// `BEGIN IMMEDIATE` (vs the default `BEGIN DEFERRED`) takes the SQLite
2391/// reserved lock on entry instead of escalating later — matches the
2392/// pattern in `datastore.rs::transact` and avoids a SQLITE_BUSY race
2393/// where a concurrent reader prevents the lock upgrade mid-write.
2394fn with_write_tx<T, F>(conn: &rusqlite::Connection, body: F) -> Result<T, RuntimeError>
2395where
2396    F: FnOnce() -> Result<T, RuntimeError>,
2397{
2398    conn.execute("BEGIN IMMEDIATE", [])
2399        .map_err(|e| RuntimeError {
2400            code: "TX_BEGIN_FAILED".into(),
2401            message: format!("BEGIN: {e}"),
2402        })?;
2403    match body() {
2404        Ok(v) => {
2405            conn.execute("COMMIT", []).map_err(|e| RuntimeError {
2406                code: "TX_COMMIT_FAILED".into(),
2407                message: format!("COMMIT: {e}"),
2408            })?;
2409            Ok(v)
2410        }
2411        Err(e) => {
2412            // Best-effort rollback; if even ROLLBACK fails we surface
2413            // the *original* error since that's the more actionable one.
2414            let _ = conn.execute("ROLLBACK", []);
2415            Err(e)
2416        }
2417    }
2418}
2419
2420fn generate_id() -> String {
2421    use std::sync::atomic::{AtomicU32, Ordering};
2422    use std::time::{SystemTime, UNIX_EPOCH};
2423    static COUNTER: AtomicU32 = AtomicU32::new(0);
2424    let nanos = SystemTime::now()
2425        .duration_since(UNIX_EPOCH)
2426        .unwrap_or_default()
2427        .as_nanos();
2428    let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
2429    format!("{nanos:032x}{seq:08x}")
2430}
2431
2432/// Convert a `serde_json::Value` to a boxed `ToSql` for rusqlite.
2433fn json_to_sql(val: &serde_json::Value) -> Box<dyn rusqlite::types::ToSql> {
2434    match val {
2435        serde_json::Value::Null => Box::new(rusqlite::types::Null),
2436        serde_json::Value::Bool(b) => Box::new(*b as i32),
2437        serde_json::Value::Number(n) => {
2438            if let Some(i) = n.as_i64() {
2439                Box::new(i)
2440            } else if let Some(f) = n.as_f64() {
2441                Box::new(f)
2442            } else {
2443                Box::new(n.to_string())
2444            }
2445        }
2446        serde_json::Value::String(s) => Box::new(s.clone()),
2447        other => Box::new(other.to_string()),
2448    }
2449}
2450
2451/// Convert a rusqlite row to a JSON value.
2452///
2453/// Reads columns by NAME (via the row's actual column metadata) rather
2454/// than by positional index. The previous implementation assumed the
2455/// SQLite table column order matched the manifest field order, which
2456/// silently breaks when a new field is inserted in the middle of the
2457/// manifest: SQLite's `ALTER TABLE ADD COLUMN` always appends to the
2458/// end of the table, so existing data lands in the wrong field on
2459/// every read.
2460///
2461/// `field_names` is still passed (unused in the body, kept for API
2462/// stability with callers that compute it from the manifest) — the
2463/// name set comes from the row itself now, which always matches the
2464/// SELECT's actual column shape.
2465fn row_to_json(row: &rusqlite::Row<'_>, _field_names: &[String]) -> serde_json::Value {
2466    let mut obj = serde_json::Map::new();
2467
2468    let stmt = row.as_ref();
2469    let count = stmt.column_count();
2470    for i in 0..count {
2471        // Column names are short string slices into the prepared
2472        // statement; copy out into owned Strings before inserting into
2473        // the map (the slice borrow can't outlive the row).
2474        let name = match stmt.column_name(i) {
2475            Ok(n) => n.to_string(),
2476            Err(_) => continue,
2477        };
2478        let value = if let Ok(s) = row.get::<_, String>(i) {
2479            serde_json::Value::String(s)
2480        } else if let Ok(n) = row.get::<_, i64>(i) {
2481            serde_json::Value::Number(serde_json::Number::from(n))
2482        } else if let Ok(f) = row.get::<_, f64>(i) {
2483            serde_json::Number::from_f64(f)
2484                .map(serde_json::Value::Number)
2485                .unwrap_or(serde_json::Value::Null)
2486        } else {
2487            serde_json::Value::Null
2488        };
2489        obj.insert(name, value);
2490    }
2491
2492    serde_json::Value::Object(obj)
2493}
2494
2495#[cfg(test)]
2496mod tests {
2497    use super::*;
2498    use pylon_kernel::{ManifestField, ManifestIndex};
2499
2500    fn test_manifest() -> AppManifest {
2501        AppManifest {
2502            manifest_version: 1,
2503            name: "Test".into(),
2504            version: "0.1.0".into(),
2505            entities: vec![pylon_kernel::ManifestEntity {
2506                name: "User".into(),
2507                fields: vec![
2508                    ManifestField {
2509                        name: "email".into(),
2510                        field_type: "string".into(),
2511                        optional: false,
2512                        unique: true,
2513                        crdt: None,
2514                    },
2515                    ManifestField {
2516                        name: "displayName".into(),
2517                        field_type: "string".into(),
2518                        optional: false,
2519                        unique: false,
2520                        crdt: None,
2521                    },
2522                ],
2523                indexes: vec![ManifestIndex {
2524                    name: "user_email".into(),
2525                    fields: vec!["email".into()],
2526                    unique: true,
2527                }],
2528                relations: vec![],
2529                search: None,
2530                crdt: true,
2531            }],
2532            routes: vec![],
2533            queries: vec![],
2534            actions: vec![],
2535            policies: vec![],
2536            auth: Default::default(),
2537        }
2538    }
2539
2540    #[test]
2541    fn reset_for_tests_wipes_in_memory() {
2542        let rt = Runtime::in_memory(test_manifest()).unwrap();
2543        rt.insert(
2544            "User",
2545            &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
2546        )
2547        .unwrap();
2548        assert_eq!(rt.list("User").unwrap().len(), 1);
2549        rt.reset_for_tests().unwrap();
2550        assert_eq!(rt.list("User").unwrap().len(), 0);
2551    }
2552
2553    #[test]
2554    fn reset_for_tests_refuses_file_db() {
2555        let dir = std::env::temp_dir().join("pylon-reset-refuse");
2556        let _ = std::fs::create_dir_all(&dir);
2557        let db_path = dir.join("db.sqlite");
2558        let _ = std::fs::remove_file(&db_path);
2559        let rt = Runtime::open(db_path.to_str().unwrap(), test_manifest()).unwrap();
2560        let err = rt.reset_for_tests().unwrap_err();
2561        assert_eq!(err.code, "RESET_REFUSED");
2562        let _ = std::fs::remove_file(&db_path);
2563    }
2564
2565    #[test]
2566    fn insert_and_get() {
2567        let rt = Runtime::in_memory(test_manifest()).unwrap();
2568        let id = rt
2569            .insert(
2570                "User",
2571                &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
2572            )
2573            .unwrap();
2574        let row = rt.get_by_id("User", &id).unwrap().unwrap();
2575        assert_eq!(row["email"], "a@b.com");
2576    }
2577
2578    /// Regression: when a new field is added in the middle of a manifest,
2579    /// SQLite ALTER TABLE ADD COLUMN appends it to the end of the table.
2580    /// The previous `row_to_json` read columns by positional index in
2581    /// manifest order, so existing data shifted into the wrong fields
2582    /// on every read (createdAt's value showed up as the new field's,
2583    /// and vice versa). row_to_json now reads by column name from the
2584    /// row's own metadata, so the bug can't recur regardless of
2585    /// migration order.
2586    #[test]
2587    fn row_to_json_handles_columns_added_out_of_manifest_order() {
2588        // Manifest: id, email, displayName, avatarColor, createdAt
2589        let mut manifest = test_manifest();
2590        manifest.entities[0].fields = vec![
2591            ManifestField {
2592                name: "email".into(),
2593                field_type: "string".into(),
2594                optional: false,
2595                unique: true,
2596                crdt: None,
2597            },
2598            ManifestField {
2599                name: "displayName".into(),
2600                field_type: "string".into(),
2601                optional: false,
2602                unique: false,
2603                crdt: None,
2604            },
2605            ManifestField {
2606                name: "avatarColor".into(),
2607                field_type: "string".into(),
2608                optional: true,
2609                unique: false,
2610                crdt: None,
2611            },
2612            ManifestField {
2613                name: "createdAt".into(),
2614                field_type: "datetime".into(),
2615                optional: true,
2616                unique: false,
2617                crdt: None,
2618            },
2619        ];
2620        // Important: turn off CRDT mode for this test — CRDT mode writes
2621        // the projection back to SQLite explicitly per-field, so it
2622        // wouldn't exercise the column-order bug we're regressing
2623        // against. The bug bites the legacy path that still does
2624        // `INSERT (id, email, displayName, ...) VALUES (...)` and then
2625        // `SELECT * ... → row_to_json` to read it back.
2626        manifest.entities[0].crdt = false;
2627        let rt = Runtime::in_memory(manifest).unwrap();
2628        let id = rt
2629            .insert(
2630                "User",
2631                &serde_json::json!({
2632                    "email": "a@b.com",
2633                    "displayName": "Alice",
2634                    "avatarColor": "#abc",
2635                    "createdAt": "2026-01-01T00:00:00Z",
2636                }),
2637            )
2638            .unwrap();
2639
2640        // Simulate an ALTER TABLE ADD COLUMN that appends a new field
2641        // at the end of the SQLite table even though the manifest
2642        // places it in the middle. This is the exact shape of what
2643        // happens when a user adds a new field between existing ones
2644        // and pylon dev migrates the table forward.
2645        {
2646            let conn = rt.lock_write_conn().unwrap();
2647            conn.execute("ALTER TABLE \"User\" ADD COLUMN \"passwordHash\" TEXT", [])
2648                .unwrap();
2649            conn.execute(
2650                "UPDATE \"User\" SET \"passwordHash\" = ?1 WHERE \"id\" = ?2",
2651                rusqlite::params!["hashed-password", &id],
2652            )
2653            .unwrap();
2654        }
2655        // Update the in-memory manifest to reflect the new field
2656        // sitting between avatarColor and createdAt — this is what the
2657        // regenerated manifest would look like.
2658        // (We mutate via the storage path to mirror the actual flow.)
2659
2660        let row = rt.get_by_id("User", &id).unwrap().unwrap();
2661        // The crucial assertions: each column maps to its own value,
2662        // not the value of whichever column happens to share its
2663        // SQLite position.
2664        assert_eq!(row["email"], "a@b.com");
2665        assert_eq!(row["displayName"], "Alice");
2666        assert_eq!(row["avatarColor"], "#abc");
2667        assert_eq!(row["createdAt"], "2026-01-01T00:00:00Z");
2668        assert_eq!(row["passwordHash"], "hashed-password");
2669    }
2670
2671    /// CRDT-mode entities (the default) populate the sidecar snapshot
2672    /// table on every write — the LoroDoc is the source of truth, the
2673    /// SQLite row is the materialized projection. This proves the CRDT
2674    /// branch in `insert` actually fires.
2675    #[test]
2676    fn crdt_default_writes_through_loro_store() {
2677        let rt = Runtime::in_memory(test_manifest()).unwrap();
2678        let id = rt
2679            .insert(
2680                "User",
2681                &serde_json::json!({"email": "x@y.com", "displayName": "Eric"}),
2682            )
2683            .unwrap();
2684
2685        // Sidecar contains exactly one snapshot for the new row.
2686        let conn = rt.lock_write_conn().unwrap();
2687        let snap_count: i64 = conn
2688            .query_row(
2689                "SELECT COUNT(*) FROM _pylon_crdt_snapshots
2690                 WHERE entity = ?1 AND row_id = ?2",
2691                rusqlite::params!["User", &id],
2692                |r| r.get(0),
2693            )
2694            .unwrap();
2695        assert_eq!(snap_count, 1, "sidecar should have one row after insert");
2696
2697        // Loro doc is cached in memory after the write — proves
2698        // get_or_hydrate ran during apply_patch.
2699        assert!(rt.crdt_store().cached_rows() >= 1);
2700
2701        // SQLite materialized view has the projected row.
2702        drop(conn);
2703        let row = rt.get_by_id("User", &id).unwrap().unwrap();
2704        assert_eq!(row["email"], "x@y.com");
2705        assert_eq!(row["displayName"], "Eric");
2706    }
2707
2708    /// Updates write through the LoroDoc as well — verifies the sidecar
2709    /// snapshot grows (Loro tracks new ops) and the materialized row
2710    /// reflects the new value.
2711    #[test]
2712    fn crdt_update_persists_new_snapshot() {
2713        let rt = Runtime::in_memory(test_manifest()).unwrap();
2714        let id = rt
2715            .insert(
2716                "User",
2717                &serde_json::json!({"email": "x@y.com", "displayName": "Eric"}),
2718            )
2719            .unwrap();
2720
2721        let snap_after_insert: Vec<u8> = {
2722            let conn = rt.lock_write_conn().unwrap();
2723            conn.query_row(
2724                "SELECT snapshot FROM _pylon_crdt_snapshots
2725                 WHERE entity = 'User' AND row_id = ?1",
2726                rusqlite::params![&id],
2727                |r| r.get(0),
2728            )
2729            .unwrap()
2730        };
2731
2732        rt.update("User", &id, &serde_json::json!({"displayName": "Eric C"}))
2733            .unwrap();
2734
2735        let snap_after_update: Vec<u8> = {
2736            let conn = rt.lock_write_conn().unwrap();
2737            conn.query_row(
2738                "SELECT snapshot FROM _pylon_crdt_snapshots
2739                 WHERE entity = 'User' AND row_id = ?1",
2740                rusqlite::params![&id],
2741                |r| r.get(0),
2742            )
2743            .unwrap()
2744        };
2745
2746        assert_ne!(
2747            snap_after_insert, snap_after_update,
2748            "snapshot bytes should change after an update"
2749        );
2750
2751        let row = rt.get_by_id("User", &id).unwrap().unwrap();
2752        assert_eq!(row["displayName"], "Eric C");
2753        assert_eq!(row["email"], "x@y.com");
2754    }
2755
2756    /// Regression: when the SQL INSERT step inside Runtime::insert fails
2757    /// (UNIQUE-constraint violation here), the LoroDoc snapshot must
2758    /// also roll back — neither half lands. Previously the LoroStore
2759    /// wrote first and committed independently, so a doomed INSERT left
2760    /// a sidecar row pointing at a doc that the materialized table
2761    /// never knew about.
2762    #[test]
2763    fn crdt_insert_rolls_back_when_sql_step_fails() {
2764        let rt = Runtime::in_memory(test_manifest()).unwrap();
2765        // Seed a row.
2766        rt.insert(
2767            "User",
2768            &serde_json::json!({"email": "x@y.com", "displayName": "First"}),
2769        )
2770        .unwrap();
2771
2772        // Snapshot the sidecar row count BEFORE the failing insert.
2773        let snap_count_before: i64 = {
2774            let conn = rt.lock_write_conn().unwrap();
2775            conn.query_row(
2776                "SELECT COUNT(*) FROM _pylon_crdt_snapshots WHERE entity = 'User'",
2777                [],
2778                |r| r.get(0),
2779            )
2780            .unwrap()
2781        };
2782
2783        // Attempt a duplicate-email insert. SQL UNIQUE rejects.
2784        let err = rt
2785            .insert(
2786                "User",
2787                &serde_json::json!({"email": "x@y.com", "displayName": "Second"}),
2788            )
2789            .expect_err("duplicate email must fail");
2790        assert_eq!(err.code, "INSERT_FAILED");
2791
2792        // Sidecar row count unchanged — the LoroDoc snapshot the CRDT
2793        // path wrote was rolled back along with the failed SQL INSERT.
2794        let snap_count_after: i64 = {
2795            let conn = rt.lock_write_conn().unwrap();
2796            conn.query_row(
2797                "SELECT COUNT(*) FROM _pylon_crdt_snapshots WHERE entity = 'User'",
2798                [],
2799                |r| r.get(0),
2800            )
2801            .unwrap()
2802        };
2803        assert_eq!(
2804            snap_count_after, snap_count_before,
2805            "failed insert should not leave a sidecar snapshot behind"
2806        );
2807    }
2808
/// Opting an entity out of CRDT mode (`crdt: false`) must bypass the Loro
/// layer: no `_pylon_crdt_snapshots` row is written for the inserted id and
/// the Loro row cache stays empty, while the plain SQLite write still lands.
#[test]
fn crdt_false_skips_loro_store() {
    let mut manifest = test_manifest();
    // `User` is the first entity of the test manifest; make it LWW-only.
    manifest.entities[0].crdt = false;
    let rt = Runtime::in_memory(manifest).unwrap();

    let payload = serde_json::json!({"email": "lww@example.com", "displayName": "Plain"});
    let id = rt.insert("User", &payload).unwrap();

    let write_conn = rt.lock_write_conn().unwrap();
    let sidecar_rows: i64 = write_conn
        .query_row(
            "SELECT COUNT(*) FROM _pylon_crdt_snapshots
                 WHERE entity = 'User' AND row_id = ?1",
            rusqlite::params![&id],
            |row| row.get(0),
        )
        .unwrap();
    // Give the write lock back before going through the runtime again.
    drop(write_conn);

    assert_eq!(sidecar_rows, 0, "crdt:false should not touch the sidecar");
    assert_eq!(
        rt.crdt_store().cached_rows(),
        0,
        "crdt:false should not warm the cache"
    );

    // The ordinary SQLite write path still stored the row.
    let stored = rt.get_by_id("User", &id).unwrap().unwrap();
    assert_eq!(stored["email"], "lww@example.com");
}
2847
/// Every inserted `User` row is returned by `list`.
#[test]
fn list_entities() {
    let rt = Runtime::in_memory(test_manifest()).unwrap();
    let seeds = [
        serde_json::json!({"email": "a@b.com", "displayName": "A"}),
        serde_json::json!({"email": "b@c.com", "displayName": "B"}),
    ];
    for seed in &seeds {
        rt.insert("User", seed).unwrap();
    }

    let listed = rt.list("User").unwrap();
    assert_eq!(listed.len(), 2);
}
2864
/// A partial update reports success and the patched field is read back.
#[test]
fn update_entity() {
    let rt = Runtime::in_memory(test_manifest()).unwrap();
    let seed = serde_json::json!({"email": "a@b.com", "displayName": "A"});
    let id = rt.insert("User", &seed).unwrap();

    let patch = serde_json::json!({"displayName": "Updated"});
    let changed = rt.update("User", &id, &patch).unwrap();
    assert!(changed);

    let fetched = rt.get_by_id("User", &id).unwrap().unwrap();
    assert_eq!(fetched["displayName"], "Updated");
}
2881
/// Deleting an existing row reports success and the row is gone afterwards.
#[test]
fn delete_entity() {
    let rt = Runtime::in_memory(test_manifest()).unwrap();
    let seed = serde_json::json!({"email": "a@b.com", "displayName": "A"});
    let id = rt.insert("User", &seed).unwrap();

    let removed = rt.delete("User", &id).unwrap();
    assert!(removed);

    let after = rt.get_by_id("User", &id).unwrap();
    assert!(after.is_none());
}
2895
/// `lookup` finds a row by the value of a single field.
#[test]
fn lookup_by_field() {
    let rt = Runtime::in_memory(test_manifest()).unwrap();
    let seed = serde_json::json!({"email": "a@b.com", "displayName": "A"});
    rt.insert("User", &seed).unwrap();

    let found = rt.lookup("User", "email", "a@b.com").unwrap();
    let found = found.unwrap();
    assert_eq!(found["displayName"], "A");
}
2907
/// Listing an entity that is not in the manifest yields `ENTITY_NOT_FOUND`.
#[test]
fn unknown_entity_returns_error() {
    let rt = Runtime::in_memory(test_manifest()).unwrap();
    let code = rt.list("Nonexistent").unwrap_err().code;
    assert_eq!(code, "ENTITY_NOT_FOUND");
}
2914
/// An insert carrying a field that is not a `User` column is refused with
/// `INVALID_COLUMN`.
#[test]
fn insert_rejects_unknown_column() {
    let rt = Runtime::in_memory(test_manifest()).unwrap();
    let payload = serde_json::json!({"email": "a@b.com", "displayName": "A", "evil_col": "x"});
    let rejection = rt.insert("User", &payload).unwrap_err();
    assert_eq!(rejection.code, "INVALID_COLUMN");
}
2926
/// Patching an existing row with an undeclared field is refused with
/// `INVALID_COLUMN`.
#[test]
fn update_rejects_unknown_column() {
    let rt = Runtime::in_memory(test_manifest()).unwrap();
    let seed = serde_json::json!({"email": "a@b.com", "displayName": "A"});
    let id = rt.insert("User", &seed).unwrap();

    let patch = serde_json::json!({"bad_field": "x"});
    let code = rt.update("User", &id, &patch).unwrap_err().code;
    assert_eq!(code, "INVALID_COLUMN");
}
2941
/// Looking up by a field the entity does not have yields `INVALID_COLUMN`.
#[test]
fn lookup_rejects_unknown_column() {
    let rt = Runtime::in_memory(test_manifest()).unwrap();
    assert_eq!(
        rt.lookup("User", "nonexistent", "val").unwrap_err().code,
        "INVALID_COLUMN"
    );
}
2948
/// A filter keyed on an unknown column is refused with `INVALID_COLUMN`.
#[test]
fn query_filtered_rejects_unknown_column() {
    let rt = Runtime::in_memory(test_manifest()).unwrap();
    let filter = serde_json::json!({"bad_col": "x"});
    let rejection = rt.query_filtered("User", &filter).unwrap_err();
    assert_eq!(rejection.code, "INVALID_COLUMN");
}
2957
/// Column names inside `$order` are validated too: an unknown sort column is
/// refused with `INVALID_COLUMN`.
#[test]
fn query_filtered_rejects_unknown_order_column() {
    let rt = Runtime::in_memory(test_manifest()).unwrap();
    let filter = serde_json::json!({"$order": {"bad_col": "asc"}});
    let rejection = rt.query_filtered("User", &filter).unwrap_err();
    assert_eq!(rejection.code, "INVALID_COLUMN");
}
2966
/// A hostile `$order` direction must neither be executed as SQL nor break
/// the query; it is expected to fall back to ascending order.
///
/// Two rows are inserted in *descending* email order, so an unsorted result
/// (likely insertion order) and an ascending sort produce different row
/// orders. With a single row the ordering claim could never be observed.
#[test]
fn query_filtered_sanitizes_order_direction() {
    let rt = Runtime::in_memory(test_manifest()).unwrap();
    rt.insert(
        "User",
        &serde_json::json!({"email": "b@c.com", "displayName": "B"}),
    )
    .unwrap();
    rt.insert(
        "User",
        &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
    )
    .unwrap();

    // Even a malicious direction value should be normalized to ASC.
    let rows = rt
        .query_filtered(
            "User",
            &serde_json::json!({"$order": {"email": "DROP TABLE User"}}),
        )
        .unwrap();
    assert_eq!(rows.len(), 2);
    assert_eq!(rows[0]["email"], "a@b.com");
    assert_eq!(rows[1]["email"], "b@c.com");

    // The injected text was not executed: the table and its rows survive.
    assert_eq!(rt.list("User").unwrap().len(), 2);
}
2984
/// An in-memory runtime reports an empty read pool.
#[test]
fn in_memory_has_no_read_pool() {
    let pool_size = Runtime::in_memory(test_manifest())
        .unwrap()
        .read_pool_size();
    assert_eq!(pool_size, 0);
}
2990
/// A file-backed runtime opens `READ_POOL_SIZE` read connections, and a row
/// written through the writer is visible to a subsequent read.
///
/// Scratch-directory hygiene:
/// * The directory name is specific to this test, so the pre-clean below can
///   never delete another test's files.
/// * Leftovers from an earlier run that reused this PID are removed first.
///   `User.email` is UNIQUE, so a stale row would make the insert fail.
/// * Removal happens in a drop guard, so it also runs when an assertion
///   panics.
#[test]
fn open_creates_read_pool() {
    /// Best-effort removal of the scratch directory when dropped (incl. panic).
    struct ScratchDir(std::path::PathBuf);
    impl Drop for ScratchDir {
        fn drop(&mut self) {
            // A failed cleanup must not mask the test's own outcome.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    let dir = std::env::temp_dir().join(format!(
        "pylon_test_read_pool_{}",
        std::process::id()
    ));
    let _ = std::fs::remove_dir_all(&dir);
    std::fs::create_dir_all(&dir).unwrap();
    let db_path = dir.join("test_read_pool.db");
    // Declared before `rt`, so it is dropped after it: the runtime (and
    // presumably its open database handles) goes away before the directory.
    let _scratch = ScratchDir(dir);

    let rt = Runtime::open(db_path.to_str().unwrap(), test_manifest()).unwrap();
    assert_eq!(rt.read_pool_size(), READ_POOL_SIZE);

    // Write then read through the pool.
    let id = rt
        .insert(
            "User",
            &serde_json::json!({"email": "pool@test.com", "displayName": "Pool"}),
        )
        .unwrap();
    let row = rt.get_by_id("User", &id).unwrap().unwrap();
    assert_eq!(row["email"], "pool@test.com");
}
3013
/// Readers must make progress while another party holds the write
/// connection's lock, because reads are served by the read pool.
///
/// Reader results come back over a channel with a deadline. If reads ever
/// regress to waiting on the writer lock, the test fails after
/// `READ_DEADLINE` instead of hanging forever in `join`.
///
/// The scratch directory is pre-cleaned of leftovers from a run that reused
/// this PID. It is removed by a drop guard, so cleanup also happens on panic.
#[test]
fn concurrent_reads_dont_block_on_write() {
    use std::sync::mpsc;
    use std::sync::Arc;
    use std::time::Duration;

    /// Best-effort removal of the scratch directory when dropped (incl. panic).
    struct ScratchDir(std::path::PathBuf);
    impl Drop for ScratchDir {
        fn drop(&mut self) {
            // A failed cleanup must not mask the test's own outcome.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    const READERS: usize = 4;
    const READ_DEADLINE: Duration = Duration::from_secs(10);

    let dir = std::env::temp_dir().join(format!("pylon_conc_{}", std::process::id()));
    let _ = std::fs::remove_dir_all(&dir);
    std::fs::create_dir_all(&dir).unwrap();
    let db_path = dir.join("test_concurrent.db");
    // Declared before `rt`, so it is dropped after it.
    let _scratch = ScratchDir(dir);

    let rt = Arc::new(Runtime::open(db_path.to_str().unwrap(), test_manifest()).unwrap());

    // Seed some data so reads have something to return.
    rt.insert(
        "User",
        &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
    )
    .unwrap();
    rt.insert(
        "User",
        &serde_json::json!({"email": "b@c.com", "displayName": "B"}),
    )
    .unwrap();

    // Hold the write lock to simulate a long write.
    let write_guard = rt.lock_write_conn().unwrap();

    // Each reader sends its outcome instead of asserting in-thread, so the
    // main thread can enforce a deadline.
    let (tx, rx) = mpsc::channel();
    let handles: Vec<_> = (0..READERS)
        .map(|_| {
            let rt = Arc::clone(&rt);
            let tx = tx.clone();
            std::thread::spawn(move || {
                let _ = tx.send(rt.list("User").map(|rows| rows.len()));
            })
        })
        .collect();
    // Drop the original sender: if every reader dies early, `recv_timeout`
    // reports disconnection instead of waiting out the full deadline.
    drop(tx);

    for _ in 0..READERS {
        let outcome = rx
            .recv_timeout(READ_DEADLINE)
            .expect("reader did not finish while the write lock was held");
        assert_eq!(outcome.unwrap(), 2);
    }

    for h in handles {
        h.join().expect("reader thread panicked");
    }

    // Release the write lock.
    drop(write_guard);
}
3059}