Skip to main content

pylon_runtime/
lib.rs

1pub mod cache_handlers;
2pub mod cache_server;
3pub mod config;
4pub mod cron;
5pub mod datastore;
6pub mod ip_limit;
7pub mod job_store;
8pub mod jobs;
9pub mod log;
10pub mod loro_store;
11pub mod metrics;
12pub mod oauth_backend;
13pub mod openapi;
14pub mod presence;
15pub mod pubsub;
16pub mod rate_limit;
17pub mod resp;
18pub mod resp_server;
19pub mod rooms;
20pub mod scheduler;
21pub mod server;
22pub mod session_backend;
23pub mod shard_ws;
24pub mod sse;
25pub mod tls;
26pub mod workflow_store;
27pub mod workflows;
28pub mod ws;
29
30use std::collections::HashMap;
31use std::sync::atomic::{AtomicUsize, Ordering};
32use std::sync::Mutex;
33
34use pylon_kernel::{AppManifest, ManifestEntity};
35use rusqlite::Connection;
36
37// ---------------------------------------------------------------------------
38// Runtime errors
39// ---------------------------------------------------------------------------
40
/// Error type returned by every fallible operation in this crate.
///
/// `code` is a short machine-readable tag written in SCREAMING_SNAKE_CASE
/// (for example `"QUERY_FAILED"` or `"INVALID_COLUMN"`); `message` is the
/// human-readable detail.
#[derive(Debug, Clone)]
pub struct RuntimeError {
    pub code: String,
    pub message: String,
}

impl std::fmt::Display for RuntimeError {
    /// Renders the error as `[CODE] message`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { code, message } = self;
        write!(f, "[{code}] {message}")
    }
}

impl std::error::Error for RuntimeError {}
54
55// ---------------------------------------------------------------------------
56// SQL safety helpers
57// ---------------------------------------------------------------------------
58
/// Wrap `name` in double quotes so it can be spliced into SQL as an
/// identifier (table, column, index, or trigger name).
///
/// Every embedded `"` is doubled, which is the SQL-standard escape. The
/// result is therefore always one well-formed quoted identifier, whatever
/// the input contains.
fn quote_ident(name: &str) -> String {
    let mut quoted = String::with_capacity(name.len() + 2);
    quoted.push('"');
    for ch in name.chars() {
        if ch == '"' {
            quoted.push_str("\"\"");
        } else {
            quoted.push(ch);
        }
    }
    quoted.push('"');
    quoted
}
64
65/// Validate that `name` is a known column on the given entity.
66/// Always allows "id" (the primary key). Returns an error listing valid
67/// columns when validation fails.
68fn validate_column_name(name: &str, entity: &ManifestEntity) -> Result<(), RuntimeError> {
69    if name == "id" {
70        return Ok(());
71    }
72    if entity.fields.iter().any(|f| f.name == name) {
73        return Ok(());
74    }
75    Err(RuntimeError {
76        code: "INVALID_COLUMN".into(),
77        message: format!(
78            "Unknown column \"{}\" -- valid columns: id, {}",
79            name,
80            entity
81                .fields
82                .iter()
83                .map(|f| f.name.as_str())
84                .collect::<Vec<_>>()
85                .join(", ")
86        ),
87    })
88}
89
90// ---------------------------------------------------------------------------
91// Connection tuning
92// ---------------------------------------------------------------------------
93
94/// Apply the production pragma set on a SQLite connection. Identical
95/// values to `pylon_storage::sqlite::tune_connection` — kept here too
96/// because the Runtime opens its own connections directly (write +
97/// read pool) without going through the storage adapter.
98///
99/// See `crates/storage/src/sqlite.rs` for the rationale on each
100/// pragma. Skipping it on writes drops throughput by 5–10×.
101fn tune_runtime_connection(conn: &Connection, in_memory: bool) {
102    let pragmas: &[(&str, &str)] = if in_memory {
103        &[
104            ("temp_store", "MEMORY"),
105            ("cache_size", "-65536"),
106            ("foreign_keys", "ON"),
107        ]
108    } else {
109        &[
110            ("journal_mode", "WAL"),
111            ("synchronous", "NORMAL"),
112            ("cache_size", "-65536"),
113            ("mmap_size", "268435456"),
114            ("temp_store", "MEMORY"),
115            ("busy_timeout", "5000"),
116            ("foreign_keys", "ON"),
117            ("wal_autocheckpoint", "1000"),
118        ]
119    };
120    for (key, value) in pragmas {
121        let _ = conn.pragma_update(None, key, value);
122    }
123}
124
125// ---------------------------------------------------------------------------
126// Read connection guard
127// ---------------------------------------------------------------------------
128
129/// A guard that dereferences to a `Connection`, abstracting over whether
130/// it came from the read pool or fell back to the write connection.
131enum ReadConnGuard<'a> {
132    Pooled(std::sync::MutexGuard<'a, Connection>),
133    Write(std::sync::MutexGuard<'a, Connection>),
134}
135
136impl<'a> std::ops::Deref for ReadConnGuard<'a> {
137    type Target = Connection;
138    fn deref(&self) -> &Connection {
139        match self {
140            ReadConnGuard::Pooled(g) => g,
141            ReadConnGuard::Write(g) => g,
142        }
143    }
144}
145
146// ---------------------------------------------------------------------------
147// Runtime — the core execution engine
148// ---------------------------------------------------------------------------
149
/// A minimal runtime that executes CRUD operations against a SQLite database
/// based on a manifest contract.
///
/// In WAL mode SQLite allows one writer and multiple concurrent readers.
/// This struct exploits that by keeping a single write connection behind a
/// mutex and a pool of read-only connections that can be acquired in
/// parallel. On file-backed databases, reads therefore do not wait on the
/// write mutex.
///
/// Caveats:
/// * Each pooled read connection has its own mutex, so two reads assigned
///   to the same pooled connection still run one after the other.
/// * When the pool is empty (in-memory databases), reads fall back to the
///   write connection and do contend with writes.
pub struct Runtime {
    /// Write connection — single mutex, serializes writes.
    write_conn: Mutex<Connection>,
    /// Read connections — pool of connections for concurrent reads.
    /// Empty for in-memory databases, where extra connections are not
    /// possible. Also empty when the write connection reports an empty
    /// path (e.g. `Connection::open("")`).
    read_pool: Vec<Mutex<Connection>>,
    /// Counter for round-robin read pool selection.
    read_counter: AtomicUsize,
    /// The manifest this runtime was constructed from.
    manifest: AppManifest,
    /// A clone of each manifest entity, keyed by entity name, so entities
    /// can be looked up by name without scanning `manifest.entities`.
    entities: HashMap<String, ManifestEntity>,
    /// Set by the constructor — NOT derived from `conn.path()` (that path
    /// conflates "no filename" with "in-memory"; see `is_in_memory` doc).
    is_in_memory: bool,
    /// Per-row LoroDoc cache + sidecar persistence. Used for entities with
    /// `crdt: true` (the default). Reads still hit SQLite directly via the
    /// read pool — the LoroDoc just produces the projected JSON that gets
    /// materialized into SQLite columns on every write.
    crdt: crate::loro_store::LoroStore,
}

/// Number of read-only connections opened for the read pool when the
/// database has a non-empty filesystem path (in-memory databases get none).
const READ_POOL_SIZE: usize = 4;
179
180impl Runtime {
181    /// Open a runtime against an existing SQLite database.
182    pub fn open(db_path: &str, manifest: AppManifest) -> Result<Self, RuntimeError> {
183        let conn = Connection::open(db_path).map_err(|e| RuntimeError {
184            code: "RUNTIME_OPEN_FAILED".into(),
185            message: format!("Failed to open database: {e}"),
186        })?;
187        Self::from_connection(conn, manifest, false)
188    }
189
190    /// Returns true if this runtime is backed by an in-memory SQLite DB.
191    ///
192    /// Stored at open time rather than queried via `conn.path()` because
193    /// the path-based check conflates "no filename" with "in-memory":
194    /// `Connection::open("")` yields a file-backed DB with empty path,
195    /// and would falsely pass as in-memory. Since we always know at
196    /// construction time which constructor was used, track the bit.
197    ///
198    /// Gates the test-reset endpoint — a false positive here would let
199    /// `/api/__test__/reset` truncate real tables.
200    pub fn is_in_memory(&self) -> bool {
201        self.is_in_memory
202    }
203
204    /// Filesystem path to the SQLite database, if this runtime is file-backed.
205    /// Returns `None` for in-memory runtimes. Used by the server bootstrap to
206    /// derive companion paths (session store, change log persistence) without
207    /// requiring the caller to pass them in.
208    pub fn db_path(&self) -> Option<String> {
209        if self.is_in_memory {
210            return None;
211        }
212        let conn = self.write_conn.lock().ok()?;
213        conn.path().filter(|p| !p.is_empty()).map(String::from)
214    }
215
216    /// Drop every row from every entity table. Intended for test harnesses
217    /// that call `/api/__test__/reset` between cases; refuses to run on
218    /// anything but an in-memory database.
219    ///
220    /// Does NOT drop the tables themselves — schema stays, indexes stay,
221    /// triggers stay. Just truncates user data + the change log.
222    pub fn reset_for_tests(&self) -> Result<(), RuntimeError> {
223        if !self.is_in_memory() {
224            return Err(RuntimeError {
225                code: "RESET_REFUSED".into(),
226                message: "reset_for_tests is only available on in-memory databases".into(),
227            });
228        }
229        let conn = self.lock_write_conn()?;
230        let entity_names: Vec<String> = self.entities.values().map(|e| e.name.clone()).collect();
231        for name in entity_names {
232            let sql = format!("DELETE FROM {}", quote_ident(&name));
233            let _ = conn.execute(&sql, []);
234            // Also clear any FTS5 shadow table if present.
235            let fts_sql = format!("DELETE FROM {}", quote_ident(&format!("{name}_fts")));
236            let _ = conn.execute(&fts_sql, []);
237        }
238        Ok(())
239    }
240
241    /// Create an in-memory runtime (useful for tests and benchmarks).
242    pub fn in_memory(manifest: AppManifest) -> Result<Self, RuntimeError> {
243        let conn = Connection::open_in_memory().map_err(|e| RuntimeError {
244            code: "RUNTIME_OPEN_FAILED".into(),
245            message: format!("Failed to open in-memory database: {e}"),
246        })?;
247        Self::from_connection(conn, manifest, true)
248    }
249
250    fn from_connection(
251        conn: Connection,
252        manifest: AppManifest,
253        is_in_memory: bool,
254    ) -> Result<Self, RuntimeError> {
255        // Apply the production pragma set on the write connection.
256        tune_runtime_connection(&conn, is_in_memory);
257
258        // Build entity lookup map.
259        let entities: HashMap<String, ManifestEntity> = manifest
260            .entities
261            .iter()
262            .map(|e| (e.name.clone(), e.clone()))
263            .collect();
264
265        // Create tables for all entities.
266        for entity in &manifest.entities {
267            let fields: Vec<String> = entity
268                .fields
269                .iter()
270                .map(|f| {
271                    let col_type = match f.field_type.as_str() {
272                        "int" => "INTEGER",
273                        "float" => "REAL",
274                        "bool" => "INTEGER",
275                        _ => "TEXT",
276                    };
277                    let not_null = if f.optional { "" } else { " NOT NULL" };
278                    let unique = if f.unique { " UNIQUE" } else { "" };
279                    format!("{} {col_type}{not_null}{unique}", quote_ident(&f.name))
280                })
281                .collect();
282
283            let mut cols = vec!["\"id\" TEXT PRIMARY KEY NOT NULL".to_string()];
284            cols.extend(fields);
285            let sql = format!(
286                "CREATE TABLE IF NOT EXISTS {} ({})",
287                quote_ident(&entity.name),
288                cols.join(", ")
289            );
290            conn.execute(&sql, []).map_err(|e| RuntimeError {
291                code: "SCHEMA_INIT_FAILED".into(),
292                message: format!("Failed to create table {}: {e}", entity.name),
293            })?;
294
295            // Create indexes.
296            for idx in &entity.indexes {
297                let unique_kw = if idx.unique { "UNIQUE " } else { "" };
298                let quoted_fields: Vec<String> =
299                    idx.fields.iter().map(|f| quote_ident(f)).collect();
300                let idx_sql = format!(
301                    "CREATE {unique_kw}INDEX IF NOT EXISTS {} ON {} ({})",
302                    quote_ident(&idx.name),
303                    quote_ident(&entity.name),
304                    quoted_fields.join(", ")
305                );
306                conn.execute(&idx_sql, []).ok();
307            }
308
309            // Create an FTS5 virtual table over all text-ish fields so clients
310            // can do full-text search via the `$search` query operator.
311            //
312            // Fields that look like "string" / "richtext" / "text" are indexed.
313            // The FTS table is a contentless external-content table pointed at
314            // the entity table, so SQLite keeps it consistent via triggers we
315            // install below.
316            let text_fields: Vec<&str> = entity
317                .fields
318                .iter()
319                .filter(|f| matches!(f.field_type.as_str(), "string" | "richtext" | "text"))
320                .map(|f| f.name.as_str())
321                .collect();
322            if !text_fields.is_empty() {
323                let fts_name = format!("{}_fts", entity.name);
324                let quoted_cols: Vec<String> = text_fields.iter().map(|f| quote_ident(f)).collect();
325                let fts_sql = format!(
326                    "CREATE VIRTUAL TABLE IF NOT EXISTS {} USING fts5({}, content={}, content_rowid='rowid')",
327                    quote_ident(&fts_name),
328                    quoted_cols.join(", "),
329                    quote_ident(&entity.name),
330                );
331                // FTS5 may not be compiled in; ignore errors so those builds
332                // still work (queries using $search will return empty).
333                let fts_ok = conn.execute(&fts_sql, []).is_ok();
334
335                if fts_ok {
336                    // Sync triggers: keep FTS index current on INSERT/UPDATE/DELETE.
337                    //
338                    // Subtle bug fixed: the trigger NAME must be built from
339                    // the raw `fts_name` + suffix and THEN quoted once.
340                    // Previously this code quoted `fts_name` first and then
341                    // appended `_ai`/`_ad`/`_au` AFTER the closing quote,
342                    // producing invalid SQL like `"foo_fts"_ai`. The
343                    // `.ok()` after execute silently ate the error, so the
344                    // triggers were never created and FTS stayed out of
345                    // sync on writes.
346                    let tbl = quote_ident(&entity.name);
347                    let ftb = quote_ident(&fts_name);
348                    let cols_list = quoted_cols.join(", ");
349                    let new_list: Vec<String> = text_fields
350                        .iter()
351                        .map(|f| format!("new.{}", quote_ident(f)))
352                        .collect();
353                    let old_list: Vec<String> = text_fields
354                        .iter()
355                        .map(|f| format!("old.{}", quote_ident(f)))
356                        .collect();
357
358                    let trigger_ai = quote_ident(&format!("{}_ai", fts_name));
359                    let trigger_ad = quote_ident(&format!("{}_ad", fts_name));
360                    let trigger_au = quote_ident(&format!("{}_au", fts_name));
361
362                    let trigger_ins = format!(
363                        "CREATE TRIGGER IF NOT EXISTS {trigger_ai} AFTER INSERT ON {tbl} BEGIN \
364                         INSERT INTO {ftb}(rowid, {cols_list}) VALUES (new.rowid, {new_vals}); END",
365                        new_vals = new_list.join(", "),
366                    );
367                    let trigger_del = format!(
368                        "CREATE TRIGGER IF NOT EXISTS {trigger_ad} AFTER DELETE ON {tbl} BEGIN \
369                         INSERT INTO {ftb}({ftb}, rowid, {cols_list}) VALUES('delete', old.rowid, {old_vals}); END",
370                        old_vals = old_list.join(", "),
371                    );
372                    let trigger_upd = format!(
373                        "CREATE TRIGGER IF NOT EXISTS {trigger_au} AFTER UPDATE ON {tbl} BEGIN \
374                         INSERT INTO {ftb}({ftb}, rowid, {cols_list}) VALUES('delete', old.rowid, {old_vals}); \
375                         INSERT INTO {ftb}(rowid, {cols_list}) VALUES (new.rowid, {new_vals}); END",
376                        new_vals = new_list.join(", "),
377                        old_vals = old_list.join(", "),
378                    );
379                    // Log failures instead of silently dropping — FTS going
380                    // stale should be visible to operators.
381                    for (label, sql) in [
382                        ("ai", &trigger_ins),
383                        ("ad", &trigger_del),
384                        ("au", &trigger_upd),
385                    ] {
386                        if let Err(e) = conn.execute(sql, []) {
387                            tracing::warn!(
388                                "[fts] failed to create {label} trigger for {}: {e}",
389                                entity.name
390                            );
391                        }
392                    }
393                }
394            }
395        }
396
397        // Open read-only connection pool for file-backed databases.
398        // In-memory databases cannot share connections, so the pool stays empty
399        // and reads fall back to the write connection.
400        let db_path = conn.path().filter(|p| !p.is_empty()).map(|p| p.to_string());
401
402        let read_pool = if let Some(ref path) = db_path {
403            let mut pool = Vec::with_capacity(READ_POOL_SIZE);
404            for _ in 0..READ_POOL_SIZE {
405                let read_conn = Connection::open_with_flags(
406                    path,
407                    rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
408                        | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX,
409                )
410                .map_err(|e| RuntimeError {
411                    code: "POOL_OPEN_FAILED".into(),
412                    message: format!("Failed to open read connection: {e}"),
413                })?;
414                tune_runtime_connection(&read_conn, false);
415                pool.push(Mutex::new(read_conn));
416            }
417            pool
418        } else {
419            // In-memory DB — no separate read connections possible.
420            Vec::new()
421        };
422
423        // Sidecar table for CRDT snapshots — created always so toggling
424        // `crdt: true` on an entity post-deploy doesn't need a migration.
425        crate::loro_store::ensure_sidecar(&conn).map_err(|e| RuntimeError {
426            code: "CRDT_SIDECAR_FAILED".into(),
427            message: format!("create CRDT sidecar table: {e}"),
428        })?;
429
430        Ok(Self {
431            write_conn: Mutex::new(conn),
432            read_pool,
433            read_counter: AtomicUsize::new(0),
434            manifest,
435            entities,
436            is_in_memory,
437            crdt: crate::loro_store::LoroStore::new(),
438        })
439    }
440
441    /// Create the search index tables (`_facet_bitmap`, per-entity
442    /// `_fts_<Entity>`, and a covering index for each declared
443    /// sortable field) for every searchable entity in the manifest.
444    ///
445    /// Production deployments do this via the storage adapter's
446    /// `apply_schema` / migration plan; that path also handles
447    /// adding/removing the tables when a `search:` block is added or
448    /// removed across deploys. This method is a quick path for tests
449    /// and benchmarks that build a `Runtime::in_memory(...)` directly
450    /// without going through the schema-plan pipeline.
451    pub fn ensure_search_indexes(&self) -> Result<(), RuntimeError> {
452        let conn = self.lock_write_conn()?;
453        conn.execute(pylon_storage::search::create_facet_table_sql(), [])
454            .map_err(|e| RuntimeError {
455                code: "FACET_TABLE_FAILED".into(),
456                message: format!("create _facet_bitmap: {e}"),
457            })?;
458        for entity in &self.manifest.entities {
459            if let Some(cfg) = &entity.search {
460                if let Some(sql) = pylon_storage::search::create_fts_table_sql(&entity.name, cfg) {
461                    conn.execute(&sql, []).map_err(|e| RuntimeError {
462                        code: "FTS_TABLE_FAILED".into(),
463                        message: format!("create FTS table for {}: {e}", entity.name),
464                    })?;
465                }
466                for field in &cfg.sortable {
467                    let idx_sql = format!(
468                        "CREATE INDEX IF NOT EXISTS \"{}_sort_{field}\" ON \"{}\" (\"{field}\")",
469                        entity.name, entity.name,
470                    );
471                    conn.execute(&idx_sql, []).map_err(|e| RuntimeError {
472                        code: "SORT_INDEX_FAILED".into(),
473                        message: format!("create sort index for {}.{field}: {e}", entity.name),
474                    })?;
475                }
476            }
477        }
478        Ok(())
479    }
480
481    /// Return a reference to the app manifest.
482    pub fn manifest(&self) -> &AppManifest {
483        &self.manifest
484    }
485
486    /// Expose the write connection mutex for transactional operations.
487    pub fn lock_conn_pub(&self) -> Result<std::sync::MutexGuard<'_, Connection>, RuntimeError> {
488        self.lock_write_conn()
489    }
490
491    /// Return the number of read connections in the pool (0 for in-memory DBs).
492    pub fn read_pool_size(&self) -> usize {
493        self.read_pool.len()
494    }
495
496    // -----------------------------------------------------------------------
497    // CRDT helpers
498    // -----------------------------------------------------------------------
499
500    /// Map an entity's manifest fields → the [`pylon_crdt::CrdtField`] vec
501    /// the LoroStore needs. Resolves each field's CRDT shape from the
502    /// (type, annotation) pair via `pylon_crdt::field_kind`. Caches
503    /// nothing yet — called per write, fine at our entity counts.
504    pub(crate) fn crdt_fields_for(
505        &self,
506        ent: &ManifestEntity,
507    ) -> Result<Vec<pylon_crdt::CrdtField>, RuntimeError> {
508        let mut out = Vec::with_capacity(ent.fields.len());
509        for f in &ent.fields {
510            // Skip the implicit `id` column — it's the row key, not a
511            // CRDT-managed value. SQLite's PRIMARY KEY constraint owns it.
512            if f.name == "id" {
513                continue;
514            }
515            let kind = pylon_crdt::field_kind(&f.field_type, f.crdt).map_err(|e| RuntimeError {
516                code: "INVALID_CRDT_FIELD".into(),
517                message: format!(
518                    "{}.{}: {e} (declared type={}, crdt={:?})",
519                    ent.name, f.name, f.field_type, f.crdt
520                ),
521            })?;
522            out.push(pylon_crdt::CrdtField {
523                name: f.name.clone(),
524                kind,
525            });
526        }
527        Ok(out)
528    }
529
530    /// Borrow the CRDT store. Tests use this to inspect cache state and
531    /// the WS handler will use it to fetch snapshots on subscribe.
532    pub fn crdt_store(&self) -> &crate::loro_store::LoroStore {
533        &self.crdt
534    }
535
536    // -----------------------------------------------------------------------
537    // CRUD operations
538    // -----------------------------------------------------------------------
539
540    /// Insert a new row. Returns the generated ID.
541    ///
542    /// For entities with `crdt: true` (the default) the LoroDoc snapshot
543    /// + the SQLite materialized row are committed together in a single
544    /// SQLite transaction so a crash between the two leaves neither.
545    /// `crdt: false` entities skip the LoroDoc and use a direct write
546    /// (legacy LWW path). Both produce the same on-disk row shape, so
547    /// reads, indexes, FTS, and policies don't change between modes.
548    pub fn insert(&self, entity: &str, data: &serde_json::Value) -> Result<String, RuntimeError> {
549        let ent = self.require_entity(entity)?;
550        let conn = self.lock_write_conn()?;
551
552        let id = generate_id();
553
554        let obj = data.as_object().ok_or_else(|| RuntimeError {
555            code: "INVALID_DATA".into(),
556            message: "Insert data must be a JSON object".into(),
557        })?;
558
559        // Validate columns up-front so we don't even open a transaction
560        // for a patch that the SQL INSERT will reject.
561        for key in obj.keys() {
562            if key != "id" {
563                validate_column_name(key, ent)?;
564            }
565        }
566
567        // Atomic block — CRDT sidecar snapshot + materialized SQL row +
568        // search-index maintenance all land together or none does. SQLite's
569        // rollback journal makes this crash-safe end-to-end.
570        with_write_tx(&conn, || {
571            if ent.crdt {
572                let crdt_fields = self.crdt_fields_for(ent)?;
573                self.crdt
574                    .apply_patch(&conn, entity, &id, &crdt_fields, data)
575                    .map_err(|e| RuntimeError {
576                        code: "CRDT_APPLY_FAILED".into(),
577                        message: format!("crdt write {entity}/{id}: {e}"),
578                    })?;
579            }
580
581            let mut col_names = vec![quote_ident("id")];
582            let mut placeholders = vec!["?1".to_string()];
583            let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = vec![Box::new(id.clone())];
584
585            let mut idx = 2;
586            for (key, val) in obj {
587                if key == "id" {
588                    continue;
589                }
590                col_names.push(quote_ident(key));
591                placeholders.push(format!("?{idx}"));
592                values.push(json_to_sql(val));
593                idx += 1;
594            }
595
596            let sql = format!(
597                "INSERT INTO {} ({}) VALUES ({})",
598                quote_ident(entity),
599                col_names.join(", "),
600                placeholders.join(", ")
601            );
602
603            let params: Vec<&dyn rusqlite::types::ToSql> =
604                values.iter().map(|v| v.as_ref()).collect();
605            conn.execute(&sql, params.as_slice())
606                .map_err(|e| RuntimeError {
607                    code: "INSERT_FAILED".into(),
608                    message: format!("Insert into {entity} failed: {e}"),
609                })?;
610
611            // Search-index maintenance lives inside the same tx so a
612            // crash between the row insert and the FTS update can't leave
613            // the search index inconsistent with the row table.
614            if let Some(cfg) = ent.search.as_ref() {
615                if !cfg.is_empty() {
616                    pylon_storage::search_maintenance::apply_insert(&conn, entity, &id, data, cfg)
617                        .map_err(|e| RuntimeError {
618                            code: "SEARCH_MAINTENANCE_FAILED".into(),
619                            message: format!("search index update on insert {entity}: {e}"),
620                        })?;
621                }
622            }
623            Ok(())
624        })?;
625
626        Ok(id)
627    }
628
629    /// Get a single row by ID.
630    pub fn get_by_id(
631        &self,
632        entity: &str,
633        id: &str,
634    ) -> Result<Option<serde_json::Value>, RuntimeError> {
635        let ent = self.require_entity(entity)?;
636        let conn = self.lock_read_conn()?;
637
638        let sql = format!("SELECT * FROM {} WHERE \"id\" = ?1", quote_ident(entity));
639        let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
640            code: "QUERY_FAILED".into(),
641            message: format!("Failed to prepare query: {e}"),
642        })?;
643
644        let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
645
646        let result = stmt
647            .query_row(rusqlite::params![id], |row| Ok(row_to_json(row, &columns)))
648            .ok();
649
650        Ok(result)
651    }
652
653    /// List all rows for an entity.
654    pub fn list(&self, entity: &str) -> Result<Vec<serde_json::Value>, RuntimeError> {
655        let ent = self.require_entity(entity)?;
656        let conn = self.lock_read_conn()?;
657
658        let sql = format!("SELECT * FROM {} ORDER BY \"id\"", quote_ident(entity));
659        let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
660            code: "QUERY_FAILED".into(),
661            message: format!("Failed to prepare query: {e}"),
662        })?;
663
664        let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
665
666        let rows = stmt
667            .query_map([], |row| Ok(row_to_json(row, &columns)))
668            .map_err(|e| RuntimeError {
669                code: "QUERY_FAILED".into(),
670                message: format!("Query failed: {e}"),
671            })?;
672
673        let mut result = Vec::new();
674        for row in rows {
675            if let Ok(val) = row {
676                result.push(val);
677            }
678        }
679        Ok(result)
680    }
681
682    /// List rows after a cursor ID (for cursor-based pagination).
683    pub fn list_after(
684        &self,
685        entity: &str,
686        after: Option<&str>,
687        limit: usize,
688    ) -> Result<Vec<serde_json::Value>, RuntimeError> {
689        let ent = self.require_entity(entity)?;
690        let conn = self.lock_read_conn()?;
691
692        let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
693        let table = quote_ident(entity);
694
695        let (sql, params): (String, Vec<Box<dyn rusqlite::types::ToSql>>) = match after {
696            Some(cursor) => (
697                format!(
698                    "SELECT * FROM {} WHERE \"id\" > ?1 ORDER BY \"id\" LIMIT ?2",
699                    table
700                ),
701                vec![Box::new(cursor.to_string()), Box::new(limit as i64)],
702            ),
703            None => (
704                format!("SELECT * FROM {} ORDER BY \"id\" LIMIT ?1", table),
705                vec![Box::new(limit as i64)],
706            ),
707        };
708
709        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
710            params.iter().map(|v| v.as_ref()).collect();
711
712        let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
713            code: "QUERY_FAILED".into(),
714            message: format!("Failed to prepare query: {e}"),
715        })?;
716
717        let rows = stmt
718            .query_map(param_refs.as_slice(), |row| Ok(row_to_json(row, &columns)))
719            .map_err(|e| RuntimeError {
720                code: "QUERY_FAILED".into(),
721                message: format!("Query failed: {e}"),
722            })?;
723
724        let mut result = Vec::new();
725        for row in rows {
726            if let Ok(val) = row {
727                result.push(val);
728            }
729        }
730        Ok(result)
731    }
732
733    /// Update a row by ID. Returns true if a row was found and updated.
734    ///
735    /// For entities with `crdt: true` (the default) the LoroDoc receives
736    /// the patch first; the SQLite UPDATE writes the same fields so the
737    /// materialized view stays in lockstep with the doc state.
738    pub fn update(
739        &self,
740        entity: &str,
741        id: &str,
742        data: &serde_json::Value,
743    ) -> Result<bool, RuntimeError> {
744        let ent = self.require_entity(entity)?;
745        let conn = self.lock_write_conn()?;
746
747        let obj = data.as_object().ok_or_else(|| RuntimeError {
748            code: "INVALID_DATA".into(),
749            message: "Update data must be a JSON object".into(),
750        })?;
751
752        // Validate up-front and exit cheap if there's nothing to write.
753        for key in obj.keys() {
754            if key != "id" {
755                validate_column_name(key, ent)?;
756            }
757        }
758        let writable_keys: Vec<&String> = obj.keys().filter(|k| *k != "id").collect();
759        if writable_keys.is_empty() {
760            return Ok(false);
761        }
762
763        // Atomic block — same shape as insert. CRDT snapshot, SQL UPDATE,
764        // and FTS maintenance all commit together.
765        let affected = with_write_tx(&conn, || -> Result<i64, RuntimeError> {
766            if ent.crdt {
767                let crdt_fields = self.crdt_fields_for(ent)?;
768                self.crdt
769                    .apply_patch(&conn, entity, id, &crdt_fields, data)
770                    .map_err(|e| RuntimeError {
771                        code: "CRDT_APPLY_FAILED".into(),
772                        message: format!("crdt write {entity}/{id}: {e}"),
773                    })?;
774            }
775
776            let mut set_clauses = Vec::new();
777            let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
778            let mut idx = 1;
779            for key in &writable_keys {
780                set_clauses.push(format!("{} = ?{idx}", quote_ident(key)));
781                values.push(json_to_sql(&obj[key.as_str()]));
782                idx += 1;
783            }
784
785            // Capture pre-UPDATE row for search-maintenance diff INSIDE the
786            // tx. Matches the contract of search_maintenance::apply_update
787            // — old state must be read before the UPDATE lands.
788            let searchable = ent.search.as_ref().map(|c| !c.is_empty()).unwrap_or(false);
789            let old_row = if searchable {
790                self.get_by_id_with_conn(&conn, entity, id)?
791            } else {
792                None
793            };
794
795            values.push(Box::new(id.to_string()));
796            let sql = format!(
797                "UPDATE {} SET {} WHERE \"id\" = ?{idx}",
798                quote_ident(entity),
799                set_clauses.join(", ")
800            );
801
802            let params: Vec<&dyn rusqlite::types::ToSql> =
803                values.iter().map(|v| v.as_ref()).collect();
804            let affected = conn
805                .execute(&sql, params.as_slice())
806                .map_err(|e| RuntimeError {
807                    code: "UPDATE_FAILED".into(),
808                    message: format!("Update {entity}/{id} failed: {e}"),
809                })? as i64;
810
811            if affected > 0 && searchable {
812                if let (Some(cfg), Some(old)) = (ent.search.as_ref(), old_row) {
813                    pylon_storage::search_maintenance::apply_update(
814                        &conn, entity, id, &old, data, cfg,
815                    )
816                    .map_err(|e| RuntimeError {
817                        code: "SEARCH_MAINTENANCE_FAILED".into(),
818                        message: format!("search index update on update {entity}: {e}"),
819                    })?;
820                }
821            }
822            Ok(affected)
823        })?;
824
825        Ok(affected > 0)
826    }
827
828    /// Delete a row by ID. Returns true if a row was actually deleted.
829    pub fn delete(&self, entity: &str, id: &str) -> Result<bool, RuntimeError> {
830        let ent = self.require_entity(entity)?;
831        let conn = self.lock_write_conn()?;
832
833        // Apply search-maintenance BEFORE the DELETE — we still need
834        // the row's facet values to clear the bitmap bits.
835        let searchable = ent.search.as_ref().map(|c| !c.is_empty()).unwrap_or(false);
836        if searchable {
837            if let (Some(cfg), Ok(Some(row))) = (
838                ent.search.as_ref(),
839                self.get_by_id_with_conn(&conn, entity, id),
840            ) {
841                pylon_storage::search_maintenance::apply_delete(&conn, entity, id, &row, cfg)
842                    .map_err(|e| RuntimeError {
843                        code: "SEARCH_MAINTENANCE_FAILED".into(),
844                        message: format!("search index update on delete {entity}: {e}"),
845                    })?;
846            }
847        }
848
849        let sql = format!("DELETE FROM {} WHERE \"id\" = ?1", quote_ident(entity));
850        let affected = conn
851            .execute(&sql, rusqlite::params![id])
852            .map_err(|e| RuntimeError {
853                code: "DELETE_FAILED".into(),
854                message: format!("Delete {entity}/{id} failed: {e}"),
855            })?;
856
857        Ok(affected > 0)
858    }
859
860    /// Lookup a single row by a field value (e.g., email).
861    pub fn lookup(
862        &self,
863        entity: &str,
864        field: &str,
865        value: &str,
866    ) -> Result<Option<serde_json::Value>, RuntimeError> {
867        let ent = self.require_entity(entity)?;
868        validate_column_name(field, ent)?;
869        let conn = self.lock_read_conn()?;
870
871        let sql = format!(
872            "SELECT * FROM {} WHERE {} = ?1 LIMIT 1",
873            quote_ident(entity),
874            quote_ident(field)
875        );
876        let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
877
878        let result = conn.prepare_cached(&sql).ok().and_then(|mut stmt| {
879            stmt.query_row(rusqlite::params![value], |row| {
880                Ok(row_to_json(row, &columns))
881            })
882            .ok()
883        });
884
885        Ok(result)
886    }
887
888    /// Link two entities by setting a foreign-key field.
889    pub fn link(
890        &self,
891        entity: &str,
892        id: &str,
893        relation: &str,
894        target_id: &str,
895    ) -> Result<bool, RuntimeError> {
896        let ent = self.require_entity(entity)?;
897
898        // Find the relation definition to determine which field to set.
899        let rel = ent
900            .relations
901            .iter()
902            .find(|r| r.name == relation)
903            .ok_or_else(|| RuntimeError {
904                code: "RELATION_NOT_FOUND".into(),
905                message: format!("Relation \"{relation}\" not found on entity \"{entity}\""),
906            })?;
907
908        let data = serde_json::json!({ rel.field.clone(): target_id });
909        self.update(entity, id, &data)
910    }
911
912    /// Unlink a relation by setting the foreign-key field to null.
913    pub fn unlink(&self, entity: &str, id: &str, relation: &str) -> Result<bool, RuntimeError> {
914        let ent = self.require_entity(entity)?;
915
916        let rel = ent
917            .relations
918            .iter()
919            .find(|r| r.name == relation)
920            .ok_or_else(|| RuntimeError {
921                code: "RELATION_NOT_FOUND".into(),
922                message: format!("Relation \"{relation}\" not found on entity \"{entity}\""),
923            })?;
924
925        let data = serde_json::json!({ rel.field.clone(): null });
926        self.update(entity, id, &data)
927    }
928
929    /// Execute a filtered query with operators ($not, $gt, $in, $like, $order, $limit).
930    pub fn query_filtered(
931        &self,
932        entity: &str,
933        filter: &serde_json::Value,
934    ) -> Result<Vec<serde_json::Value>, RuntimeError> {
935        let ent = self.require_entity(entity)?;
936        let conn = self.lock_read_conn()?;
937
938        let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
939        let obj = filter
940            .as_object()
941            .unwrap_or(&serde_json::Map::new())
942            .clone();
943
944        let mut where_clauses = Vec::new();
945        let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
946        let mut order_clause = String::new();
947        let mut limit_clause = String::new();
948        let mut join_clause = String::new();
949        let mut fts_order = false;
950        let mut idx = 1;
951
952        for (key, val) in &obj {
953            match key.as_str() {
954                "$order" => {
955                    if let Some(order_obj) = val.as_object() {
956                        let mut parts: Vec<String> = Vec::new();
957                        for (col, dir) in order_obj {
958                            validate_column_name(col, ent)?;
959                            let d = match dir.as_str().unwrap_or("asc") {
960                                "desc" | "DESC" => "DESC",
961                                _ => "ASC",
962                            };
963                            parts.push(format!("{} {d}", quote_ident(col)));
964                        }
965                        if !parts.is_empty() {
966                            order_clause = format!(" ORDER BY {}", parts.join(", "));
967                        }
968                    }
969                }
970                "$limit" => {
971                    if let Some(n) = val.as_u64() {
972                        limit_clause = format!(" LIMIT {n}");
973                    }
974                }
975                "$offset" => {
976                    if let Some(n) = val.as_u64() {
977                        // SQLite requires LIMIT before OFFSET; add a default.
978                        if limit_clause.is_empty() {
979                            limit_clause = " LIMIT -1".into();
980                        }
981                        limit_clause = format!("{limit_clause} OFFSET {n}");
982                    }
983                }
984                "$search" => {
985                    if let Some(q) = val.as_str() {
986                        // Join against the entity's FTS5 virtual table.
987                        let fts = format!("{}_fts", entity);
988                        join_clause = format!(
989                            " JOIN {fts} ON {ent}.rowid = {fts}.rowid",
990                            fts = quote_ident(&fts),
991                            ent = quote_ident(entity),
992                        );
993                        where_clauses.push(format!("{} MATCH ?{idx}", quote_ident(&fts)));
994                        values.push(Box::new(q.to_string()));
995                        fts_order = true;
996                        idx += 1;
997                    }
998                }
999                _ => {
1000                    validate_column_name(key, ent)?;
1001                    let quoted_key = quote_ident(key);
1002
1003                    if let Some(op_obj) = val.as_object() {
1004                        for (op, op_val) in op_obj {
1005                            match op.as_str() {
1006                                "$not" => {
1007                                    where_clauses.push(format!("{quoted_key} != ?{idx}"));
1008                                    values.push(json_to_sql(op_val));
1009                                    idx += 1;
1010                                }
1011                                "$gt" => {
1012                                    where_clauses.push(format!("{quoted_key} > ?{idx}"));
1013                                    values.push(json_to_sql(op_val));
1014                                    idx += 1;
1015                                }
1016                                "$gte" => {
1017                                    where_clauses.push(format!("{quoted_key} >= ?{idx}"));
1018                                    values.push(json_to_sql(op_val));
1019                                    idx += 1;
1020                                }
1021                                "$lt" => {
1022                                    where_clauses.push(format!("{quoted_key} < ?{idx}"));
1023                                    values.push(json_to_sql(op_val));
1024                                    idx += 1;
1025                                }
1026                                "$lte" => {
1027                                    where_clauses.push(format!("{quoted_key} <= ?{idx}"));
1028                                    values.push(json_to_sql(op_val));
1029                                    idx += 1;
1030                                }
1031                                "$like" => {
1032                                    where_clauses.push(format!("{quoted_key} LIKE ?{idx}"));
1033                                    let pattern = format!("%{}%", op_val.as_str().unwrap_or(""));
1034                                    values.push(Box::new(pattern));
1035                                    idx += 1;
1036                                }
1037                                "$in" => {
1038                                    if let Some(arr) = op_val.as_array() {
1039                                        let placeholders: Vec<String> = arr
1040                                            .iter()
1041                                            .map(|v| {
1042                                                let p = format!("?{idx}");
1043                                                values.push(json_to_sql(v));
1044                                                idx += 1;
1045                                                p
1046                                            })
1047                                            .collect();
1048                                        if !placeholders.is_empty() {
1049                                            where_clauses.push(format!(
1050                                                "{quoted_key} IN ({})",
1051                                                placeholders.join(", ")
1052                                            ));
1053                                        }
1054                                    }
1055                                }
1056                                _ => {}
1057                            }
1058                        }
1059                    } else {
1060                        // Simple equality.
1061                        where_clauses.push(format!("{quoted_key} = ?{idx}"));
1062                        values.push(json_to_sql(val));
1063                        idx += 1;
1064                    }
1065                }
1066            }
1067        }
1068
1069        let where_sql = if where_clauses.is_empty() {
1070            String::new()
1071        } else {
1072            format!(" WHERE {}", where_clauses.join(" AND "))
1073        };
1074
1075        if order_clause.is_empty() {
1076            order_clause = if fts_order {
1077                // FTS joins default-order by bm25 relevance.
1078                " ORDER BY bm25(".to_string() + &quote_ident(&format!("{}_fts", entity)) + ")"
1079            } else {
1080                format!(" ORDER BY {}.\"id\"", quote_ident(entity))
1081            };
1082        }
1083
1084        let select_prefix = format!("{}.*", quote_ident(entity));
1085        let sql = format!(
1086            "SELECT {} FROM {}{}{}{}{}",
1087            select_prefix,
1088            quote_ident(entity),
1089            join_clause,
1090            where_sql,
1091            order_clause,
1092            limit_clause
1093        );
1094        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1095            values.iter().map(|v| v.as_ref()).collect();
1096
1097        let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
1098            code: "QUERY_FAILED".into(),
1099            message: format!("Failed to prepare filtered query: {e}"),
1100        })?;
1101
1102        let rows = stmt
1103            .query_map(param_refs.as_slice(), |row| Ok(row_to_json(row, &columns)))
1104            .map_err(|e| RuntimeError {
1105                code: "QUERY_FAILED".into(),
1106                message: format!("Filtered query failed: {e}"),
1107            })?;
1108
1109        let mut result = Vec::new();
1110        for row in rows {
1111            if let Ok(val) = row {
1112                result.push(val);
1113            }
1114        }
1115        Ok(result)
1116    }
1117
1118    /// Execute a graph-style query.
1119    ///
1120    /// Input: `{ "User": { "where": { "email": "..." }, "include": { "posts": {} } } }`
1121    /// Returns nested results following relations.
1122    pub fn query_graph(
1123        &self,
1124        query: &serde_json::Value,
1125    ) -> Result<serde_json::Value, RuntimeError> {
1126        let obj = query.as_object().ok_or_else(|| RuntimeError {
1127            code: "INVALID_QUERY".into(),
1128            message: "Graph query must be a JSON object".into(),
1129        })?;
1130
1131        let mut results = serde_json::Map::new();
1132
1133        for (entity_name, query_opts) in obj {
1134            let _ent = self.require_entity(entity_name)?;
1135
1136            // Apply where clause if present.
1137            let filter = query_opts
1138                .get("where")
1139                .cloned()
1140                .unwrap_or(serde_json::json!({}));
1141            let rows = self.query_filtered(entity_name, &filter)?;
1142
1143            // Apply includes (relations) if present.
1144            let rows = if let Some(include) = query_opts.get("include").and_then(|v| v.as_object())
1145            {
1146                // Internal invariant: if query_filtered succeeded above, the
1147                // entity must exist. Previously this used .unwrap() which
1148                // would panic if the invariant broke — a panic inside the
1149                // handler path poisons the connection mutex and takes down
1150                // all subsequent reads. Fail the request cleanly instead.
1151                let ent = self.entities.get(entity_name).ok_or_else(|| RuntimeError {
1152                    code: "INVARIANT_BROKEN".into(),
1153                    message: format!(
1154                        "entity \"{entity_name}\" missing from registry during include expansion"
1155                    ),
1156                })?;
1157                rows.into_iter()
1158                    .map(|mut row| {
1159                        for (rel_name, _sub_query) in include {
1160                            if let Some(rel) = ent.relations.iter().find(|r| r.name == *rel_name) {
1161                                let fk_value = row
1162                                    .get(&rel.field)
1163                                    .and_then(|v| v.as_str())
1164                                    .map(|s| s.to_string());
1165                                if let Some(fk) = fk_value {
1166                                    if rel.many {
1167                                        // One-to-many: find rows in target where field matches id.
1168                                        let sub_filter = serde_json::json!({ &rel.field: &fk });
1169                                        if let Ok(related) =
1170                                            self.query_filtered(&rel.target, &sub_filter)
1171                                        {
1172                                            row[rel_name] = serde_json::json!(related);
1173                                        }
1174                                    } else {
1175                                        // One-to-one / many-to-one: get by id.
1176                                        if let Ok(Some(related)) = self.get_by_id(&rel.target, &fk)
1177                                        {
1178                                            row[rel_name] = related;
1179                                        }
1180                                    }
1181                                }
1182                            }
1183                        }
1184                        row
1185                    })
1186                    .collect()
1187            } else {
1188                rows
1189            };
1190
1191            // Apply limit if present.
1192            let rows = if let Some(limit) = query_opts.get("limit").and_then(|v| v.as_u64()) {
1193                rows.into_iter().take(limit as usize).collect()
1194            } else {
1195                rows
1196            };
1197
1198            results.insert(entity_name.clone(), serde_json::json!(rows));
1199        }
1200
1201        Ok(serde_json::Value::Object(results))
1202    }
1203
1204    // -----------------------------------------------------------------------
1205    // Transaction-safe variants (use a pre-held connection guard)
1206    // -----------------------------------------------------------------------
1207
1208    /// Insert using an already-locked connection (for transactions).
1209    pub fn insert_with_conn(
1210        &self,
1211        conn: &Connection,
1212        entity: &str,
1213        data: &serde_json::Value,
1214    ) -> Result<String, RuntimeError> {
1215        let ent = self.require_entity(entity)?;
1216        let id = generate_id();
1217        let obj = data.as_object().ok_or_else(|| RuntimeError {
1218            code: "INVALID_DATA".into(),
1219            message: "Insert data must be a JSON object".into(),
1220        })?;
1221
1222        let mut col_names = vec![quote_ident("id")];
1223        let mut placeholders = vec!["?1".to_string()];
1224        let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = vec![Box::new(id.clone())];
1225        let mut idx = 2;
1226        for (key, val) in obj {
1227            if key == "id" {
1228                continue;
1229            }
1230            validate_column_name(key, ent)?;
1231            col_names.push(quote_ident(key));
1232            placeholders.push(format!("?{idx}"));
1233            values.push(json_to_sql(val));
1234            idx += 1;
1235        }
1236
1237        let sql = format!(
1238            "INSERT INTO {} ({}) VALUES ({})",
1239            quote_ident(entity),
1240            col_names.join(", "),
1241            placeholders.join(", ")
1242        );
1243        let params: Vec<&dyn rusqlite::types::ToSql> = values.iter().map(|v| v.as_ref()).collect();
1244        conn.execute(&sql, params.as_slice())
1245            .map_err(|e| RuntimeError {
1246                code: "INSERT_FAILED".into(),
1247                message: format!("Insert into {entity} failed: {e}"),
1248            })?;
1249
1250        // Faceted-search maintenance in the same transaction. Skipped
1251        // for entities that don't declare `search:` in their schema.
1252        if let Some(cfg) = ent.search.as_ref() {
1253            if !cfg.is_empty() {
1254                pylon_storage::search_maintenance::apply_insert(conn, entity, &id, data, cfg)
1255                    .map_err(|e| RuntimeError {
1256                        code: "SEARCH_MAINTENANCE_FAILED".into(),
1257                        message: format!("search index update on insert {entity}: {e}"),
1258                    })?;
1259            }
1260        }
1261
1262        Ok(id)
1263    }
1264
1265    /// Update using an already-locked connection (for transactions).
1266    pub fn update_with_conn(
1267        &self,
1268        conn: &Connection,
1269        entity: &str,
1270        id: &str,
1271        data: &serde_json::Value,
1272    ) -> Result<bool, RuntimeError> {
1273        let ent = self.require_entity(entity)?;
1274        let obj = data.as_object().ok_or_else(|| RuntimeError {
1275            code: "INVALID_DATA".into(),
1276            message: "Update data must be a JSON object".into(),
1277        })?;
1278
1279        let mut set_clauses = Vec::new();
1280        let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
1281        let mut idx = 1;
1282        for (key, val) in obj {
1283            if key == "id" {
1284                continue;
1285            }
1286            validate_column_name(key, ent)?;
1287            set_clauses.push(format!("{} = ?{idx}", quote_ident(key)));
1288            values.push(json_to_sql(val));
1289            idx += 1;
1290        }
1291        if set_clauses.is_empty() {
1292            return Ok(false);
1293        }
1294
1295        // Capture the pre-UPDATE row if we need to diff facet values.
1296        // Read happens before the UPDATE so apply_update sees the OLD
1297        // state of any facet field. Cheap — single-row lookup on the
1298        // `id` primary-key index.
1299        let searchable = ent.search.as_ref().map(|c| !c.is_empty()).unwrap_or(false);
1300        let old_row = if searchable {
1301            self.get_by_id_with_conn(conn, entity, id)?
1302        } else {
1303            None
1304        };
1305
1306        values.push(Box::new(id.to_string()));
1307        let sql = format!(
1308            "UPDATE {} SET {} WHERE \"id\" = ?{idx}",
1309            quote_ident(entity),
1310            set_clauses.join(", ")
1311        );
1312        let params: Vec<&dyn rusqlite::types::ToSql> = values.iter().map(|v| v.as_ref()).collect();
1313        let affected = conn
1314            .execute(&sql, params.as_slice())
1315            .map_err(|e| RuntimeError {
1316                code: "UPDATE_FAILED".into(),
1317                message: format!("Update {entity}/{id} failed: {e}"),
1318            })?;
1319
1320        if affected > 0 && searchable {
1321            if let (Some(cfg), Some(old)) = (ent.search.as_ref(), old_row) {
1322                pylon_storage::search_maintenance::apply_update(conn, entity, id, &old, data, cfg)
1323                    .map_err(|e| RuntimeError {
1324                        code: "SEARCH_MAINTENANCE_FAILED".into(),
1325                        message: format!("search index update on update {entity}: {e}"),
1326                    })?;
1327            }
1328        }
1329
1330        Ok(affected > 0)
1331    }
1332
1333    /// Delete using an already-locked connection (for transactions).
1334    pub fn delete_with_conn(
1335        &self,
1336        conn: &Connection,
1337        entity: &str,
1338        id: &str,
1339    ) -> Result<bool, RuntimeError> {
1340        let ent = self.require_entity(entity)?;
1341
1342        // Apply search maintenance BEFORE the DELETE so we still have
1343        // the row's facet values to diff against.
1344        let searchable = ent.search.as_ref().map(|c| !c.is_empty()).unwrap_or(false);
1345        if searchable {
1346            if let (Some(cfg), Ok(Some(row))) = (
1347                ent.search.as_ref(),
1348                self.get_by_id_with_conn(conn, entity, id),
1349            ) {
1350                pylon_storage::search_maintenance::apply_delete(conn, entity, id, &row, cfg)
1351                    .map_err(|e| RuntimeError {
1352                        code: "SEARCH_MAINTENANCE_FAILED".into(),
1353                        message: format!("search index update on delete {entity}: {e}"),
1354                    })?;
1355            }
1356        }
1357
1358        let sql = format!("DELETE FROM {} WHERE \"id\" = ?1", quote_ident(entity));
1359        let affected = conn
1360            .execute(&sql, rusqlite::params![id])
1361            .map_err(|e| RuntimeError {
1362                code: "DELETE_FAILED".into(),
1363                message: format!("Delete {entity}/{id} failed: {e}"),
1364            })?;
1365        Ok(affected > 0)
1366    }
1367
1368    /// Read a row by id using a pre-held connection (for transactions).
1369    pub fn get_by_id_with_conn(
1370        &self,
1371        conn: &Connection,
1372        entity: &str,
1373        id: &str,
1374    ) -> Result<Option<serde_json::Value>, RuntimeError> {
1375        let ent = self.require_entity(entity)?;
1376        let sql = format!("SELECT * FROM {} WHERE \"id\" = ?1", quote_ident(entity));
1377        let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
1378            code: "QUERY_FAILED".into(),
1379            message: format!("Failed to prepare query: {e}"),
1380        })?;
1381        let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1382        Ok(stmt
1383            .query_row(rusqlite::params![id], |row| Ok(row_to_json(row, &columns)))
1384            .ok())
1385    }
1386
1387    /// List rows using a pre-held connection (for transactions).
1388    pub fn list_with_conn(
1389        &self,
1390        conn: &Connection,
1391        entity: &str,
1392    ) -> Result<Vec<serde_json::Value>, RuntimeError> {
1393        let ent = self.require_entity(entity)?;
1394        let sql = format!("SELECT * FROM {} ORDER BY \"id\"", quote_ident(entity));
1395        let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
1396            code: "QUERY_FAILED".into(),
1397            message: format!("Failed to prepare query: {e}"),
1398        })?;
1399        let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1400        let rows = stmt
1401            .query_map([], |row| Ok(row_to_json(row, &columns)))
1402            .map_err(|e| RuntimeError {
1403                code: "QUERY_FAILED".into(),
1404                message: format!("Query failed: {e}"),
1405            })?;
1406        Ok(rows.flatten().collect())
1407    }
1408
1409    /// List after cursor using a pre-held connection (for transactions).
1410    pub fn list_after_with_conn(
1411        &self,
1412        conn: &Connection,
1413        entity: &str,
1414        after: Option<&str>,
1415        limit: usize,
1416    ) -> Result<Vec<serde_json::Value>, RuntimeError> {
1417        let ent = self.require_entity(entity)?;
1418        let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1419        let table = quote_ident(entity);
1420        let (sql, params): (String, Vec<Box<dyn rusqlite::types::ToSql>>) = match after {
1421            Some(cursor) => (
1422                format!("SELECT * FROM {table} WHERE \"id\" > ?1 ORDER BY \"id\" LIMIT ?2"),
1423                vec![Box::new(cursor.to_string()), Box::new(limit as i64)],
1424            ),
1425            None => (
1426                format!("SELECT * FROM {table} ORDER BY \"id\" LIMIT ?1"),
1427                vec![Box::new(limit as i64)],
1428            ),
1429        };
1430        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1431            params.iter().map(|v| v.as_ref()).collect();
1432        let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
1433            code: "QUERY_FAILED".into(),
1434            message: format!("Failed to prepare: {e}"),
1435        })?;
1436        let rows = stmt
1437            .query_map(param_refs.as_slice(), |row| Ok(row_to_json(row, &columns)))
1438            .map_err(|e| RuntimeError {
1439                code: "QUERY_FAILED".into(),
1440                message: format!("Query failed: {e}"),
1441            })?;
1442        Ok(rows.flatten().collect())
1443    }
1444
1445    /// Lookup by field using a pre-held connection (for transactions).
1446    pub fn lookup_with_conn(
1447        &self,
1448        conn: &Connection,
1449        entity: &str,
1450        field: &str,
1451        value: &str,
1452    ) -> Result<Option<serde_json::Value>, RuntimeError> {
1453        let ent = self.require_entity(entity)?;
1454        validate_column_name(field, ent)?;
1455        let sql = format!(
1456            "SELECT * FROM {} WHERE {} = ?1 LIMIT 1",
1457            quote_ident(entity),
1458            quote_ident(field)
1459        );
1460        let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1461        Ok(conn.prepare_cached(&sql).ok().and_then(|mut stmt| {
1462            stmt.query_row(rusqlite::params![value], |row| {
1463                Ok(row_to_json(row, &columns))
1464            })
1465            .ok()
1466        }))
1467    }
1468
1469    /// Link relation using a pre-held connection (for transactions).
1470    pub fn link_with_conn(
1471        &self,
1472        conn: &Connection,
1473        entity: &str,
1474        id: &str,
1475        relation: &str,
1476        target_id: &str,
1477    ) -> Result<bool, RuntimeError> {
1478        let ent = self.require_entity(entity)?;
1479        let rel = ent
1480            .relations
1481            .iter()
1482            .find(|r| r.name == relation)
1483            .ok_or_else(|| RuntimeError {
1484                code: "RELATION_NOT_FOUND".into(),
1485                message: format!("Relation \"{relation}\" not found on \"{entity}\""),
1486            })?;
1487        let data = serde_json::json!({ rel.field.clone(): target_id });
1488        self.update_with_conn(conn, entity, id, &data)
1489    }
1490
1491    /// Unlink relation using a pre-held connection (for transactions).
1492    pub fn unlink_with_conn(
1493        &self,
1494        conn: &Connection,
1495        entity: &str,
1496        id: &str,
1497        relation: &str,
1498    ) -> Result<bool, RuntimeError> {
1499        let ent = self.require_entity(entity)?;
1500        let rel = ent
1501            .relations
1502            .iter()
1503            .find(|r| r.name == relation)
1504            .ok_or_else(|| RuntimeError {
1505                code: "RELATION_NOT_FOUND".into(),
1506                message: format!("Relation \"{relation}\" not found on \"{entity}\""),
1507            })?;
1508        let data = serde_json::json!({ rel.field.clone(): serde_json::Value::Null });
1509        self.update_with_conn(conn, entity, id, &data)
1510    }
1511
1512    /// Query with filters using a pre-held connection (for transactions).
1513    ///
1514    /// Shares the filter-building logic with [`query_filtered`] by executing
1515    /// against the provided connection rather than acquiring one.
1516    pub fn query_filtered_with_conn(
1517        &self,
1518        conn: &Connection,
1519        entity: &str,
1520        filter: &serde_json::Value,
1521    ) -> Result<Vec<serde_json::Value>, RuntimeError> {
1522        let ent = self.require_entity(entity)?;
1523        let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1524        let empty = serde_json::Map::new();
1525        let obj = filter.as_object().unwrap_or(&empty);
1526
1527        let mut where_clauses = Vec::new();
1528        let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
1529        let mut order_clause = String::new();
1530        let mut limit_clause = String::new();
1531        let mut idx = 1;
1532
1533        for (key, val) in obj {
1534            match key.as_str() {
1535                "$order" => {
1536                    if let Some(o) = val.as_object() {
1537                        let mut parts: Vec<String> = Vec::new();
1538                        for (col, dir) in o {
1539                            validate_column_name(col, ent)?;
1540                            let d = match dir.as_str().unwrap_or("asc") {
1541                                "desc" | "DESC" => "DESC",
1542                                _ => "ASC",
1543                            };
1544                            parts.push(format!("{} {d}", quote_ident(col)));
1545                        }
1546                        if !parts.is_empty() {
1547                            order_clause = format!(" ORDER BY {}", parts.join(", "));
1548                        }
1549                    }
1550                }
1551                "$limit" => {
1552                    if let Some(n) = val.as_u64() {
1553                        limit_clause = format!(" LIMIT {n}");
1554                    }
1555                }
1556                "$offset" => {
1557                    if let Some(n) = val.as_u64() {
1558                        if limit_clause.is_empty() {
1559                            limit_clause = " LIMIT -1".into();
1560                        }
1561                        limit_clause = format!("{limit_clause} OFFSET {n}");
1562                    }
1563                }
1564                _ => {
1565                    validate_column_name(key, ent)?;
1566                    let qk = quote_ident(key);
1567                    if let Some(op_obj) = val.as_object() {
1568                        for (op, op_val) in op_obj {
1569                            match op.as_str() {
1570                                "$not" => {
1571                                    where_clauses.push(format!("{qk} != ?{idx}"));
1572                                    values.push(json_to_sql(op_val));
1573                                    idx += 1;
1574                                }
1575                                "$gt" => {
1576                                    where_clauses.push(format!("{qk} > ?{idx}"));
1577                                    values.push(json_to_sql(op_val));
1578                                    idx += 1;
1579                                }
1580                                "$gte" => {
1581                                    where_clauses.push(format!("{qk} >= ?{idx}"));
1582                                    values.push(json_to_sql(op_val));
1583                                    idx += 1;
1584                                }
1585                                "$lt" => {
1586                                    where_clauses.push(format!("{qk} < ?{idx}"));
1587                                    values.push(json_to_sql(op_val));
1588                                    idx += 1;
1589                                }
1590                                "$lte" => {
1591                                    where_clauses.push(format!("{qk} <= ?{idx}"));
1592                                    values.push(json_to_sql(op_val));
1593                                    idx += 1;
1594                                }
1595                                "$like" => {
1596                                    where_clauses.push(format!("{qk} LIKE ?{idx}"));
1597                                    let p = format!("%{}%", op_val.as_str().unwrap_or(""));
1598                                    values.push(Box::new(p));
1599                                    idx += 1;
1600                                }
1601                                "$in" => {
1602                                    if let Some(arr) = op_val.as_array() {
1603                                        let ph: Vec<String> = arr
1604                                            .iter()
1605                                            .map(|v| {
1606                                                let p = format!("?{idx}");
1607                                                values.push(json_to_sql(v));
1608                                                idx += 1;
1609                                                p
1610                                            })
1611                                            .collect();
1612                                        if !ph.is_empty() {
1613                                            where_clauses
1614                                                .push(format!("{qk} IN ({})", ph.join(", ")));
1615                                        }
1616                                    }
1617                                }
1618                                _ => {}
1619                            }
1620                        }
1621                    } else {
1622                        where_clauses.push(format!("{qk} = ?{idx}"));
1623                        values.push(json_to_sql(val));
1624                        idx += 1;
1625                    }
1626                }
1627            }
1628        }
1629
1630        let where_sql = if where_clauses.is_empty() {
1631            String::new()
1632        } else {
1633            format!(" WHERE {}", where_clauses.join(" AND "))
1634        };
1635        if order_clause.is_empty() {
1636            order_clause = " ORDER BY \"id\"".into();
1637        }
1638
1639        let sql = format!(
1640            "SELECT * FROM {}{}{}{}",
1641            quote_ident(entity),
1642            where_sql,
1643            order_clause,
1644            limit_clause
1645        );
1646        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1647            values.iter().map(|v| v.as_ref()).collect();
1648        let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
1649            code: "QUERY_FAILED".into(),
1650            message: format!("Failed to prepare: {e}"),
1651        })?;
1652        let rows = stmt
1653            .query_map(param_refs.as_slice(), |row| Ok(row_to_json(row, &columns)))
1654            .map_err(|e| RuntimeError {
1655                code: "QUERY_FAILED".into(),
1656                message: format!("Query failed: {e}"),
1657            })?;
1658        Ok(rows.flatten().collect())
1659    }
1660
1661    /// Graph query using a pre-held connection (for transactions).
1662    pub fn query_graph_with_conn(
1663        &self,
1664        conn: &Connection,
1665        query: &serde_json::Value,
1666    ) -> Result<serde_json::Value, RuntimeError> {
1667        let obj = query.as_object().ok_or_else(|| RuntimeError {
1668            code: "INVALID_QUERY".into(),
1669            message: "Graph query must be a JSON object".into(),
1670        })?;
1671        let mut results = serde_json::Map::new();
1672        for (entity_name, query_opts) in obj {
1673            let _ent = self.require_entity(entity_name)?;
1674            let filter = query_opts
1675                .get("where")
1676                .cloned()
1677                .unwrap_or(serde_json::json!({}));
1678            let rows = self.query_filtered_with_conn(conn, entity_name, &filter)?;
1679            results.insert(entity_name.clone(), serde_json::json!(rows));
1680        }
1681        Ok(serde_json::Value::Object(results))
1682    }
1683
1684    // -----------------------------------------------------------------------
1685    // Aggregations — count, sum, avg, min, max, group by
1686    // -----------------------------------------------------------------------
1687
1688    /// Run an aggregation query. See [`pylon_http::DataStore::aggregate`]
1689    /// for the spec shape.
1690    pub fn aggregate(
1691        &self,
1692        entity: &str,
1693        spec: &serde_json::Value,
1694    ) -> Result<serde_json::Value, RuntimeError> {
1695        let ent = self.require_entity(entity)?;
1696        let conn = self.lock_read_conn()?;
1697        let obj = spec.as_object().ok_or_else(|| RuntimeError {
1698            code: "INVALID_QUERY".into(),
1699            message: "aggregate spec must be an object".into(),
1700        })?;
1701
1702        // Build the SELECT list.
1703        let mut select_parts: Vec<String> = Vec::new();
1704        let mut result_fields: Vec<String> = Vec::new();
1705
1706        if let Some(count) = obj.get("count") {
1707            match count {
1708                serde_json::Value::String(s) if s == "*" => {
1709                    select_parts.push("COUNT(*) AS count".into());
1710                    result_fields.push("count".into());
1711                }
1712                serde_json::Value::String(field) => {
1713                    validate_column_name(field, ent)?;
1714                    let alias = format!("count_{field}");
1715                    select_parts.push(format!(
1716                        "COUNT({}) AS {}",
1717                        quote_ident(field),
1718                        quote_ident(&alias)
1719                    ));
1720                    result_fields.push(alias);
1721                }
1722                _ => {}
1723            }
1724        }
1725
1726        for (fn_name, alias_prefix) in [
1727            ("sum", "sum_"),
1728            ("avg", "avg_"),
1729            ("min", "min_"),
1730            ("max", "max_"),
1731        ] {
1732            if let Some(fields) = obj.get(fn_name).and_then(|v| v.as_array()) {
1733                for field in fields {
1734                    if let Some(f) = field.as_str() {
1735                        validate_column_name(f, ent)?;
1736                        let alias = format!("{alias_prefix}{f}");
1737                        let sql_fn = fn_name.to_uppercase();
1738                        select_parts.push(format!(
1739                            "{}({}) AS {}",
1740                            sql_fn,
1741                            quote_ident(f),
1742                            quote_ident(&alias)
1743                        ));
1744                        result_fields.push(alias);
1745                    }
1746                }
1747            }
1748        }
1749
1750        // countDistinct — separate handler because COUNT(DISTINCT) is a
1751        // distinct SQL form from COUNT(field). Lets dashboards ask "how
1752        // many unique customers placed orders this month" without a
1753        // client-side post-processing pass.
1754        if let Some(fields) = obj.get("countDistinct").and_then(|v| v.as_array()) {
1755            for field in fields {
1756                if let Some(f) = field.as_str() {
1757                    validate_column_name(f, ent)?;
1758                    let alias = format!("count_distinct_{f}");
1759                    select_parts.push(format!(
1760                        "COUNT(DISTINCT {}) AS {}",
1761                        quote_ident(f),
1762                        quote_ident(&alias)
1763                    ));
1764                    result_fields.push(alias);
1765                }
1766            }
1767        }
1768
1769        // Group-by fields come first in the SELECT so each row is identifiable.
1770        // Each entry is either a plain column name (string) or a date-bucket
1771        // spec — `{ field: "createdAt", bucket: "day" }`. Buckets map to
1772        // SQLite strftime patterns so aggregation keys collapse to the
1773        // bucket boundary (hour / day / week / month / year).
1774        let mut group_by: Vec<String> = Vec::new();
1775        let mut group_select: Vec<String> = Vec::new();
1776        let mut group_field_names: Vec<String> = Vec::new();
1777        if let Some(groups) = obj.get("groupBy").and_then(|v| v.as_array()) {
1778            for g in groups {
1779                if let Some(f) = g.as_str() {
1780                    validate_column_name(f, ent)?;
1781                    let quoted = quote_ident(f);
1782                    group_by.push(quoted.clone());
1783                    group_select.push(quoted);
1784                    group_field_names.push(f.to_string());
1785                } else if let Some(spec) = g.as_object() {
1786                    let field =
1787                        spec.get("field")
1788                            .and_then(|v| v.as_str())
1789                            .ok_or_else(|| RuntimeError {
1790                                code: "INVALID_QUERY".into(),
1791                                message: "groupBy object spec requires `field`".into(),
1792                            })?;
1793                    validate_column_name(field, ent)?;
1794                    let bucket = spec.get("bucket").and_then(|v| v.as_str()).unwrap_or("day");
1795                    let fmt = match bucket {
1796                        "hour" => "%Y-%m-%d %H:00:00",
1797                        "day" => "%Y-%m-%d",
1798                        "month" => "%Y-%m",
1799                        "year" => "%Y",
1800                        "week" => "%Y-W%W",
1801                        _ => {
1802                            return Err(RuntimeError {
1803                                code: "INVALID_QUERY".into(),
1804                                message: format!(
1805                                    "bucket must be one of hour/day/week/month/year, got {bucket}"
1806                                ),
1807                            });
1808                        }
1809                    };
1810                    let alias = format!("{field}_{bucket}");
1811                    let expr = format!("strftime('{}', {})", fmt, quote_ident(field));
1812                    group_by.push(expr.clone());
1813                    group_select.push(format!("{} AS {}", expr, quote_ident(&alias)));
1814                    group_field_names.push(alias);
1815                }
1816            }
1817        }
1818        let mut full_select = group_select.clone();
1819        full_select.extend(select_parts.iter().cloned());
1820        if full_select.is_empty() {
1821            return Err(RuntimeError {
1822                code: "INVALID_QUERY".into(),
1823                message: "aggregate spec must include count/sum/avg/min/max/groupBy".into(),
1824            });
1825        }
1826
1827        // WHERE clause (reuse filter syntax, but only simple equality for now).
1828        let mut where_clauses = Vec::new();
1829        let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
1830        let mut idx = 1;
1831        if let Some(where_obj) = obj.get("where").and_then(|v| v.as_object()) {
1832            for (k, v) in where_obj {
1833                validate_column_name(k, ent)?;
1834                where_clauses.push(format!("{} = ?{idx}", quote_ident(k)));
1835                values.push(json_to_sql(v));
1836                idx += 1;
1837            }
1838        }
1839        let where_sql = if where_clauses.is_empty() {
1840            String::new()
1841        } else {
1842            format!(" WHERE {}", where_clauses.join(" AND "))
1843        };
1844
1845        let group_sql = if group_by.is_empty() {
1846            String::new()
1847        } else {
1848            format!(" GROUP BY {}", group_by.join(", "))
1849        };
1850
1851        let sql = format!(
1852            "SELECT {} FROM {}{}{}",
1853            full_select.join(", "),
1854            quote_ident(entity),
1855            where_sql,
1856            group_sql
1857        );
1858
1859        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1860            values.iter().map(|v| v.as_ref()).collect();
1861        let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
1862            code: "QUERY_FAILED".into(),
1863            message: format!("Failed to prepare aggregate: {e}"),
1864        })?;
1865
1866        let column_names: Vec<String> = {
1867            let mut v = group_field_names.clone();
1868            v.extend(result_fields.iter().cloned());
1869            v
1870        };
1871
1872        let rows = stmt
1873            .query_map(param_refs.as_slice(), |row| {
1874                let mut obj = serde_json::Map::new();
1875                for (i, name) in column_names.iter().enumerate() {
1876                    // Try int first (counts/sums), then float, then string, then null.
1877                    if let Ok(n) = row.get::<_, i64>(i) {
1878                        obj.insert(name.clone(), serde_json::Value::Number(n.into()));
1879                    } else if let Ok(f) = row.get::<_, f64>(i) {
1880                        if let Some(num) = serde_json::Number::from_f64(f) {
1881                            obj.insert(name.clone(), serde_json::Value::Number(num));
1882                        } else {
1883                            obj.insert(name.clone(), serde_json::Value::Null);
1884                        }
1885                    } else if let Ok(s) = row.get::<_, String>(i) {
1886                        obj.insert(name.clone(), serde_json::Value::String(s));
1887                    } else {
1888                        obj.insert(name.clone(), serde_json::Value::Null);
1889                    }
1890                }
1891                Ok(serde_json::Value::Object(obj))
1892            })
1893            .map_err(|e| RuntimeError {
1894                code: "QUERY_FAILED".into(),
1895                message: format!("Aggregate failed: {e}"),
1896            })?;
1897
1898        let mut result = Vec::new();
1899        for row in rows {
1900            if let Ok(val) = row {
1901                result.push(val);
1902            }
1903        }
1904        Ok(serde_json::json!({ "rows": result }))
1905    }
1906
1907    // -----------------------------------------------------------------------
1908    // Helpers
1909    // -----------------------------------------------------------------------
1910
1911    fn require_entity(&self, name: &str) -> Result<&ManifestEntity, RuntimeError> {
1912        self.entities.get(name).ok_or_else(|| RuntimeError {
1913            code: "ENTITY_NOT_FOUND".into(),
1914            message: format!("Unknown entity: \"{name}\""),
1915        })
1916    }
1917
1918    /// Acquire the write connection. Used for INSERT, UPDATE, DELETE.
1919    fn lock_write_conn(&self) -> Result<std::sync::MutexGuard<'_, Connection>, RuntimeError> {
1920        self.write_conn.lock().map_err(|e| RuntimeError {
1921            code: "LOCK_FAILED".into(),
1922            message: format!("Failed to acquire write connection lock: {e}"),
1923        })
1924    }
1925
1926    /// Acquire a read connection. Uses the read pool if available (file-backed
1927    /// databases), otherwise falls back to the write connection (in-memory).
1928    /// Connections are selected round-robin to spread load evenly.
1929    fn lock_read_conn(&self) -> Result<ReadConnGuard<'_>, RuntimeError> {
1930        if !self.read_pool.is_empty() {
1931            let idx = self.read_counter.fetch_add(1, Ordering::Relaxed) % self.read_pool.len();
1932            let guard = self.read_pool[idx].lock().map_err(|e| RuntimeError {
1933                code: "LOCK_FAILED".into(),
1934                message: format!("Failed to acquire read connection: {e}"),
1935            })?;
1936            Ok(ReadConnGuard::Pooled(guard))
1937        } else {
1938            // Fall back to write connection for in-memory DBs.
1939            let guard = self.write_conn.lock().map_err(|e| RuntimeError {
1940                code: "LOCK_FAILED".into(),
1941                message: format!("Failed to acquire connection: {e}"),
1942            })?;
1943            Ok(ReadConnGuard::Write(guard))
1944        }
1945    }
1946}
1947
1948// ---------------------------------------------------------------------------
1949// Helpers
1950// ---------------------------------------------------------------------------
1951
1952/// Generate a lex-sortable, monotonic-ish unique ID.
1953///
1954/// Same shape as `pylon_storage::postgres::generate_id` — fixed-width hex
1955/// of nanoseconds + 8-hex per-process counter (40 chars total). The fixed
1956/// width is what makes `WHERE id > $1 ORDER BY id` correct for cursor
1957/// pagination: variable-width hex sorts incorrectly at width boundaries
1958/// (e.g. `"ff"` lex-sorts after `"100"`).
1959/// Run `body` inside a SQLite transaction on `conn`. Commits on `Ok`,
1960/// rolls back on `Err` (or if `body` panics).
1961///
1962/// Used to make the multi-statement CRDT write paths (LoroDoc snapshot
1963/// upsert into `_pylon_crdt_snapshots` + the materialized entity row
1964/// INSERT/UPDATE + FTS / facet maintenance) atomic so a crash mid-write
1965/// can never leave the materialized view stale relative to the CRDT
1966/// snapshot. Uses unmanaged BEGIN/COMMIT/ROLLBACK rather than rusqlite's
1967/// `Transaction` API because the existing call sites borrow `conn`
1968/// through inner closures and the lifetime juggling for a `Transaction`
1969/// guard would force more refactoring than the explicit BEGIN/COMMIT.
1970///
1971/// `BEGIN IMMEDIATE` (vs the default `BEGIN DEFERRED`) takes the SQLite
1972/// reserved lock on entry instead of escalating later — matches the
1973/// pattern in `datastore.rs::transact` and avoids a SQLITE_BUSY race
1974/// where a concurrent reader prevents the lock upgrade mid-write.
1975fn with_write_tx<T, F>(conn: &rusqlite::Connection, body: F) -> Result<T, RuntimeError>
1976where
1977    F: FnOnce() -> Result<T, RuntimeError>,
1978{
1979    conn.execute("BEGIN IMMEDIATE", [])
1980        .map_err(|e| RuntimeError {
1981            code: "TX_BEGIN_FAILED".into(),
1982            message: format!("BEGIN: {e}"),
1983        })?;
1984    match body() {
1985        Ok(v) => {
1986            conn.execute("COMMIT", []).map_err(|e| RuntimeError {
1987                code: "TX_COMMIT_FAILED".into(),
1988                message: format!("COMMIT: {e}"),
1989            })?;
1990            Ok(v)
1991        }
1992        Err(e) => {
1993            // Best-effort rollback; if even ROLLBACK fails we surface
1994            // the *original* error since that's the more actionable one.
1995            let _ = conn.execute("ROLLBACK", []);
1996            Err(e)
1997        }
1998    }
1999}
2000
2001fn generate_id() -> String {
2002    use std::sync::atomic::{AtomicU32, Ordering};
2003    use std::time::{SystemTime, UNIX_EPOCH};
2004    static COUNTER: AtomicU32 = AtomicU32::new(0);
2005    let nanos = SystemTime::now()
2006        .duration_since(UNIX_EPOCH)
2007        .unwrap_or_default()
2008        .as_nanos();
2009    let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
2010    format!("{nanos:032x}{seq:08x}")
2011}
2012
2013/// Convert a `serde_json::Value` to a boxed `ToSql` for rusqlite.
2014fn json_to_sql(val: &serde_json::Value) -> Box<dyn rusqlite::types::ToSql> {
2015    match val {
2016        serde_json::Value::Null => Box::new(rusqlite::types::Null),
2017        serde_json::Value::Bool(b) => Box::new(*b as i32),
2018        serde_json::Value::Number(n) => {
2019            if let Some(i) = n.as_i64() {
2020                Box::new(i)
2021            } else if let Some(f) = n.as_f64() {
2022                Box::new(f)
2023            } else {
2024                Box::new(n.to_string())
2025            }
2026        }
2027        serde_json::Value::String(s) => Box::new(s.clone()),
2028        other => Box::new(other.to_string()),
2029    }
2030}
2031
2032/// Convert a rusqlite row to a JSON value.
2033///
2034/// Reads columns by NAME (via the row's actual column metadata) rather
2035/// than by positional index. The previous implementation assumed the
2036/// SQLite table column order matched the manifest field order, which
2037/// silently breaks when a new field is inserted in the middle of the
2038/// manifest: SQLite's `ALTER TABLE ADD COLUMN` always appends to the
2039/// end of the table, so existing data lands in the wrong field on
2040/// every read.
2041///
2042/// `field_names` is still passed (unused in the body, kept for API
2043/// stability with callers that compute it from the manifest) — the
2044/// name set comes from the row itself now, which always matches the
2045/// SELECT's actual column shape.
2046fn row_to_json(row: &rusqlite::Row<'_>, _field_names: &[String]) -> serde_json::Value {
2047    let mut obj = serde_json::Map::new();
2048
2049    let stmt = row.as_ref();
2050    let count = stmt.column_count();
2051    for i in 0..count {
2052        // Column names are short string slices into the prepared
2053        // statement; copy out into owned Strings before inserting into
2054        // the map (the slice borrow can't outlive the row).
2055        let name = match stmt.column_name(i) {
2056            Ok(n) => n.to_string(),
2057            Err(_) => continue,
2058        };
2059        let value = if let Ok(s) = row.get::<_, String>(i) {
2060            serde_json::Value::String(s)
2061        } else if let Ok(n) = row.get::<_, i64>(i) {
2062            serde_json::Value::Number(serde_json::Number::from(n))
2063        } else if let Ok(f) = row.get::<_, f64>(i) {
2064            serde_json::Number::from_f64(f)
2065                .map(serde_json::Value::Number)
2066                .unwrap_or(serde_json::Value::Null)
2067        } else {
2068            serde_json::Value::Null
2069        };
2070        obj.insert(name, value);
2071    }
2072
2073    serde_json::Value::Object(obj)
2074}
2075
2076#[cfg(test)]
2077mod tests {
2078    use super::*;
2079    use pylon_kernel::{ManifestField, ManifestIndex};
2080
2081    fn test_manifest() -> AppManifest {
2082        AppManifest {
2083            manifest_version: 1,
2084            name: "Test".into(),
2085            version: "0.1.0".into(),
2086            entities: vec![pylon_kernel::ManifestEntity {
2087                name: "User".into(),
2088                fields: vec![
2089                    ManifestField {
2090                        name: "email".into(),
2091                        field_type: "string".into(),
2092                        optional: false,
2093                        unique: true,
2094                        crdt: None,
2095                    },
2096                    ManifestField {
2097                        name: "displayName".into(),
2098                        field_type: "string".into(),
2099                        optional: false,
2100                        unique: false,
2101                        crdt: None,
2102                    },
2103                ],
2104                indexes: vec![ManifestIndex {
2105                    name: "user_email".into(),
2106                    fields: vec!["email".into()],
2107                    unique: true,
2108                }],
2109                relations: vec![],
2110                search: None,
2111                crdt: true,
2112            }],
2113            routes: vec![],
2114            queries: vec![],
2115            actions: vec![],
2116            policies: vec![],
2117        }
2118    }
2119
2120    #[test]
2121    fn reset_for_tests_wipes_in_memory() {
2122        let rt = Runtime::in_memory(test_manifest()).unwrap();
2123        rt.insert(
2124            "User",
2125            &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
2126        )
2127        .unwrap();
2128        assert_eq!(rt.list("User").unwrap().len(), 1);
2129        rt.reset_for_tests().unwrap();
2130        assert_eq!(rt.list("User").unwrap().len(), 0);
2131    }
2132
2133    #[test]
2134    fn reset_for_tests_refuses_file_db() {
2135        let dir = std::env::temp_dir().join("pylon-reset-refuse");
2136        let _ = std::fs::create_dir_all(&dir);
2137        let db_path = dir.join("db.sqlite");
2138        let _ = std::fs::remove_file(&db_path);
2139        let rt = Runtime::open(db_path.to_str().unwrap(), test_manifest()).unwrap();
2140        let err = rt.reset_for_tests().unwrap_err();
2141        assert_eq!(err.code, "RESET_REFUSED");
2142        let _ = std::fs::remove_file(&db_path);
2143    }
2144
2145    #[test]
2146    fn insert_and_get() {
2147        let rt = Runtime::in_memory(test_manifest()).unwrap();
2148        let id = rt
2149            .insert(
2150                "User",
2151                &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
2152            )
2153            .unwrap();
2154        let row = rt.get_by_id("User", &id).unwrap().unwrap();
2155        assert_eq!(row["email"], "a@b.com");
2156    }
2157
/// Regression: when a new field is added in the middle of a manifest,
/// SQLite ALTER TABLE ADD COLUMN appends it to the end of the table.
/// The previous `row_to_json` read columns by positional index in
/// manifest order, so existing data shifted into the wrong fields
/// on every read (createdAt's value showed up as the new field's,
/// and vice versa). row_to_json now reads by column name from the
/// row's own metadata, so the bug can't recur regardless of
/// migration order.
///
/// The test builds exactly that mismatch: the manifest declares
/// `passwordHash` between `avatarColor` and `createdAt`, but the
/// physical table holds it as its last column. A positional reader
/// would swap the `passwordHash` and `createdAt` values; a by-name
/// reader returns each value under its own key.
#[test]
fn row_to_json_handles_columns_added_out_of_manifest_order() {
    // Manifest: id, email, displayName, avatarColor, passwordHash, createdAt
    let mut manifest = test_manifest();
    manifest.entities[0].fields = vec![
        ManifestField {
            name: "email".into(),
            field_type: "string".into(),
            optional: false,
            unique: true,
            crdt: None,
        },
        ManifestField {
            name: "displayName".into(),
            field_type: "string".into(),
            optional: false,
            unique: false,
            crdt: None,
        },
        ManifestField {
            name: "avatarColor".into(),
            field_type: "string".into(),
            optional: true,
            unique: false,
            crdt: None,
        },
        ManifestField {
            name: "passwordHash".into(),
            field_type: "string".into(),
            optional: true,
            unique: false,
            crdt: None,
        },
        ManifestField {
            name: "createdAt".into(),
            field_type: "datetime".into(),
            optional: true,
            unique: false,
            crdt: None,
        },
    ];
    // Important: turn off CRDT mode for this test — CRDT mode writes
    // the projection back to SQLite explicitly per-field, so it
    // wouldn't exercise the column-order bug we're regressing
    // against. The bug bites the legacy path that still does
    // `INSERT (id, email, displayName, ...) VALUES (...)` and then
    // `SELECT * ... → row_to_json` to read it back.
    manifest.entities[0].crdt = false;
    let rt = Runtime::in_memory(manifest).unwrap();
    let id = rt
        .insert(
            "User",
            &serde_json::json!({
                "email": "a@b.com",
                "displayName": "Alice",
                "avatarColor": "#abc",
                "createdAt": "2026-01-01T00:00:00Z",
            }),
        )
        .unwrap();

    // Relocate `passwordHash` to the physical end of the table: drop the
    // column that was created in manifest position, then re-add it with
    // ALTER TABLE ADD COLUMN, which always appends. The result is the
    // layout `pylon dev` leaves behind after migrating a field that was
    // added between existing ones. (DROP COLUMN needs SQLite >= 3.35.)
    {
        let conn = rt.lock_write_conn().unwrap();
        conn.execute("ALTER TABLE \"User\" DROP COLUMN \"passwordHash\"", [])
            .unwrap();
        conn.execute("ALTER TABLE \"User\" ADD COLUMN \"passwordHash\" TEXT", [])
            .unwrap();
        conn.execute(
            "UPDATE \"User\" SET \"passwordHash\" = ?1 WHERE \"id\" = ?2",
            rusqlite::params!["hashed-password", &id],
        )
        .unwrap();
    }

    let row = rt.get_by_id("User", &id).unwrap().unwrap();
    // The crucial assertions: each column maps to its own value, not the
    // value of whichever column happens to share its SQLite position.
    assert_eq!(row["email"], "a@b.com");
    assert_eq!(row["displayName"], "Alice");
    assert_eq!(row["avatarColor"], "#abc");
    assert_eq!(row["createdAt"], "2026-01-01T00:00:00Z");
    assert_eq!(row["passwordHash"], "hashed-password");
}
2250
/// Entities run in CRDT mode by default, so every write must go through
/// the LoroDoc (the source of truth) and leave a snapshot in the sidecar
/// table. The SQLite row acts as the materialized projection. Proves the
/// CRDT branch of `insert` really executes.
#[test]
fn crdt_default_writes_through_loro_store() {
    let rt = Runtime::in_memory(test_manifest()).unwrap();
    let payload = serde_json::json!({"email": "x@y.com", "displayName": "Eric"});
    let id = rt.insert("User", &payload).unwrap();

    // The new row owns exactly one sidecar snapshot.
    let snapshots: i64 = {
        let conn = rt.lock_write_conn().unwrap();
        conn.query_row(
            "SELECT COUNT(*) FROM _pylon_crdt_snapshots
                 WHERE entity = ?1 AND row_id = ?2",
            rusqlite::params!["User", &id],
            |r| r.get(0),
        )
        .unwrap()
    };
    assert_eq!(snapshots, 1, "sidecar should have one row after insert");

    // The Loro doc stays cached after the write, so get_or_hydrate must
    // have run during apply_patch.
    assert!(rt.crdt_store().cached_rows() >= 1);

    // The materialized SQLite row carries the projected values.
    let row = rt.get_by_id("User", &id).unwrap().unwrap();
    assert_eq!(row["email"], "x@y.com");
    assert_eq!(row["displayName"], "Eric");
}
2287
/// An update on a CRDT-mode entity must also flow through the LoroDoc:
/// the sidecar snapshot bytes change (Loro recorded new ops) and the
/// materialized row reports the new value.
#[test]
fn crdt_update_persists_new_snapshot() {
    let rt = Runtime::in_memory(test_manifest()).unwrap();
    let seed = serde_json::json!({"email": "x@y.com", "displayName": "Eric"});
    let id = rt.insert("User", &seed).unwrap();

    // Fetches the row's current sidecar snapshot; the write lock is held
    // only for the duration of the query.
    let load_snapshot = || -> Vec<u8> {
        let conn = rt.lock_write_conn().unwrap();
        conn.query_row(
            "SELECT snapshot FROM _pylon_crdt_snapshots
                 WHERE entity = 'User' AND row_id = ?1",
            rusqlite::params![&id],
            |r| r.get(0),
        )
        .unwrap()
    };

    let before_update = load_snapshot();
    let patch = serde_json::json!({"displayName": "Eric C"});
    rt.update("User", &id, &patch).unwrap();
    let after_update = load_snapshot();

    assert_ne!(
        before_update, after_update,
        "snapshot bytes should change after an update"
    );

    let row = rt.get_by_id("User", &id).unwrap().unwrap();
    assert_eq!(row["displayName"], "Eric C");
    assert_eq!(row["email"], "x@y.com");
}
2335
/// Regression: if the SQL INSERT inside `Runtime::insert` fails (here a
/// UNIQUE-constraint violation), the LoroDoc snapshot must be rolled back
/// with it so that neither half lands. The LoroStore used to write first
/// and commit on its own, so a doomed INSERT left a sidecar row pointing
/// at a doc the materialized table never knew about.
#[test]
fn crdt_insert_rolls_back_when_sql_step_fails() {
    let rt = Runtime::in_memory(test_manifest()).unwrap();

    // Counts every User snapshot in the CRDT sidecar, holding the write
    // lock only while the query runs.
    let sidecar_rows = || -> i64 {
        let conn = rt.lock_write_conn().unwrap();
        conn.query_row(
            "SELECT COUNT(*) FROM _pylon_crdt_snapshots WHERE entity = 'User'",
            [],
            |r| r.get(0),
        )
        .unwrap()
    };

    // Seed the row whose email the second insert collides with.
    let first = serde_json::json!({"email": "x@y.com", "displayName": "First"});
    rt.insert("User", &first).unwrap();
    let before = sidecar_rows();

    // Same email again: the SQL UNIQUE constraint rejects it.
    let duplicate = serde_json::json!({"email": "x@y.com", "displayName": "Second"});
    let err = rt
        .insert("User", &duplicate)
        .expect_err("duplicate email must fail");
    assert_eq!(err.code, "INSERT_FAILED");

    // The snapshot the CRDT path wrote was discarded along with the
    // failed SQL INSERT, so the sidecar count is unchanged.
    assert_eq!(
        sidecar_rows(),
        before,
        "failed insert should not leave a sidecar snapshot behind"
    );
}
2388
/// An entity declared with `crdt: false` bypasses the LoroDoc completely:
/// nothing lands in the sidecar table and the Loro cache stays cold. The
/// row itself still reaches SQLite, which proves the opt-out really opts out.
#[test]
fn crdt_false_skips_loro_store() {
    let mut manifest = test_manifest();
    // User becomes an LWW-only entity.
    manifest.entities[0].crdt = false;
    let rt = Runtime::in_memory(manifest).unwrap();

    let payload = serde_json::json!({"email": "lww@example.com", "displayName": "Plain"});
    let id = rt.insert("User", &payload).unwrap();

    let sidecar_snapshots: i64 = {
        let conn = rt.lock_write_conn().unwrap();
        conn.query_row(
            "SELECT COUNT(*) FROM _pylon_crdt_snapshots
                 WHERE entity = 'User' AND row_id = ?1",
            rusqlite::params![&id],
            |r| r.get(0),
        )
        .unwrap()
    };
    assert_eq!(sidecar_snapshots, 0, "crdt:false should not touch the sidecar");
    assert_eq!(
        rt.crdt_store().cached_rows(),
        0,
        "crdt:false should not warm the cache"
    );

    // The legacy direct-write SQLite path still stored the row.
    let row = rt.get_by_id("User", &id).unwrap().unwrap();
    assert_eq!(row["email"], "lww@example.com");
}
2427
/// `list` returns every row that was inserted into the entity.
#[test]
fn list_entities() {
    let rt = Runtime::in_memory(test_manifest()).unwrap();
    for (email, name) in [("a@b.com", "A"), ("b@c.com", "B")] {
        rt.insert("User", &serde_json::json!({"email": email, "displayName": name}))
            .unwrap();
    }
    assert_eq!(rt.list("User").unwrap().len(), 2);
}
2444
/// `update` reports success, and the changed field is visible on re-read.
#[test]
fn update_entity() {
    let rt = Runtime::in_memory(test_manifest()).unwrap();
    let seed = serde_json::json!({"email": "a@b.com", "displayName": "A"});
    let id = rt.insert("User", &seed).unwrap();

    let patch = serde_json::json!({"displayName": "Updated"});
    assert!(rt.update("User", &id, &patch).unwrap());

    let reread = rt.get_by_id("User", &id).unwrap().unwrap();
    assert_eq!(reread["displayName"], "Updated");
}
2461
/// Deleting an existing row returns `true`, and the row is gone afterwards.
#[test]
fn delete_entity() {
    let rt = Runtime::in_memory(test_manifest()).unwrap();
    let seed = serde_json::json!({"email": "a@b.com", "displayName": "A"});
    let id = rt.insert("User", &seed).unwrap();

    assert!(rt.delete("User", &id).unwrap());
    let after_delete = rt.get_by_id("User", &id).unwrap();
    assert!(after_delete.is_none());
}
2475
/// `lookup` finds a row by the value of a non-id column.
#[test]
fn lookup_by_field() {
    let rt = Runtime::in_memory(test_manifest()).unwrap();
    let seed = serde_json::json!({"email": "a@b.com", "displayName": "A"});
    rt.insert("User", &seed).unwrap();

    let found = rt
        .lookup("User", "email", "a@b.com")
        .unwrap()
        .unwrap();
    assert_eq!(found["displayName"], "A");
}
2487
/// Listing an entity that the manifest does not declare fails with
/// `ENTITY_NOT_FOUND`.
#[test]
fn unknown_entity_returns_error() {
    let rt = Runtime::in_memory(test_manifest()).unwrap();
    let code = rt.list("Nonexistent").unwrap_err().code;
    assert_eq!(code, "ENTITY_NOT_FOUND");
}
2494
/// `insert` rejects payload keys that are not columns of the entity.
#[test]
fn insert_rejects_unknown_column() {
    let rt = Runtime::in_memory(test_manifest()).unwrap();
    let payload = serde_json::json!({"email": "a@b.com", "displayName": "A", "evil_col": "x"});
    let err = rt.insert("User", &payload).unwrap_err();
    assert_eq!(err.code, "INVALID_COLUMN");
}
2506
/// `update` rejects a patch that names a column the entity lacks.
#[test]
fn update_rejects_unknown_column() {
    let rt = Runtime::in_memory(test_manifest()).unwrap();
    let seed = serde_json::json!({"email": "a@b.com", "displayName": "A"});
    let id = rt.insert("User", &seed).unwrap();

    let bad_patch = serde_json::json!({"bad_field": "x"});
    let err = rt.update("User", &id, &bad_patch).unwrap_err();
    assert_eq!(err.code, "INVALID_COLUMN");
}
2521
/// `lookup` refuses to search by a column the entity does not have.
#[test]
fn lookup_rejects_unknown_column() {
    let rt = Runtime::in_memory(test_manifest()).unwrap();
    let code = rt.lookup("User", "nonexistent", "val").unwrap_err().code;
    assert_eq!(code, "INVALID_COLUMN");
}
2528
/// A filter key that is not a column of the entity is rejected.
#[test]
fn query_filtered_rejects_unknown_column() {
    let rt = Runtime::in_memory(test_manifest()).unwrap();
    let filter = serde_json::json!({"bad_col": "x"});
    let err = rt.query_filtered("User", &filter).unwrap_err();
    assert_eq!(err.code, "INVALID_COLUMN");
}
2537
/// An `$order` clause naming an unknown column is rejected.
#[test]
fn query_filtered_rejects_unknown_order_column() {
    let rt = Runtime::in_memory(test_manifest()).unwrap();
    let filter = serde_json::json!({"$order": {"bad_col": "asc"}});
    let err = rt.query_filtered("User", &filter).unwrap_err();
    assert_eq!(err.code, "INVALID_COLUMN");
}
2546
/// A malicious `$order` direction must not break or inject into the
/// query. It should be normalized to ascending order, and the table must
/// survive untouched.
#[test]
fn query_filtered_sanitizes_order_direction() {
    let rt = Runtime::in_memory(test_manifest()).unwrap();
    // Insert out of email order so the check below can tell a real
    // ascending sort apart from plain insertion order.
    for (email, name) in [("b@c.com", "B"), ("a@b.com", "A")] {
        rt.insert("User", &serde_json::json!({"email": email, "displayName": name}))
            .unwrap();
    }

    // Even a malicious direction value should be normalized to ASC.
    let rows = rt
        .query_filtered(
            "User",
            &serde_json::json!({"$order": {"email": "DROP TABLE User"}}),
        )
        .unwrap();
    assert_eq!(rows.len(), 2);
    assert_eq!(rows[0]["email"], "a@b.com");
    assert_eq!(rows[1]["email"], "b@c.com");

    // The injected text was not executed: the table and its rows remain.
    assert_eq!(rt.list("User").unwrap().len(), 2);
}
2564
/// A purely in-memory runtime runs without a separate read pool.
#[test]
fn in_memory_has_no_read_pool() {
    let pool_size = Runtime::in_memory(test_manifest()).unwrap().read_pool_size();
    assert_eq!(pool_size, 0);
}
2570
/// A file-backed runtime opens a read pool of `READ_POOL_SIZE` connections,
/// and a row written through the runtime can be read back through it.
#[test]
fn open_creates_read_pool() {
    // Dedicated per-process directory: it is wiped wholesale below, so it
    // must not be shared with any other test.
    let dir = std::env::temp_dir().join(format!("pylon_read_pool_{}", std::process::id()));
    // Remove leftovers from an earlier aborted run (possible with PID
    // reuse); a stale row would make the unique-email insert below fail.
    let _ = std::fs::remove_dir_all(&dir);
    std::fs::create_dir_all(&dir).unwrap();
    let db_path = dir.join("test_read_pool.db");

    let rt = Runtime::open(db_path.to_str().unwrap(), test_manifest()).unwrap();
    assert_eq!(rt.read_pool_size(), READ_POOL_SIZE);

    // Write then read through the pool.
    let id = rt
        .insert(
            "User",
            &serde_json::json!({"email": "pool@test.com", "displayName": "Pool"}),
        )
        .unwrap();
    let row = rt.get_by_id("User", &id).unwrap().unwrap();
    assert_eq!(row["email"], "pool@test.com");

    // Close the writer and all pooled connections before deleting: open
    // handles keep the files alive and block deletion on Windows.
    drop(rt);
    let _ = std::fs::remove_dir_all(&dir);
}
2593
/// Reads on a file-backed runtime must complete while another caller
/// holds the write connection.
///
/// If reads regressed to going through the writer, they would block on
/// the held lock forever. The test therefore waits with a timeout and
/// fails instead of hanging the whole test run.
#[test]
fn concurrent_reads_dont_block_on_write() {
    use std::sync::mpsc;
    use std::sync::Arc;
    use std::time::Duration;

    const READERS: usize = 4;
    // Generous bound: an unblocked read takes milliseconds, so hitting
    // this means the readers are stuck behind the write lock.
    const READ_DEADLINE: Duration = Duration::from_secs(30);

    let dir = std::env::temp_dir().join(format!("pylon_conc_reads_{}", std::process::id()));
    // Start clean so a database left by an aborted run can't skew the
    // expected row count.
    let _ = std::fs::remove_dir_all(&dir);
    std::fs::create_dir_all(&dir).unwrap();
    let db_path = dir.join("test_concurrent.db");

    let rt = Arc::new(Runtime::open(db_path.to_str().unwrap(), test_manifest()).unwrap());

    // Seed some data so reads have something to return.
    rt.insert(
        "User",
        &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
    )
    .unwrap();
    rt.insert(
        "User",
        &serde_json::json!({"email": "b@c.com", "displayName": "B"}),
    )
    .unwrap();

    // Hold the write lock to simulate a long write.
    let write_guard = rt.lock_write_conn().unwrap();

    // Each reader signals on `done_tx` once its read has finished.
    let (done_tx, done_rx) = mpsc::channel();
    let handles: Vec<_> = (0..READERS)
        .map(|_| {
            let reader_rt = Arc::clone(&rt);
            let done_tx = done_tx.clone();
            std::thread::spawn(move || {
                let rows = reader_rt.list("User").unwrap();
                assert_eq!(rows.len(), 2);
                // The receiver may already be gone if the test has failed.
                let _ = done_tx.send(());
            })
        })
        .collect();
    // Drop our own sender so the channel disconnects once every reader exits.
    drop(done_tx);

    for _ in 0..READERS {
        match done_rx.recv_timeout(READ_DEADLINE) {
            Ok(()) => {}
            // A reader exited without reporting (it panicked); the join
            // below surfaces that panic.
            Err(mpsc::RecvTimeoutError::Disconnected) => break,
            Err(mpsc::RecvTimeoutError::Timeout) => {
                panic!("reader still blocked while the write lock was held")
            }
        }
    }

    for h in handles {
        h.join().expect("reader thread panicked");
    }

    // Release the write lock.
    drop(write_guard);

    // Every reader clone is gone after the joins; close the last handle
    // before deleting the database files.
    drop(rt);
    let _ = std::fs::remove_dir_all(&dir);
}
2639}