1pub mod account_backend;
2pub mod api_key_backend;
3pub mod cache_handlers;
4pub mod cache_server;
5pub mod config;
6pub mod cron;
7pub mod datastore;
8pub mod ip_limit;
9pub mod job_store;
10pub mod jobs;
11pub mod log;
12pub mod loro_store;
13pub mod pg_loro_store;
14pub mod magic_code_backend;
15pub mod metrics;
16pub mod oauth_backend;
17pub mod org_backend;
18pub mod verification_backend;
19pub mod openapi;
20pub mod presence;
21pub mod pubsub;
22pub mod rate_limit;
23pub mod resp;
24pub mod resp_server;
25pub mod rooms;
26pub mod scheduler;
27pub mod server;
28pub mod session_backend;
29pub mod shard_ws;
30pub mod sse;
31pub mod tls;
32pub mod workflow_store;
33pub mod workflows;
34pub mod ws;
35
36use std::collections::HashMap;
37use std::sync::atomic::{AtomicUsize, Ordering};
38use std::sync::Mutex;
39
40use pylon_kernel::{AppManifest, ManifestEntity};
41use rusqlite::Connection;
42
/// Error type returned by the runtime's fallible operations.
///
/// `code` is a short machine-readable identifier (for example `"INVALID_COLUMN"`);
/// `message` carries the human-readable explanation.
#[derive(Debug, Clone)]
pub struct RuntimeError {
    /// Short machine-readable error identifier.
    pub code: String,
    /// Human-readable description of the failure.
    pub message: String,
}

impl std::fmt::Display for RuntimeError {
    /// Renders the error as `[code] message`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { code, message } = self;
        write!(f, "[{code}] {message}")
    }
}

impl std::error::Error for RuntimeError {}
60
61impl From<pylon_http::DataError> for RuntimeError {
66 fn from(e: pylon_http::DataError) -> Self {
67 RuntimeError {
68 code: e.code,
69 message: e.message,
70 }
71 }
72}
73
/// Wraps `name` in double quotes for use as a SQL identifier, doubling every embedded
/// double quote so the identifier cannot terminate early.
fn quote_ident(name: &str) -> String {
    let escaped = name.replace('"', "\"\"");
    let mut quoted = String::with_capacity(escaped.len() + 2);
    quoted.push('"');
    quoted.push_str(&escaped);
    quoted.push('"');
    quoted
}
83
84fn validate_column_name(name: &str, entity: &ManifestEntity) -> Result<(), RuntimeError> {
88 if name == "id" {
89 return Ok(());
90 }
91 if entity.fields.iter().any(|f| f.name == name) {
92 return Ok(());
93 }
94 Err(RuntimeError {
95 code: "INVALID_COLUMN".into(),
96 message: format!(
97 "Unknown column \"{}\" -- valid columns: id, {}",
98 name,
99 entity
100 .fields
101 .iter()
102 .map(|f| f.name.as_str())
103 .collect::<Vec<_>>()
104 .join(", ")
105 ),
106 })
107}
108
109fn tune_runtime_connection(conn: &Connection, in_memory: bool) {
121 let pragmas: &[(&str, &str)] = if in_memory {
122 &[
123 ("temp_store", "MEMORY"),
124 ("cache_size", "-65536"),
125 ("foreign_keys", "ON"),
126 ]
127 } else {
128 &[
129 ("journal_mode", "WAL"),
130 ("synchronous", "NORMAL"),
131 ("cache_size", "-65536"),
132 ("mmap_size", "268435456"),
133 ("temp_store", "MEMORY"),
134 ("busy_timeout", "5000"),
135 ("foreign_keys", "ON"),
136 ("wal_autocheckpoint", "1000"),
137 ]
138 };
139 for (key, value) in pragmas {
140 let _ = conn.pragma_update(None, key, value);
141 }
142}
143
144enum ReadConnGuard<'a> {
151 Pooled(std::sync::MutexGuard<'a, Connection>),
152 Write(std::sync::MutexGuard<'a, Connection>),
153}
154
155impl<'a> std::ops::Deref for ReadConnGuard<'a> {
156 type Target = Connection;
157 fn deref(&self) -> &Connection {
158 match self {
159 ReadConnGuard::Pooled(g) => g,
160 ReadConnGuard::Write(g) => g,
161 }
162 }
163}
164
165pub struct Runtime {
183 backend: RuntimeBackend,
184 manifest: AppManifest,
185 entities: HashMap<String, ManifestEntity>,
186 is_in_memory: bool,
190}
191
192enum RuntimeBackend {
195 Sqlite(SqliteBackend),
196 Postgres(PgBackend),
197}
198
199struct SqliteBackend {
203 write_conn: Mutex<Connection>,
205 read_pool: Vec<Mutex<Connection>>,
208 read_counter: AtomicUsize,
210 crdt: crate::loro_store::LoroStore,
215}
216
217pub(crate) struct PgBackend {
220 pub(crate) store: pylon_storage::pg_datastore::PostgresDataStore,
221 pub(crate) crdt: std::sync::Arc<crate::pg_loro_store::PgLoroStore>,
226}
227
/// Number of read-only connections opened for a file-backed SQLite database.
const READ_POOL_SIZE: usize = 4;
230
/// Returns `true` when `url` starts with `postgres://` or `postgresql://`, compared
/// ASCII case-insensitively.
fn is_postgres_url(url: &str) -> bool {
    const SCHEMES: [&str; 2] = ["postgres://", "postgresql://"];

    let bytes = url.as_bytes();
    SCHEMES.iter().any(|scheme| {
        bytes
            .get(..scheme.len())
            .map_or(false, |head| head.eq_ignore_ascii_case(scheme.as_bytes()))
    })
}
238
239fn data_err_to_runtime(e: pylon_http::DataError) -> RuntimeError {
243 RuntimeError {
244 code: e.code,
245 message: e.message,
246 }
247}
248
249impl Runtime {
250 pub fn open(url: &str, manifest: AppManifest) -> Result<Self, RuntimeError> {
258 if is_postgres_url(url) {
259 Self::open_postgres(url, manifest)
260 } else {
261 let conn = Connection::open(url).map_err(|e| RuntimeError {
262 code: "RUNTIME_OPEN_FAILED".into(),
263 message: format!("Failed to open database: {e}"),
264 })?;
265 Self::from_connection(conn, manifest, false)
266 }
267 }
268
269 pub fn open_postgres(url: &str, manifest: AppManifest) -> Result<Self, RuntimeError> {
279 let store = pylon_storage::pg_datastore::PostgresDataStore::connect(url, manifest.clone())
280 .map_err(data_err_to_runtime)?;
281 store.with_client(|c| crate::pg_loro_store::ensure_sidecar(c)).map_err(|e| {
287 RuntimeError {
288 code: "CRDT_SIDECAR_BOOTSTRAP_FAILED".into(),
289 message: format!("ensure pg crdt sidecar: {e}"),
290 }
291 })?;
292 let entities: HashMap<String, ManifestEntity> = manifest
293 .entities
294 .iter()
295 .map(|e| (e.name.clone(), e.clone()))
296 .collect();
297 Ok(Self {
298 backend: RuntimeBackend::Postgres(PgBackend {
299 store,
300 crdt: std::sync::Arc::new(crate::pg_loro_store::PgLoroStore::new()),
301 }),
302 manifest,
303 entities,
304 is_in_memory: false,
305 })
306 }
307
308 pub fn is_in_memory(&self) -> bool {
319 self.is_in_memory
320 }
321
322 pub fn db_path(&self) -> Option<String> {
328 if self.is_in_memory {
329 return None;
330 }
331 let sb = match &self.backend {
332 RuntimeBackend::Sqlite(sb) => sb,
333 RuntimeBackend::Postgres(_) => return None,
334 };
335 let conn = sb.write_conn.lock().ok()?;
336 conn.path().filter(|p| !p.is_empty()).map(String::from)
337 }
338
339 pub fn reset_for_tests(&self) -> Result<(), RuntimeError> {
346 if !self.is_in_memory() {
347 return Err(RuntimeError {
348 code: "RESET_REFUSED".into(),
349 message: "reset_for_tests is only available on in-memory databases".into(),
350 });
351 }
352 let conn = self.lock_write_conn()?;
353 let entity_names: Vec<String> = self.entities.values().map(|e| e.name.clone()).collect();
354 for name in entity_names {
355 let sql = format!("DELETE FROM {}", quote_ident(&name));
356 let _ = conn.execute(&sql, []);
357 let fts_sql = format!("DELETE FROM {}", quote_ident(&format!("{name}_fts")));
359 let _ = conn.execute(&fts_sql, []);
360 }
361 Ok(())
362 }
363
364 pub fn in_memory(manifest: AppManifest) -> Result<Self, RuntimeError> {
368 let conn = Connection::open_in_memory().map_err(|e| RuntimeError {
369 code: "RUNTIME_OPEN_FAILED".into(),
370 message: format!("Failed to open in-memory database: {e}"),
371 })?;
372 Self::from_connection(conn, manifest, true)
373 }
374
375 fn from_connection(
376 conn: Connection,
377 manifest: AppManifest,
378 is_in_memory: bool,
379 ) -> Result<Self, RuntimeError> {
380 tune_runtime_connection(&conn, is_in_memory);
382
383 let entities: HashMap<String, ManifestEntity> = manifest
385 .entities
386 .iter()
387 .map(|e| (e.name.clone(), e.clone()))
388 .collect();
389
390 for entity in &manifest.entities {
392 let fields: Vec<String> = entity
393 .fields
394 .iter()
395 .map(|f| {
396 let col_type = match f.field_type.as_str() {
397 "int" => "INTEGER",
398 "float" => "REAL",
399 "bool" => "INTEGER",
400 _ => "TEXT",
401 };
402 let not_null = if f.optional { "" } else { " NOT NULL" };
403 let unique = if f.unique { " UNIQUE" } else { "" };
404 format!("{} {col_type}{not_null}{unique}", quote_ident(&f.name))
405 })
406 .collect();
407
408 let mut cols = vec!["\"id\" TEXT PRIMARY KEY NOT NULL".to_string()];
409 cols.extend(fields);
410 let sql = format!(
411 "CREATE TABLE IF NOT EXISTS {} ({})",
412 quote_ident(&entity.name),
413 cols.join(", ")
414 );
415 conn.execute(&sql, []).map_err(|e| RuntimeError {
416 code: "SCHEMA_INIT_FAILED".into(),
417 message: format!("Failed to create table {}: {e}", entity.name),
418 })?;
419
420 for idx in &entity.indexes {
422 let unique_kw = if idx.unique { "UNIQUE " } else { "" };
423 let quoted_fields: Vec<String> =
424 idx.fields.iter().map(|f| quote_ident(f)).collect();
425 let idx_sql = format!(
426 "CREATE {unique_kw}INDEX IF NOT EXISTS {} ON {} ({})",
427 quote_ident(&idx.name),
428 quote_ident(&entity.name),
429 quoted_fields.join(", ")
430 );
431 conn.execute(&idx_sql, []).ok();
432 }
433
434 let text_fields: Vec<&str> = entity
442 .fields
443 .iter()
444 .filter(|f| matches!(f.field_type.as_str(), "string" | "richtext" | "text"))
445 .map(|f| f.name.as_str())
446 .collect();
447 if !text_fields.is_empty() {
448 let fts_name = format!("{}_fts", entity.name);
449 let quoted_cols: Vec<String> = text_fields.iter().map(|f| quote_ident(f)).collect();
450 let fts_sql = format!(
451 "CREATE VIRTUAL TABLE IF NOT EXISTS {} USING fts5({}, content={}, content_rowid='rowid')",
452 quote_ident(&fts_name),
453 quoted_cols.join(", "),
454 quote_ident(&entity.name),
455 );
456 let fts_ok = conn.execute(&fts_sql, []).is_ok();
459
460 if fts_ok {
461 let tbl = quote_ident(&entity.name);
472 let ftb = quote_ident(&fts_name);
473 let cols_list = quoted_cols.join(", ");
474 let new_list: Vec<String> = text_fields
475 .iter()
476 .map(|f| format!("new.{}", quote_ident(f)))
477 .collect();
478 let old_list: Vec<String> = text_fields
479 .iter()
480 .map(|f| format!("old.{}", quote_ident(f)))
481 .collect();
482
483 let trigger_ai = quote_ident(&format!("{}_ai", fts_name));
484 let trigger_ad = quote_ident(&format!("{}_ad", fts_name));
485 let trigger_au = quote_ident(&format!("{}_au", fts_name));
486
487 let trigger_ins = format!(
488 "CREATE TRIGGER IF NOT EXISTS {trigger_ai} AFTER INSERT ON {tbl} BEGIN \
489 INSERT INTO {ftb}(rowid, {cols_list}) VALUES (new.rowid, {new_vals}); END",
490 new_vals = new_list.join(", "),
491 );
492 let trigger_del = format!(
493 "CREATE TRIGGER IF NOT EXISTS {trigger_ad} AFTER DELETE ON {tbl} BEGIN \
494 INSERT INTO {ftb}({ftb}, rowid, {cols_list}) VALUES('delete', old.rowid, {old_vals}); END",
495 old_vals = old_list.join(", "),
496 );
497 let trigger_upd = format!(
498 "CREATE TRIGGER IF NOT EXISTS {trigger_au} AFTER UPDATE ON {tbl} BEGIN \
499 INSERT INTO {ftb}({ftb}, rowid, {cols_list}) VALUES('delete', old.rowid, {old_vals}); \
500 INSERT INTO {ftb}(rowid, {cols_list}) VALUES (new.rowid, {new_vals}); END",
501 new_vals = new_list.join(", "),
502 old_vals = old_list.join(", "),
503 );
504 for (label, sql) in [
507 ("ai", &trigger_ins),
508 ("ad", &trigger_del),
509 ("au", &trigger_upd),
510 ] {
511 if let Err(e) = conn.execute(sql, []) {
512 tracing::warn!(
513 "[fts] failed to create {label} trigger for {}: {e}",
514 entity.name
515 );
516 }
517 }
518 }
519 }
520 }
521
522 let db_path = conn.path().filter(|p| !p.is_empty()).map(|p| p.to_string());
526
527 let read_pool = if let Some(ref path) = db_path {
528 let mut pool = Vec::with_capacity(READ_POOL_SIZE);
529 for _ in 0..READ_POOL_SIZE {
530 let read_conn = Connection::open_with_flags(
531 path,
532 rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
533 | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX,
534 )
535 .map_err(|e| RuntimeError {
536 code: "POOL_OPEN_FAILED".into(),
537 message: format!("Failed to open read connection: {e}"),
538 })?;
539 tune_runtime_connection(&read_conn, false);
540 pool.push(Mutex::new(read_conn));
541 }
542 pool
543 } else {
544 Vec::new()
546 };
547
548 crate::loro_store::ensure_sidecar(&conn).map_err(|e| RuntimeError {
551 code: "CRDT_SIDECAR_FAILED".into(),
552 message: format!("create CRDT sidecar table: {e}"),
553 })?;
554
555 Ok(Self {
556 backend: RuntimeBackend::Sqlite(SqliteBackend {
557 write_conn: Mutex::new(conn),
558 read_pool,
559 read_counter: AtomicUsize::new(0),
560 crdt: crate::loro_store::LoroStore::new(),
561 }),
562 manifest,
563 entities,
564 is_in_memory,
565 })
566 }
567
568 pub fn ensure_search_indexes(&self) -> Result<(), RuntimeError> {
579 if matches!(self.backend, RuntimeBackend::Postgres(_)) {
583 return Ok(());
584 }
585 let conn = self.lock_write_conn()?;
586 conn.execute(pylon_storage::search::create_facet_table_sql(), [])
587 .map_err(|e| RuntimeError {
588 code: "FACET_TABLE_FAILED".into(),
589 message: format!("create _facet_bitmap: {e}"),
590 })?;
591 for entity in &self.manifest.entities {
592 if let Some(cfg) = &entity.search {
593 if let Some(sql) = pylon_storage::search::create_fts_table_sql(&entity.name, cfg) {
594 conn.execute(&sql, []).map_err(|e| RuntimeError {
595 code: "FTS_TABLE_FAILED".into(),
596 message: format!("create FTS table for {}: {e}", entity.name),
597 })?;
598 }
599 for field in &cfg.sortable {
600 let idx_sql = format!(
601 "CREATE INDEX IF NOT EXISTS \"{}_sort_{field}\" ON \"{}\" (\"{field}\")",
602 entity.name, entity.name,
603 );
604 conn.execute(&idx_sql, []).map_err(|e| RuntimeError {
605 code: "SORT_INDEX_FAILED".into(),
606 message: format!("create sort index for {}.{field}: {e}", entity.name),
607 })?;
608 }
609 }
610 }
611 Ok(())
612 }
613
614 pub fn manifest(&self) -> &AppManifest {
616 &self.manifest
617 }
618
619 pub fn lock_conn_pub(&self) -> Result<std::sync::MutexGuard<'_, Connection>, RuntimeError> {
625 self.lock_write_conn()
626 }
627
628 pub fn read_pool_size(&self) -> usize {
633 match &self.backend {
634 RuntimeBackend::Sqlite(sb) => sb.read_pool.len(),
635 RuntimeBackend::Postgres(_) => 0,
636 }
637 }
638
639 pub fn is_postgres(&self) -> bool {
642 matches!(self.backend, RuntimeBackend::Postgres(_))
643 }
644
645 pub(crate) fn crdt_fields_for(
654 &self,
655 ent: &ManifestEntity,
656 ) -> Result<Vec<pylon_crdt::CrdtField>, RuntimeError> {
657 let mut out = Vec::with_capacity(ent.fields.len());
658 for f in &ent.fields {
659 if f.name == "id" {
662 continue;
663 }
664 let kind = pylon_crdt::field_kind(&f.field_type, f.crdt).map_err(|e| RuntimeError {
665 code: "INVALID_CRDT_FIELD".into(),
666 message: format!(
667 "{}.{}: {e} (declared type={}, crdt={:?})",
668 ent.name, f.name, f.field_type, f.crdt
669 ),
670 })?;
671 out.push(pylon_crdt::CrdtField {
672 name: f.name.clone(),
673 kind,
674 });
675 }
676 Ok(out)
677 }
678
679 pub fn crdt_store(&self) -> &crate::loro_store::LoroStore {
687 match &self.backend {
688 RuntimeBackend::Sqlite(sb) => &sb.crdt,
689 RuntimeBackend::Postgres(_) => {
690 panic!("crdt_store() called on Postgres-backed Runtime")
691 }
692 }
693 }
694
695 pub fn insert(&self, entity: &str, data: &serde_json::Value) -> Result<String, RuntimeError> {
708 if let Some(pg) = self.pg_backend() {
709 let ent = self.require_entity(entity)?;
710 if ent.crdt {
716 let crdt_fields = self.crdt_fields_for(ent)?;
717 let id = generate_id();
718 let mut row = data.clone();
721 if let Some(obj) = row.as_object_mut() {
722 obj.insert("id".into(), serde_json::Value::String(id.clone()));
723 }
724 let result = pg.store.with_transaction_raw(|tx| -> Result<(), RuntimeError> {
725 pg.crdt
726 .apply_patch(tx, entity, &id, &crdt_fields, data)
727 .map_err(|e| RuntimeError {
728 code: "CRDT_APPLY_FAILED".into(),
729 message: format!("crdt write {entity}/{id}: {e}"),
730 })?;
731 pylon_storage::pg_tx_store::tx_insert(tx, &self.manifest, entity, &row)
732 .map(|_| ())
733 .map_err(data_err_to_runtime)?;
734 pg.crdt.cache_after_commit(tx, entity, &id);
735 Ok(())
736 });
737 if result.is_err() {
738 pg.crdt.evict(entity, &id);
745 }
746 result?;
747 return Ok(id);
748 }
749 return pylon_http::DataStore::insert(&pg.store, entity, data)
753 .map_err(data_err_to_runtime);
754 }
755 let ent = self.require_entity(entity)?;
756 let conn = self.lock_write_conn()?;
757
758 let id = generate_id();
759
760 let obj = data.as_object().ok_or_else(|| RuntimeError {
761 code: "INVALID_DATA".into(),
762 message: "Insert data must be a JSON object".into(),
763 })?;
764
765 for key in obj.keys() {
768 if key != "id" {
769 validate_column_name(key, ent)?;
770 }
771 }
772
773 let sb = self.sqlite_backend()?;
778
779 with_write_tx(&conn, || {
783 if ent.crdt {
784 let crdt_fields = self.crdt_fields_for(ent)?;
785 sb.crdt
786 .apply_patch(&conn, entity, &id, &crdt_fields, data)
787 .map_err(|e| RuntimeError {
788 code: "CRDT_APPLY_FAILED".into(),
789 message: format!("crdt write {entity}/{id}: {e}"),
790 })?;
791 }
792
793 let mut col_names = vec![quote_ident("id")];
794 let mut placeholders = vec!["?1".to_string()];
795 let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = vec![Box::new(id.clone())];
796
797 let mut idx = 2;
798 for (key, val) in obj {
799 if key == "id" {
800 continue;
801 }
802 col_names.push(quote_ident(key));
803 placeholders.push(format!("?{idx}"));
804 values.push(json_to_sql(val));
805 idx += 1;
806 }
807
808 let sql = format!(
809 "INSERT INTO {} ({}) VALUES ({})",
810 quote_ident(entity),
811 col_names.join(", "),
812 placeholders.join(", ")
813 );
814
815 let params: Vec<&dyn rusqlite::types::ToSql> =
816 values.iter().map(|v| v.as_ref()).collect();
817 conn.execute(&sql, params.as_slice())
818 .map_err(|e| RuntimeError {
819 code: "INSERT_FAILED".into(),
820 message: format!("Insert into {entity} failed: {e}"),
821 })?;
822
823 if let Some(cfg) = ent.search.as_ref() {
827 if !cfg.is_empty() {
828 pylon_storage::search_maintenance::apply_insert(&conn, entity, &id, data, cfg)
829 .map_err(|e| RuntimeError {
830 code: "SEARCH_MAINTENANCE_FAILED".into(),
831 message: format!("search index update on insert {entity}: {e}"),
832 })?;
833 }
834 }
835 Ok(())
836 })?;
837
838 Ok(id)
839 }
840
841 pub fn get_by_id(
843 &self,
844 entity: &str,
845 id: &str,
846 ) -> Result<Option<serde_json::Value>, RuntimeError> {
847 if let Some(pg) = self.pg_backend() {
848 return pylon_http::DataStore::get_by_id(&pg.store, entity, id)
849 .map_err(data_err_to_runtime);
850 }
851 let ent = self.require_entity(entity)?;
852 let conn = self.lock_read_conn()?;
853
854 let sql = format!("SELECT * FROM {} WHERE \"id\" = ?1", quote_ident(entity));
855 let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
856 code: "QUERY_FAILED".into(),
857 message: format!("Failed to prepare query: {e}"),
858 })?;
859
860 let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
861
862 let result = stmt
863 .query_row(rusqlite::params![id], |row| Ok(row_to_json(row, &columns)))
864 .ok();
865
866 Ok(result)
867 }
868
869 pub fn list(&self, entity: &str) -> Result<Vec<serde_json::Value>, RuntimeError> {
871 if let Some(pg) = self.pg_backend() {
872 return pylon_http::DataStore::list(&pg.store, entity).map_err(data_err_to_runtime);
873 }
874 let ent = self.require_entity(entity)?;
875 let conn = self.lock_read_conn()?;
876
877 let sql = format!("SELECT * FROM {} ORDER BY \"id\"", quote_ident(entity));
878 let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
879 code: "QUERY_FAILED".into(),
880 message: format!("Failed to prepare query: {e}"),
881 })?;
882
883 let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
884
885 let rows = stmt
886 .query_map([], |row| Ok(row_to_json(row, &columns)))
887 .map_err(|e| RuntimeError {
888 code: "QUERY_FAILED".into(),
889 message: format!("Query failed: {e}"),
890 })?;
891
892 let mut result = Vec::new();
893 for row in rows {
894 if let Ok(val) = row {
895 result.push(val);
896 }
897 }
898 Ok(result)
899 }
900
901 pub fn list_after(
903 &self,
904 entity: &str,
905 after: Option<&str>,
906 limit: usize,
907 ) -> Result<Vec<serde_json::Value>, RuntimeError> {
908 if let Some(pg) = self.pg_backend() {
909 return pylon_http::DataStore::list_after(&pg.store, entity, after, limit)
910 .map_err(data_err_to_runtime);
911 }
912 let ent = self.require_entity(entity)?;
913 let conn = self.lock_read_conn()?;
914
915 let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
916 let table = quote_ident(entity);
917
918 let (sql, params): (String, Vec<Box<dyn rusqlite::types::ToSql>>) = match after {
919 Some(cursor) => (
920 format!(
921 "SELECT * FROM {} WHERE \"id\" > ?1 ORDER BY \"id\" LIMIT ?2",
922 table
923 ),
924 vec![Box::new(cursor.to_string()), Box::new(limit as i64)],
925 ),
926 None => (
927 format!("SELECT * FROM {} ORDER BY \"id\" LIMIT ?1", table),
928 vec![Box::new(limit as i64)],
929 ),
930 };
931
932 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
933 params.iter().map(|v| v.as_ref()).collect();
934
935 let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
936 code: "QUERY_FAILED".into(),
937 message: format!("Failed to prepare query: {e}"),
938 })?;
939
940 let rows = stmt
941 .query_map(param_refs.as_slice(), |row| Ok(row_to_json(row, &columns)))
942 .map_err(|e| RuntimeError {
943 code: "QUERY_FAILED".into(),
944 message: format!("Query failed: {e}"),
945 })?;
946
947 let mut result = Vec::new();
948 for row in rows {
949 if let Ok(val) = row {
950 result.push(val);
951 }
952 }
953 Ok(result)
954 }
955
956 pub fn update(
962 &self,
963 entity: &str,
964 id: &str,
965 data: &serde_json::Value,
966 ) -> Result<bool, RuntimeError> {
967 if let Some(pg) = self.pg_backend() {
968 let ent = self.require_entity(entity)?;
969 if ent.crdt {
970 let crdt_fields = self.crdt_fields_for(ent)?;
983 let result = pg.store.with_transaction_raw(|tx| -> Result<bool, RuntimeError> {
984 pg.crdt
985 .apply_patch(tx, entity, id, &crdt_fields, data)
986 .map_err(|e| RuntimeError {
987 code: "CRDT_APPLY_FAILED".into(),
988 message: format!("crdt update {entity}/{id}: {e}"),
989 })?;
990 let updated = pylon_storage::pg_tx_store::tx_update(
991 tx,
992 &self.manifest,
993 entity,
994 id,
995 data,
996 )
997 .map_err(data_err_to_runtime)?;
998 if !updated {
999 return Err(RuntimeError {
1002 code: "ENTITY_NOT_FOUND".into(),
1003 message: format!(
1004 "Update on {entity}/{id} found no row — refusing to commit \
1005 a CRDT snapshot that would orphan."
1006 ),
1007 });
1008 }
1009 pg.crdt.cache_after_commit(tx, entity, id);
1013 Ok(updated)
1014 });
1015 if result.is_err() {
1016 pg.crdt.evict(entity, id);
1017 if let Err(ref e) = result {
1023 if e.code == "ENTITY_NOT_FOUND" {
1024 return Ok(false);
1025 }
1026 }
1027 }
1028 return result;
1029 }
1030 return pylon_http::DataStore::update(&pg.store, entity, id, data)
1031 .map_err(data_err_to_runtime);
1032 }
1033 let ent = self.require_entity(entity)?;
1034 let conn = self.lock_write_conn()?;
1035
1036 let obj = data.as_object().ok_or_else(|| RuntimeError {
1037 code: "INVALID_DATA".into(),
1038 message: "Update data must be a JSON object".into(),
1039 })?;
1040
1041 for key in obj.keys() {
1043 if key != "id" {
1044 validate_column_name(key, ent)?;
1045 }
1046 }
1047 let writable_keys: Vec<&String> = obj.keys().filter(|k| *k != "id").collect();
1048 if writable_keys.is_empty() {
1049 return Ok(false);
1050 }
1051
1052 let sb = self.sqlite_backend()?;
1054
1055 let affected = with_write_tx(&conn, || -> Result<i64, RuntimeError> {
1058 if ent.crdt {
1059 let crdt_fields = self.crdt_fields_for(ent)?;
1060 sb.crdt
1061 .apply_patch(&conn, entity, id, &crdt_fields, data)
1062 .map_err(|e| RuntimeError {
1063 code: "CRDT_APPLY_FAILED".into(),
1064 message: format!("crdt write {entity}/{id}: {e}"),
1065 })?;
1066 }
1067
1068 let mut set_clauses = Vec::new();
1069 let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
1070 let mut idx = 1;
1071 for key in &writable_keys {
1072 set_clauses.push(format!("{} = ?{idx}", quote_ident(key)));
1073 values.push(json_to_sql(&obj[key.as_str()]));
1074 idx += 1;
1075 }
1076
1077 let searchable = ent.search.as_ref().map(|c| !c.is_empty()).unwrap_or(false);
1081 let old_row = if searchable {
1082 self.get_by_id_with_conn(&conn, entity, id)?
1083 } else {
1084 None
1085 };
1086
1087 values.push(Box::new(id.to_string()));
1088 let sql = format!(
1089 "UPDATE {} SET {} WHERE \"id\" = ?{idx}",
1090 quote_ident(entity),
1091 set_clauses.join(", ")
1092 );
1093
1094 let params: Vec<&dyn rusqlite::types::ToSql> =
1095 values.iter().map(|v| v.as_ref()).collect();
1096 let affected = conn
1097 .execute(&sql, params.as_slice())
1098 .map_err(|e| RuntimeError {
1099 code: "UPDATE_FAILED".into(),
1100 message: format!("Update {entity}/{id} failed: {e}"),
1101 })? as i64;
1102
1103 if affected > 0 && searchable {
1104 if let (Some(cfg), Some(old)) = (ent.search.as_ref(), old_row) {
1105 pylon_storage::search_maintenance::apply_update(
1106 &conn, entity, id, &old, data, cfg,
1107 )
1108 .map_err(|e| RuntimeError {
1109 code: "SEARCH_MAINTENANCE_FAILED".into(),
1110 message: format!("search index update on update {entity}: {e}"),
1111 })?;
1112 }
1113 }
1114 Ok(affected)
1115 })?;
1116
1117 Ok(affected > 0)
1118 }
1119
1120 pub fn delete(&self, entity: &str, id: &str) -> Result<bool, RuntimeError> {
1122 if let Some(pg) = self.pg_backend() {
1123 let ent = self.require_entity(entity)?;
1124 if ent.crdt {
1125 let result = pg.store.with_transaction_raw(|tx| -> Result<bool, RuntimeError> {
1130 tx.execute(
1131 "DELETE FROM _pylon_crdt_snapshots WHERE entity = $1 AND row_id = $2",
1132 &[&entity, &id],
1133 )
1134 .map_err(|e| RuntimeError {
1135 code: "CRDT_SIDECAR_DELETE_FAILED".into(),
1136 message: format!("delete pg crdt snapshot {entity}/{id}: {e}"),
1137 })?;
1138 pylon_storage::pg_tx_store::tx_delete(tx, &self.manifest, entity, id)
1139 .map_err(data_err_to_runtime)
1140 });
1141 if result.is_ok() {
1147 pg.crdt.evict(entity, id);
1148 }
1149 return result;
1150 }
1151 return pylon_http::DataStore::delete(&pg.store, entity, id)
1152 .map_err(data_err_to_runtime);
1153 }
1154 let ent = self.require_entity(entity)?;
1155 let conn = self.lock_write_conn()?;
1156
1157 let searchable = ent.search.as_ref().map(|c| !c.is_empty()).unwrap_or(false);
1160 if searchable {
1161 if let (Some(cfg), Ok(Some(row))) = (
1162 ent.search.as_ref(),
1163 self.get_by_id_with_conn(&conn, entity, id),
1164 ) {
1165 pylon_storage::search_maintenance::apply_delete(&conn, entity, id, &row, cfg)
1166 .map_err(|e| RuntimeError {
1167 code: "SEARCH_MAINTENANCE_FAILED".into(),
1168 message: format!("search index update on delete {entity}: {e}"),
1169 })?;
1170 }
1171 }
1172
1173 let sql = format!("DELETE FROM {} WHERE \"id\" = ?1", quote_ident(entity));
1174 let affected = conn
1175 .execute(&sql, rusqlite::params![id])
1176 .map_err(|e| RuntimeError {
1177 code: "DELETE_FAILED".into(),
1178 message: format!("Delete {entity}/{id} failed: {e}"),
1179 })?;
1180
1181 Ok(affected > 0)
1182 }
1183
1184 pub fn lookup(
1186 &self,
1187 entity: &str,
1188 field: &str,
1189 value: &str,
1190 ) -> Result<Option<serde_json::Value>, RuntimeError> {
1191 if let Some(pg) = self.pg_backend() {
1192 return pylon_http::DataStore::lookup(&pg.store, entity, field, value)
1193 .map_err(data_err_to_runtime);
1194 }
1195 let ent = self.require_entity(entity)?;
1196 validate_column_name(field, ent)?;
1197 let conn = self.lock_read_conn()?;
1198
1199 let sql = format!(
1200 "SELECT * FROM {} WHERE {} = ?1 LIMIT 1",
1201 quote_ident(entity),
1202 quote_ident(field)
1203 );
1204 let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1205
1206 let result = conn.prepare_cached(&sql).ok().and_then(|mut stmt| {
1207 stmt.query_row(rusqlite::params![value], |row| {
1208 Ok(row_to_json(row, &columns))
1209 })
1210 .ok()
1211 });
1212
1213 Ok(result)
1214 }
1215
1216 pub fn link(
1218 &self,
1219 entity: &str,
1220 id: &str,
1221 relation: &str,
1222 target_id: &str,
1223 ) -> Result<bool, RuntimeError> {
1224 let ent = self.require_entity(entity)?;
1225
1226 let rel = ent
1228 .relations
1229 .iter()
1230 .find(|r| r.name == relation)
1231 .ok_or_else(|| RuntimeError {
1232 code: "RELATION_NOT_FOUND".into(),
1233 message: format!("Relation \"{relation}\" not found on entity \"{entity}\""),
1234 })?;
1235
1236 let data = serde_json::json!({ rel.field.clone(): target_id });
1237 self.update(entity, id, &data)
1238 }
1239
1240 pub fn unlink(&self, entity: &str, id: &str, relation: &str) -> Result<bool, RuntimeError> {
1242 let ent = self.require_entity(entity)?;
1243
1244 let rel = ent
1245 .relations
1246 .iter()
1247 .find(|r| r.name == relation)
1248 .ok_or_else(|| RuntimeError {
1249 code: "RELATION_NOT_FOUND".into(),
1250 message: format!("Relation \"{relation}\" not found on entity \"{entity}\""),
1251 })?;
1252
1253 let data = serde_json::json!({ rel.field.clone(): null });
1254 self.update(entity, id, &data)
1255 }
1256
1257 pub fn query_filtered(
1259 &self,
1260 entity: &str,
1261 filter: &serde_json::Value,
1262 ) -> Result<Vec<serde_json::Value>, RuntimeError> {
1263 if let Some(pg) = self.pg_backend() {
1264 return pylon_http::DataStore::query_filtered(&pg.store, entity, filter)
1265 .map_err(data_err_to_runtime);
1266 }
1267 let ent = self.require_entity(entity)?;
1268 let conn = self.lock_read_conn()?;
1269
1270 let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1271 let obj = filter
1272 .as_object()
1273 .unwrap_or(&serde_json::Map::new())
1274 .clone();
1275
1276 let mut where_clauses = Vec::new();
1277 let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
1278 let mut order_clause = String::new();
1279 let mut limit_clause = String::new();
1280 let mut join_clause = String::new();
1281 let mut fts_order = false;
1282 let mut idx = 1;
1283
1284 for (key, val) in &obj {
1285 match key.as_str() {
1286 "$order" => {
1287 if let Some(order_obj) = val.as_object() {
1288 let mut parts: Vec<String> = Vec::new();
1289 for (col, dir) in order_obj {
1290 validate_column_name(col, ent)?;
1291 let d = match dir.as_str().unwrap_or("asc") {
1292 "desc" | "DESC" => "DESC",
1293 _ => "ASC",
1294 };
1295 parts.push(format!("{} {d}", quote_ident(col)));
1296 }
1297 if !parts.is_empty() {
1298 order_clause = format!(" ORDER BY {}", parts.join(", "));
1299 }
1300 }
1301 }
1302 "$limit" => {
1303 if let Some(n) = val.as_u64() {
1304 limit_clause = format!(" LIMIT {n}");
1305 }
1306 }
1307 "$offset" => {
1308 if let Some(n) = val.as_u64() {
1309 if limit_clause.is_empty() {
1311 limit_clause = " LIMIT -1".into();
1312 }
1313 limit_clause = format!("{limit_clause} OFFSET {n}");
1314 }
1315 }
1316 "$search" => {
1317 if let Some(q) = val.as_str() {
1318 let fts = format!("{}_fts", entity);
1320 join_clause = format!(
1321 " JOIN {fts} ON {ent}.rowid = {fts}.rowid",
1322 fts = quote_ident(&fts),
1323 ent = quote_ident(entity),
1324 );
1325 where_clauses.push(format!("{} MATCH ?{idx}", quote_ident(&fts)));
1326 values.push(Box::new(q.to_string()));
1327 fts_order = true;
1328 idx += 1;
1329 }
1330 }
1331 _ => {
1332 validate_column_name(key, ent)?;
1333 let quoted_key = quote_ident(key);
1334
1335 if let Some(op_obj) = val.as_object() {
1336 for (op, op_val) in op_obj {
1337 match op.as_str() {
1338 "$not" => {
1339 where_clauses.push(format!("{quoted_key} != ?{idx}"));
1340 values.push(json_to_sql(op_val));
1341 idx += 1;
1342 }
1343 "$gt" => {
1344 where_clauses.push(format!("{quoted_key} > ?{idx}"));
1345 values.push(json_to_sql(op_val));
1346 idx += 1;
1347 }
1348 "$gte" => {
1349 where_clauses.push(format!("{quoted_key} >= ?{idx}"));
1350 values.push(json_to_sql(op_val));
1351 idx += 1;
1352 }
1353 "$lt" => {
1354 where_clauses.push(format!("{quoted_key} < ?{idx}"));
1355 values.push(json_to_sql(op_val));
1356 idx += 1;
1357 }
1358 "$lte" => {
1359 where_clauses.push(format!("{quoted_key} <= ?{idx}"));
1360 values.push(json_to_sql(op_val));
1361 idx += 1;
1362 }
1363 "$like" => {
1364 where_clauses.push(format!("{quoted_key} LIKE ?{idx}"));
1365 let pattern = format!("%{}%", op_val.as_str().unwrap_or(""));
1366 values.push(Box::new(pattern));
1367 idx += 1;
1368 }
1369 "$in" => {
1370 if let Some(arr) = op_val.as_array() {
1371 if arr.is_empty() {
1372 where_clauses.push("0".into());
1381 } else {
1382 let placeholders: Vec<String> = arr
1383 .iter()
1384 .map(|v| {
1385 let p = format!("?{idx}");
1386 values.push(json_to_sql(v));
1387 idx += 1;
1388 p
1389 })
1390 .collect();
1391 where_clauses.push(format!(
1392 "{quoted_key} IN ({})",
1393 placeholders.join(", ")
1394 ));
1395 }
1396 }
1397 }
1398 _ => {}
1399 }
1400 }
1401 } else {
1402 where_clauses.push(format!("{quoted_key} = ?{idx}"));
1404 values.push(json_to_sql(val));
1405 idx += 1;
1406 }
1407 }
1408 }
1409 }
1410
1411 let where_sql = if where_clauses.is_empty() {
1412 String::new()
1413 } else {
1414 format!(" WHERE {}", where_clauses.join(" AND "))
1415 };
1416
1417 if order_clause.is_empty() {
1418 order_clause = if fts_order {
1419 " ORDER BY bm25(".to_string() + "e_ident(&format!("{}_fts", entity)) + ")"
1421 } else {
1422 format!(" ORDER BY {}.\"id\"", quote_ident(entity))
1423 };
1424 }
1425
1426 let select_prefix = format!("{}.*", quote_ident(entity));
1427 let sql = format!(
1428 "SELECT {} FROM {}{}{}{}{}",
1429 select_prefix,
1430 quote_ident(entity),
1431 join_clause,
1432 where_sql,
1433 order_clause,
1434 limit_clause
1435 );
1436 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1437 values.iter().map(|v| v.as_ref()).collect();
1438
1439 let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
1440 code: "QUERY_FAILED".into(),
1441 message: format!("Failed to prepare filtered query: {e}"),
1442 })?;
1443
1444 let rows = stmt
1445 .query_map(param_refs.as_slice(), |row| Ok(row_to_json(row, &columns)))
1446 .map_err(|e| RuntimeError {
1447 code: "QUERY_FAILED".into(),
1448 message: format!("Filtered query failed: {e}"),
1449 })?;
1450
1451 let mut result = Vec::new();
1452 for row in rows {
1453 if let Ok(val) = row {
1454 result.push(val);
1455 }
1456 }
1457 Ok(result)
1458 }
1459
1460 pub fn query_graph(
1465 &self,
1466 query: &serde_json::Value,
1467 ) -> Result<serde_json::Value, RuntimeError> {
1468 let obj = query.as_object().ok_or_else(|| RuntimeError {
1469 code: "INVALID_QUERY".into(),
1470 message: "Graph query must be a JSON object".into(),
1471 })?;
1472
1473 let mut results = serde_json::Map::new();
1474
1475 for (entity_name, query_opts) in obj {
1476 let _ent = self.require_entity(entity_name)?;
1477
1478 let filter = query_opts
1480 .get("where")
1481 .cloned()
1482 .unwrap_or(serde_json::json!({}));
1483 let rows = self.query_filtered(entity_name, &filter)?;
1484
1485 let rows = if let Some(include) = query_opts.get("include").and_then(|v| v.as_object())
1487 {
1488 let ent = self.entities.get(entity_name).ok_or_else(|| RuntimeError {
1494 code: "INVARIANT_BROKEN".into(),
1495 message: format!(
1496 "entity \"{entity_name}\" missing from registry during include expansion"
1497 ),
1498 })?;
1499 rows.into_iter()
1500 .map(|mut row| {
1501 for (rel_name, _sub_query) in include {
1502 if let Some(rel) = ent.relations.iter().find(|r| r.name == *rel_name) {
1503 let fk_value = row
1504 .get(&rel.field)
1505 .and_then(|v| v.as_str())
1506 .map(|s| s.to_string());
1507 if let Some(fk) = fk_value {
1508 if rel.many {
1509 let sub_filter = serde_json::json!({ &rel.field: &fk });
1511 if let Ok(related) =
1512 self.query_filtered(&rel.target, &sub_filter)
1513 {
1514 row[rel_name] = serde_json::json!(related);
1515 }
1516 } else {
1517 if let Ok(Some(related)) = self.get_by_id(&rel.target, &fk)
1519 {
1520 row[rel_name] = related;
1521 }
1522 }
1523 }
1524 }
1525 }
1526 row
1527 })
1528 .collect()
1529 } else {
1530 rows
1531 };
1532
1533 let rows = if let Some(limit) = query_opts.get("limit").and_then(|v| v.as_u64()) {
1535 rows.into_iter().take(limit as usize).collect()
1536 } else {
1537 rows
1538 };
1539
1540 results.insert(entity_name.clone(), serde_json::json!(rows));
1541 }
1542
1543 Ok(serde_json::Value::Object(results))
1544 }
1545
1546 pub fn insert_with_conn(
1552 &self,
1553 conn: &Connection,
1554 entity: &str,
1555 data: &serde_json::Value,
1556 ) -> Result<String, RuntimeError> {
1557 let ent = self.require_entity(entity)?;
1558 let id = generate_id();
1559 let obj = data.as_object().ok_or_else(|| RuntimeError {
1560 code: "INVALID_DATA".into(),
1561 message: "Insert data must be a JSON object".into(),
1562 })?;
1563
1564 let mut col_names = vec![quote_ident("id")];
1565 let mut placeholders = vec!["?1".to_string()];
1566 let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = vec![Box::new(id.clone())];
1567 let mut idx = 2;
1568 for (key, val) in obj {
1569 if key == "id" {
1570 continue;
1571 }
1572 validate_column_name(key, ent)?;
1573 col_names.push(quote_ident(key));
1574 placeholders.push(format!("?{idx}"));
1575 values.push(json_to_sql(val));
1576 idx += 1;
1577 }
1578
1579 let sql = format!(
1580 "INSERT INTO {} ({}) VALUES ({})",
1581 quote_ident(entity),
1582 col_names.join(", "),
1583 placeholders.join(", ")
1584 );
1585 let params: Vec<&dyn rusqlite::types::ToSql> = values.iter().map(|v| v.as_ref()).collect();
1586 conn.execute(&sql, params.as_slice())
1587 .map_err(|e| RuntimeError {
1588 code: "INSERT_FAILED".into(),
1589 message: format!("Insert into {entity} failed: {e}"),
1590 })?;
1591
1592 if let Some(cfg) = ent.search.as_ref() {
1595 if !cfg.is_empty() {
1596 pylon_storage::search_maintenance::apply_insert(conn, entity, &id, data, cfg)
1597 .map_err(|e| RuntimeError {
1598 code: "SEARCH_MAINTENANCE_FAILED".into(),
1599 message: format!("search index update on insert {entity}: {e}"),
1600 })?;
1601 }
1602 }
1603
1604 Ok(id)
1605 }
1606
1607 pub fn update_with_conn(
1609 &self,
1610 conn: &Connection,
1611 entity: &str,
1612 id: &str,
1613 data: &serde_json::Value,
1614 ) -> Result<bool, RuntimeError> {
1615 let ent = self.require_entity(entity)?;
1616 let obj = data.as_object().ok_or_else(|| RuntimeError {
1617 code: "INVALID_DATA".into(),
1618 message: "Update data must be a JSON object".into(),
1619 })?;
1620
1621 let mut set_clauses = Vec::new();
1622 let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
1623 let mut idx = 1;
1624 for (key, val) in obj {
1625 if key == "id" {
1626 continue;
1627 }
1628 validate_column_name(key, ent)?;
1629 set_clauses.push(format!("{} = ?{idx}", quote_ident(key)));
1630 values.push(json_to_sql(val));
1631 idx += 1;
1632 }
1633 if set_clauses.is_empty() {
1634 return Ok(false);
1635 }
1636
1637 let searchable = ent.search.as_ref().map(|c| !c.is_empty()).unwrap_or(false);
1642 let old_row = if searchable {
1643 self.get_by_id_with_conn(conn, entity, id)?
1644 } else {
1645 None
1646 };
1647
1648 values.push(Box::new(id.to_string()));
1649 let sql = format!(
1650 "UPDATE {} SET {} WHERE \"id\" = ?{idx}",
1651 quote_ident(entity),
1652 set_clauses.join(", ")
1653 );
1654 let params: Vec<&dyn rusqlite::types::ToSql> = values.iter().map(|v| v.as_ref()).collect();
1655 let affected = conn
1656 .execute(&sql, params.as_slice())
1657 .map_err(|e| RuntimeError {
1658 code: "UPDATE_FAILED".into(),
1659 message: format!("Update {entity}/{id} failed: {e}"),
1660 })?;
1661
1662 if affected > 0 && searchable {
1663 if let (Some(cfg), Some(old)) = (ent.search.as_ref(), old_row) {
1664 pylon_storage::search_maintenance::apply_update(conn, entity, id, &old, data, cfg)
1665 .map_err(|e| RuntimeError {
1666 code: "SEARCH_MAINTENANCE_FAILED".into(),
1667 message: format!("search index update on update {entity}: {e}"),
1668 })?;
1669 }
1670 }
1671
1672 Ok(affected > 0)
1673 }
1674
1675 pub fn delete_with_conn(
1677 &self,
1678 conn: &Connection,
1679 entity: &str,
1680 id: &str,
1681 ) -> Result<bool, RuntimeError> {
1682 let ent = self.require_entity(entity)?;
1683
1684 let searchable = ent.search.as_ref().map(|c| !c.is_empty()).unwrap_or(false);
1687 if searchable {
1688 if let (Some(cfg), Ok(Some(row))) = (
1689 ent.search.as_ref(),
1690 self.get_by_id_with_conn(conn, entity, id),
1691 ) {
1692 pylon_storage::search_maintenance::apply_delete(conn, entity, id, &row, cfg)
1693 .map_err(|e| RuntimeError {
1694 code: "SEARCH_MAINTENANCE_FAILED".into(),
1695 message: format!("search index update on delete {entity}: {e}"),
1696 })?;
1697 }
1698 }
1699
1700 let sql = format!("DELETE FROM {} WHERE \"id\" = ?1", quote_ident(entity));
1701 let affected = conn
1702 .execute(&sql, rusqlite::params![id])
1703 .map_err(|e| RuntimeError {
1704 code: "DELETE_FAILED".into(),
1705 message: format!("Delete {entity}/{id} failed: {e}"),
1706 })?;
1707 Ok(affected > 0)
1708 }
1709
1710 pub fn get_by_id_with_conn(
1712 &self,
1713 conn: &Connection,
1714 entity: &str,
1715 id: &str,
1716 ) -> Result<Option<serde_json::Value>, RuntimeError> {
1717 let ent = self.require_entity(entity)?;
1718 let sql = format!("SELECT * FROM {} WHERE \"id\" = ?1", quote_ident(entity));
1719 let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
1720 code: "QUERY_FAILED".into(),
1721 message: format!("Failed to prepare query: {e}"),
1722 })?;
1723 let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1724 Ok(stmt
1725 .query_row(rusqlite::params![id], |row| Ok(row_to_json(row, &columns)))
1726 .ok())
1727 }
1728
1729 pub fn list_with_conn(
1731 &self,
1732 conn: &Connection,
1733 entity: &str,
1734 ) -> Result<Vec<serde_json::Value>, RuntimeError> {
1735 let ent = self.require_entity(entity)?;
1736 let sql = format!("SELECT * FROM {} ORDER BY \"id\"", quote_ident(entity));
1737 let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
1738 code: "QUERY_FAILED".into(),
1739 message: format!("Failed to prepare query: {e}"),
1740 })?;
1741 let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1742 let rows = stmt
1743 .query_map([], |row| Ok(row_to_json(row, &columns)))
1744 .map_err(|e| RuntimeError {
1745 code: "QUERY_FAILED".into(),
1746 message: format!("Query failed: {e}"),
1747 })?;
1748 Ok(rows.flatten().collect())
1749 }
1750
1751 pub fn list_after_with_conn(
1753 &self,
1754 conn: &Connection,
1755 entity: &str,
1756 after: Option<&str>,
1757 limit: usize,
1758 ) -> Result<Vec<serde_json::Value>, RuntimeError> {
1759 let ent = self.require_entity(entity)?;
1760 let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1761 let table = quote_ident(entity);
1762 let (sql, params): (String, Vec<Box<dyn rusqlite::types::ToSql>>) = match after {
1763 Some(cursor) => (
1764 format!("SELECT * FROM {table} WHERE \"id\" > ?1 ORDER BY \"id\" LIMIT ?2"),
1765 vec![Box::new(cursor.to_string()), Box::new(limit as i64)],
1766 ),
1767 None => (
1768 format!("SELECT * FROM {table} ORDER BY \"id\" LIMIT ?1"),
1769 vec![Box::new(limit as i64)],
1770 ),
1771 };
1772 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1773 params.iter().map(|v| v.as_ref()).collect();
1774 let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
1775 code: "QUERY_FAILED".into(),
1776 message: format!("Failed to prepare: {e}"),
1777 })?;
1778 let rows = stmt
1779 .query_map(param_refs.as_slice(), |row| Ok(row_to_json(row, &columns)))
1780 .map_err(|e| RuntimeError {
1781 code: "QUERY_FAILED".into(),
1782 message: format!("Query failed: {e}"),
1783 })?;
1784 Ok(rows.flatten().collect())
1785 }
1786
1787 pub fn lookup_with_conn(
1789 &self,
1790 conn: &Connection,
1791 entity: &str,
1792 field: &str,
1793 value: &str,
1794 ) -> Result<Option<serde_json::Value>, RuntimeError> {
1795 let ent = self.require_entity(entity)?;
1796 validate_column_name(field, ent)?;
1797 let sql = format!(
1798 "SELECT * FROM {} WHERE {} = ?1 LIMIT 1",
1799 quote_ident(entity),
1800 quote_ident(field)
1801 );
1802 let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1803 Ok(conn.prepare_cached(&sql).ok().and_then(|mut stmt| {
1804 stmt.query_row(rusqlite::params![value], |row| {
1805 Ok(row_to_json(row, &columns))
1806 })
1807 .ok()
1808 }))
1809 }
1810
1811 pub fn link_with_conn(
1813 &self,
1814 conn: &Connection,
1815 entity: &str,
1816 id: &str,
1817 relation: &str,
1818 target_id: &str,
1819 ) -> Result<bool, RuntimeError> {
1820 let ent = self.require_entity(entity)?;
1821 let rel = ent
1822 .relations
1823 .iter()
1824 .find(|r| r.name == relation)
1825 .ok_or_else(|| RuntimeError {
1826 code: "RELATION_NOT_FOUND".into(),
1827 message: format!("Relation \"{relation}\" not found on \"{entity}\""),
1828 })?;
1829 let data = serde_json::json!({ rel.field.clone(): target_id });
1830 self.update_with_conn(conn, entity, id, &data)
1831 }
1832
1833 pub fn unlink_with_conn(
1835 &self,
1836 conn: &Connection,
1837 entity: &str,
1838 id: &str,
1839 relation: &str,
1840 ) -> Result<bool, RuntimeError> {
1841 let ent = self.require_entity(entity)?;
1842 let rel = ent
1843 .relations
1844 .iter()
1845 .find(|r| r.name == relation)
1846 .ok_or_else(|| RuntimeError {
1847 code: "RELATION_NOT_FOUND".into(),
1848 message: format!("Relation \"{relation}\" not found on \"{entity}\""),
1849 })?;
1850 let data = serde_json::json!({ rel.field.clone(): serde_json::Value::Null });
1851 self.update_with_conn(conn, entity, id, &data)
1852 }
1853
1854 pub fn query_filtered_with_conn(
1859 &self,
1860 conn: &Connection,
1861 entity: &str,
1862 filter: &serde_json::Value,
1863 ) -> Result<Vec<serde_json::Value>, RuntimeError> {
1864 let ent = self.require_entity(entity)?;
1865 let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1866 let empty = serde_json::Map::new();
1867 let obj = filter.as_object().unwrap_or(&empty);
1868
1869 let mut where_clauses = Vec::new();
1870 let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
1871 let mut order_clause = String::new();
1872 let mut limit_clause = String::new();
1873 let mut idx = 1;
1874
1875 for (key, val) in obj {
1876 match key.as_str() {
1877 "$order" => {
1878 if let Some(o) = val.as_object() {
1879 let mut parts: Vec<String> = Vec::new();
1880 for (col, dir) in o {
1881 validate_column_name(col, ent)?;
1882 let d = match dir.as_str().unwrap_or("asc") {
1883 "desc" | "DESC" => "DESC",
1884 _ => "ASC",
1885 };
1886 parts.push(format!("{} {d}", quote_ident(col)));
1887 }
1888 if !parts.is_empty() {
1889 order_clause = format!(" ORDER BY {}", parts.join(", "));
1890 }
1891 }
1892 }
1893 "$limit" => {
1894 if let Some(n) = val.as_u64() {
1895 limit_clause = format!(" LIMIT {n}");
1896 }
1897 }
1898 "$offset" => {
1899 if let Some(n) = val.as_u64() {
1900 if limit_clause.is_empty() {
1901 limit_clause = " LIMIT -1".into();
1902 }
1903 limit_clause = format!("{limit_clause} OFFSET {n}");
1904 }
1905 }
1906 _ => {
1907 validate_column_name(key, ent)?;
1908 let qk = quote_ident(key);
1909 if let Some(op_obj) = val.as_object() {
1910 for (op, op_val) in op_obj {
1911 match op.as_str() {
1912 "$not" => {
1913 where_clauses.push(format!("{qk} != ?{idx}"));
1914 values.push(json_to_sql(op_val));
1915 idx += 1;
1916 }
1917 "$gt" => {
1918 where_clauses.push(format!("{qk} > ?{idx}"));
1919 values.push(json_to_sql(op_val));
1920 idx += 1;
1921 }
1922 "$gte" => {
1923 where_clauses.push(format!("{qk} >= ?{idx}"));
1924 values.push(json_to_sql(op_val));
1925 idx += 1;
1926 }
1927 "$lt" => {
1928 where_clauses.push(format!("{qk} < ?{idx}"));
1929 values.push(json_to_sql(op_val));
1930 idx += 1;
1931 }
1932 "$lte" => {
1933 where_clauses.push(format!("{qk} <= ?{idx}"));
1934 values.push(json_to_sql(op_val));
1935 idx += 1;
1936 }
1937 "$like" => {
1938 where_clauses.push(format!("{qk} LIKE ?{idx}"));
1939 let p = format!("%{}%", op_val.as_str().unwrap_or(""));
1940 values.push(Box::new(p));
1941 idx += 1;
1942 }
1943 "$in" => {
1944 if let Some(arr) = op_val.as_array() {
1945 let ph: Vec<String> = arr
1946 .iter()
1947 .map(|v| {
1948 let p = format!("?{idx}");
1949 values.push(json_to_sql(v));
1950 idx += 1;
1951 p
1952 })
1953 .collect();
1954 if !ph.is_empty() {
1955 where_clauses
1956 .push(format!("{qk} IN ({})", ph.join(", ")));
1957 }
1958 }
1959 }
1960 _ => {}
1961 }
1962 }
1963 } else {
1964 where_clauses.push(format!("{qk} = ?{idx}"));
1965 values.push(json_to_sql(val));
1966 idx += 1;
1967 }
1968 }
1969 }
1970 }
1971
1972 let where_sql = if where_clauses.is_empty() {
1973 String::new()
1974 } else {
1975 format!(" WHERE {}", where_clauses.join(" AND "))
1976 };
1977 if order_clause.is_empty() {
1978 order_clause = " ORDER BY \"id\"".into();
1979 }
1980
1981 let sql = format!(
1982 "SELECT * FROM {}{}{}{}",
1983 quote_ident(entity),
1984 where_sql,
1985 order_clause,
1986 limit_clause
1987 );
1988 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1989 values.iter().map(|v| v.as_ref()).collect();
1990 let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
1991 code: "QUERY_FAILED".into(),
1992 message: format!("Failed to prepare: {e}"),
1993 })?;
1994 let rows = stmt
1995 .query_map(param_refs.as_slice(), |row| Ok(row_to_json(row, &columns)))
1996 .map_err(|e| RuntimeError {
1997 code: "QUERY_FAILED".into(),
1998 message: format!("Query failed: {e}"),
1999 })?;
2000 Ok(rows.flatten().collect())
2001 }
2002
2003 pub fn query_graph_with_conn(
2005 &self,
2006 conn: &Connection,
2007 query: &serde_json::Value,
2008 ) -> Result<serde_json::Value, RuntimeError> {
2009 let obj = query.as_object().ok_or_else(|| RuntimeError {
2010 code: "INVALID_QUERY".into(),
2011 message: "Graph query must be a JSON object".into(),
2012 })?;
2013 let mut results = serde_json::Map::new();
2014 for (entity_name, query_opts) in obj {
2015 let _ent = self.require_entity(entity_name)?;
2016 let filter = query_opts
2017 .get("where")
2018 .cloned()
2019 .unwrap_or(serde_json::json!({}));
2020 let rows = self.query_filtered_with_conn(conn, entity_name, &filter)?;
2021 results.insert(entity_name.clone(), serde_json::json!(rows));
2022 }
2023 Ok(serde_json::Value::Object(results))
2024 }
2025
2026 pub fn aggregate(
2033 &self,
2034 entity: &str,
2035 spec: &serde_json::Value,
2036 ) -> Result<serde_json::Value, RuntimeError> {
2037 if let Some(pg) = self.pg_backend() {
2038 return pylon_http::DataStore::aggregate(&pg.store, entity, spec)
2039 .map_err(data_err_to_runtime);
2040 }
2041 let ent = self.require_entity(entity)?;
2042 let conn = self.lock_read_conn()?;
2043 let obj = spec.as_object().ok_or_else(|| RuntimeError {
2044 code: "INVALID_QUERY".into(),
2045 message: "aggregate spec must be an object".into(),
2046 })?;
2047
2048 let mut select_parts: Vec<String> = Vec::new();
2050 let mut result_fields: Vec<String> = Vec::new();
2051
2052 if let Some(count) = obj.get("count") {
2053 match count {
2054 serde_json::Value::String(s) if s == "*" => {
2055 select_parts.push("COUNT(*) AS count".into());
2056 result_fields.push("count".into());
2057 }
2058 serde_json::Value::String(field) => {
2059 validate_column_name(field, ent)?;
2060 let alias = format!("count_{field}");
2061 select_parts.push(format!(
2062 "COUNT({}) AS {}",
2063 quote_ident(field),
2064 quote_ident(&alias)
2065 ));
2066 result_fields.push(alias);
2067 }
2068 _ => {}
2069 }
2070 }
2071
2072 for (fn_name, alias_prefix) in [
2073 ("sum", "sum_"),
2074 ("avg", "avg_"),
2075 ("min", "min_"),
2076 ("max", "max_"),
2077 ] {
2078 if let Some(fields) = obj.get(fn_name).and_then(|v| v.as_array()) {
2079 for field in fields {
2080 if let Some(f) = field.as_str() {
2081 validate_column_name(f, ent)?;
2082 let alias = format!("{alias_prefix}{f}");
2083 let sql_fn = fn_name.to_uppercase();
2084 select_parts.push(format!(
2085 "{}({}) AS {}",
2086 sql_fn,
2087 quote_ident(f),
2088 quote_ident(&alias)
2089 ));
2090 result_fields.push(alias);
2091 }
2092 }
2093 }
2094 }
2095
2096 if let Some(fields) = obj.get("countDistinct").and_then(|v| v.as_array()) {
2101 for field in fields {
2102 if let Some(f) = field.as_str() {
2103 validate_column_name(f, ent)?;
2104 let alias = format!("count_distinct_{f}");
2105 select_parts.push(format!(
2106 "COUNT(DISTINCT {}) AS {}",
2107 quote_ident(f),
2108 quote_ident(&alias)
2109 ));
2110 result_fields.push(alias);
2111 }
2112 }
2113 }
2114
2115 let mut group_by: Vec<String> = Vec::new();
2121 let mut group_select: Vec<String> = Vec::new();
2122 let mut group_field_names: Vec<String> = Vec::new();
2123 if let Some(groups) = obj.get("groupBy").and_then(|v| v.as_array()) {
2124 for g in groups {
2125 if let Some(f) = g.as_str() {
2126 validate_column_name(f, ent)?;
2127 let quoted = quote_ident(f);
2128 group_by.push(quoted.clone());
2129 group_select.push(quoted);
2130 group_field_names.push(f.to_string());
2131 } else if let Some(spec) = g.as_object() {
2132 let field =
2133 spec.get("field")
2134 .and_then(|v| v.as_str())
2135 .ok_or_else(|| RuntimeError {
2136 code: "INVALID_QUERY".into(),
2137 message: "groupBy object spec requires `field`".into(),
2138 })?;
2139 validate_column_name(field, ent)?;
2140 let bucket = spec.get("bucket").and_then(|v| v.as_str()).unwrap_or("day");
2141 let fmt = match bucket {
2142 "hour" => "%Y-%m-%d %H:00:00",
2143 "day" => "%Y-%m-%d",
2144 "month" => "%Y-%m",
2145 "year" => "%Y",
2146 "week" => "%Y-W%W",
2147 _ => {
2148 return Err(RuntimeError {
2149 code: "INVALID_QUERY".into(),
2150 message: format!(
2151 "bucket must be one of hour/day/week/month/year, got {bucket}"
2152 ),
2153 });
2154 }
2155 };
2156 let alias = format!("{field}_{bucket}");
2157 let expr = format!("strftime('{}', {})", fmt, quote_ident(field));
2158 group_by.push(expr.clone());
2159 group_select.push(format!("{} AS {}", expr, quote_ident(&alias)));
2160 group_field_names.push(alias);
2161 }
2162 }
2163 }
2164 let mut full_select = group_select.clone();
2165 full_select.extend(select_parts.iter().cloned());
2166 if full_select.is_empty() {
2167 return Err(RuntimeError {
2168 code: "INVALID_QUERY".into(),
2169 message: "aggregate spec must include count/sum/avg/min/max/groupBy".into(),
2170 });
2171 }
2172
2173 let mut where_clauses = Vec::new();
2175 let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
2176 let mut idx = 1;
2177 if let Some(where_obj) = obj.get("where").and_then(|v| v.as_object()) {
2178 for (k, v) in where_obj {
2179 validate_column_name(k, ent)?;
2180 where_clauses.push(format!("{} = ?{idx}", quote_ident(k)));
2181 values.push(json_to_sql(v));
2182 idx += 1;
2183 }
2184 }
2185 let where_sql = if where_clauses.is_empty() {
2186 String::new()
2187 } else {
2188 format!(" WHERE {}", where_clauses.join(" AND "))
2189 };
2190
2191 let group_sql = if group_by.is_empty() {
2192 String::new()
2193 } else {
2194 format!(" GROUP BY {}", group_by.join(", "))
2195 };
2196
2197 let sql = format!(
2198 "SELECT {} FROM {}{}{}",
2199 full_select.join(", "),
2200 quote_ident(entity),
2201 where_sql,
2202 group_sql
2203 );
2204
2205 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
2206 values.iter().map(|v| v.as_ref()).collect();
2207 let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
2208 code: "QUERY_FAILED".into(),
2209 message: format!("Failed to prepare aggregate: {e}"),
2210 })?;
2211
2212 let column_names: Vec<String> = {
2213 let mut v = group_field_names.clone();
2214 v.extend(result_fields.iter().cloned());
2215 v
2216 };
2217
2218 let rows = stmt
2219 .query_map(param_refs.as_slice(), |row| {
2220 let mut obj = serde_json::Map::new();
2221 for (i, name) in column_names.iter().enumerate() {
2222 if let Ok(n) = row.get::<_, i64>(i) {
2224 obj.insert(name.clone(), serde_json::Value::Number(n.into()));
2225 } else if let Ok(f) = row.get::<_, f64>(i) {
2226 if let Some(num) = serde_json::Number::from_f64(f) {
2227 obj.insert(name.clone(), serde_json::Value::Number(num));
2228 } else {
2229 obj.insert(name.clone(), serde_json::Value::Null);
2230 }
2231 } else if let Ok(s) = row.get::<_, String>(i) {
2232 obj.insert(name.clone(), serde_json::Value::String(s));
2233 } else {
2234 obj.insert(name.clone(), serde_json::Value::Null);
2235 }
2236 }
2237 Ok(serde_json::Value::Object(obj))
2238 })
2239 .map_err(|e| RuntimeError {
2240 code: "QUERY_FAILED".into(),
2241 message: format!("Aggregate failed: {e}"),
2242 })?;
2243
2244 let mut result = Vec::new();
2245 for row in rows {
2246 if let Ok(val) = row {
2247 result.push(val);
2248 }
2249 }
2250 Ok(serde_json::json!({ "rows": result }))
2251 }
2252
2253 fn require_entity(&self, name: &str) -> Result<&ManifestEntity, RuntimeError> {
2258 self.entities.get(name).ok_or_else(|| RuntimeError {
2259 code: "ENTITY_NOT_FOUND".into(),
2260 message: format!("Unknown entity: \"{name}\""),
2261 })
2262 }
2263
2264 fn lock_write_conn(&self) -> Result<std::sync::MutexGuard<'_, Connection>, RuntimeError> {
2270 let sb = self.sqlite_backend()?;
2271 sb.write_conn.lock().map_err(|e| RuntimeError {
2272 code: "LOCK_FAILED".into(),
2273 message: format!("Failed to acquire write connection lock: {e}"),
2274 })
2275 }
2276
2277 fn lock_read_conn(&self) -> Result<ReadConnGuard<'_>, RuntimeError> {
2281 let sb = self.sqlite_backend()?;
2282 if !sb.read_pool.is_empty() {
2283 let idx = sb.read_counter.fetch_add(1, Ordering::Relaxed) % sb.read_pool.len();
2284 let guard = sb.read_pool[idx].lock().map_err(|e| RuntimeError {
2285 code: "LOCK_FAILED".into(),
2286 message: format!("Failed to acquire read connection: {e}"),
2287 })?;
2288 Ok(ReadConnGuard::Pooled(guard))
2289 } else {
2290 let guard = sb.write_conn.lock().map_err(|e| RuntimeError {
2292 code: "LOCK_FAILED".into(),
2293 message: format!("Failed to acquire connection: {e}"),
2294 })?;
2295 Ok(ReadConnGuard::Write(guard))
2296 }
2297 }
2298
2299 fn sqlite_backend(&self) -> Result<&SqliteBackend, RuntimeError> {
2303 match &self.backend {
2304 RuntimeBackend::Sqlite(sb) => Ok(sb),
2305 RuntimeBackend::Postgres(_) => Err(RuntimeError {
2306 code: "NOT_SQLITE_BACKEND".into(),
2307 message: "this operation requires a SQLite-backed Runtime".into(),
2308 }),
2309 }
2310 }
2311
2312 pub(crate) fn pg_backend(&self) -> Option<&PgBackend> {
2317 match &self.backend {
2318 RuntimeBackend::Sqlite(_) => None,
2319 RuntimeBackend::Postgres(pg) => Some(pg),
2320 }
2321 }
2322
2323 pub fn pg_data_store_pub(&self) -> Option<&pylon_storage::pg_datastore::PostgresDataStore> {
2333 self.pg_data_store()
2334 }
2335
2336 #[doc(hidden)]
2337 pub fn pg_data_store_for_tests(&self) -> &pylon_storage::pg_datastore::PostgresDataStore {
2338 self.pg_data_store().expect("pg backend")
2339 }
2340
2341 #[doc(hidden)]
2346 pub fn run_in_pg_mutation_tx_for_tests<F, T, E>(&self, body: F) -> Result<T, E>
2347 where
2348 F: FnOnce(&dyn pylon_http::DataStore) -> Result<T, E>,
2349 E: From<pylon_http::DataError>,
2350 {
2351 let pg_backend = self.pg_backend().expect("pg backend");
2352 let crdt_hook: std::sync::Arc<dyn pylon_storage::pg_tx_store::PgCrdtHook> =
2353 std::sync::Arc::new(crate::pg_loro_store::PgCrdtHookImpl {
2354 crdt: std::sync::Arc::clone(&pg_backend.crdt),
2355 manifest: std::sync::Arc::new(self.manifest.clone()),
2356 });
2357 pg_backend
2358 .store
2359 .with_transaction_crdt(crdt_hook, body)
2360 }
2361
2362 pub(crate) fn pg_data_store(&self) -> Option<&pylon_storage::pg_datastore::PostgresDataStore> {
2363 self.pg_backend().map(|pg| &pg.store)
2364 }
2365}
2366
2367fn with_write_tx<T, F>(conn: &rusqlite::Connection, body: F) -> Result<T, RuntimeError>
2395where
2396 F: FnOnce() -> Result<T, RuntimeError>,
2397{
2398 conn.execute("BEGIN IMMEDIATE", [])
2399 .map_err(|e| RuntimeError {
2400 code: "TX_BEGIN_FAILED".into(),
2401 message: format!("BEGIN: {e}"),
2402 })?;
2403 match body() {
2404 Ok(v) => {
2405 conn.execute("COMMIT", []).map_err(|e| RuntimeError {
2406 code: "TX_COMMIT_FAILED".into(),
2407 message: format!("COMMIT: {e}"),
2408 })?;
2409 Ok(v)
2410 }
2411 Err(e) => {
2412 let _ = conn.execute("ROLLBACK", []);
2415 Err(e)
2416 }
2417 }
2418}
2419
/// Produces a 40-character lowercase hex id: 32 digits of nanoseconds since the
/// Unix epoch followed by 8 digits of a process-wide sequence counter.
///
/// If the system clock reads earlier than the epoch, the time part is all zeros.
/// The counter is an `AtomicU32` that advances by one on every call.
fn generate_id() -> String {
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    static NEXT_SEQ: AtomicU32 = AtomicU32::new(0);

    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::default(),
    };
    let sequence = NEXT_SEQ.fetch_add(1, Ordering::Relaxed);
    format!("{:032x}{:08x}", since_epoch.as_nanos(), sequence)
}
2431
2432fn json_to_sql(val: &serde_json::Value) -> Box<dyn rusqlite::types::ToSql> {
2434 match val {
2435 serde_json::Value::Null => Box::new(rusqlite::types::Null),
2436 serde_json::Value::Bool(b) => Box::new(*b as i32),
2437 serde_json::Value::Number(n) => {
2438 if let Some(i) = n.as_i64() {
2439 Box::new(i)
2440 } else if let Some(f) = n.as_f64() {
2441 Box::new(f)
2442 } else {
2443 Box::new(n.to_string())
2444 }
2445 }
2446 serde_json::Value::String(s) => Box::new(s.clone()),
2447 other => Box::new(other.to_string()),
2448 }
2449}
2450
2451fn row_to_json(row: &rusqlite::Row<'_>, _field_names: &[String]) -> serde_json::Value {
2466 let mut obj = serde_json::Map::new();
2467
2468 let stmt = row.as_ref();
2469 let count = stmt.column_count();
2470 for i in 0..count {
2471 let name = match stmt.column_name(i) {
2475 Ok(n) => n.to_string(),
2476 Err(_) => continue,
2477 };
2478 let value = if let Ok(s) = row.get::<_, String>(i) {
2479 serde_json::Value::String(s)
2480 } else if let Ok(n) = row.get::<_, i64>(i) {
2481 serde_json::Value::Number(serde_json::Number::from(n))
2482 } else if let Ok(f) = row.get::<_, f64>(i) {
2483 serde_json::Number::from_f64(f)
2484 .map(serde_json::Value::Number)
2485 .unwrap_or(serde_json::Value::Null)
2486 } else {
2487 serde_json::Value::Null
2488 };
2489 obj.insert(name, value);
2490 }
2491
2492 serde_json::Value::Object(obj)
2493}
2494
2495#[cfg(test)]
2496mod tests {
2497 use super::*;
2498 use pylon_kernel::{ManifestField, ManifestIndex};
2499
2500 fn test_manifest() -> AppManifest {
2501 AppManifest {
2502 manifest_version: 1,
2503 name: "Test".into(),
2504 version: "0.1.0".into(),
2505 entities: vec![pylon_kernel::ManifestEntity {
2506 name: "User".into(),
2507 fields: vec![
2508 ManifestField {
2509 name: "email".into(),
2510 field_type: "string".into(),
2511 optional: false,
2512 unique: true,
2513 crdt: None,
2514 },
2515 ManifestField {
2516 name: "displayName".into(),
2517 field_type: "string".into(),
2518 optional: false,
2519 unique: false,
2520 crdt: None,
2521 },
2522 ],
2523 indexes: vec![ManifestIndex {
2524 name: "user_email".into(),
2525 fields: vec!["email".into()],
2526 unique: true,
2527 }],
2528 relations: vec![],
2529 search: None,
2530 crdt: true,
2531 }],
2532 routes: vec![],
2533 queries: vec![],
2534 actions: vec![],
2535 policies: vec![],
2536 auth: Default::default(),
2537 }
2538 }
2539
2540 #[test]
2541 fn reset_for_tests_wipes_in_memory() {
2542 let rt = Runtime::in_memory(test_manifest()).unwrap();
2543 rt.insert(
2544 "User",
2545 &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
2546 )
2547 .unwrap();
2548 assert_eq!(rt.list("User").unwrap().len(), 1);
2549 rt.reset_for_tests().unwrap();
2550 assert_eq!(rt.list("User").unwrap().len(), 0);
2551 }
2552
2553 #[test]
2554 fn reset_for_tests_refuses_file_db() {
2555 let dir = std::env::temp_dir().join("pylon-reset-refuse");
2556 let _ = std::fs::create_dir_all(&dir);
2557 let db_path = dir.join("db.sqlite");
2558 let _ = std::fs::remove_file(&db_path);
2559 let rt = Runtime::open(db_path.to_str().unwrap(), test_manifest()).unwrap();
2560 let err = rt.reset_for_tests().unwrap_err();
2561 assert_eq!(err.code, "RESET_REFUSED");
2562 let _ = std::fs::remove_file(&db_path);
2563 }
2564
2565 #[test]
2566 fn insert_and_get() {
2567 let rt = Runtime::in_memory(test_manifest()).unwrap();
2568 let id = rt
2569 .insert(
2570 "User",
2571 &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
2572 )
2573 .unwrap();
2574 let row = rt.get_by_id("User", &id).unwrap().unwrap();
2575 assert_eq!(row["email"], "a@b.com");
2576 }
2577
#[test]
/// A row read back through `get_by_id` must expose every column under its own
/// name, including a column that was appended to the table with
/// `ALTER TABLE` after the runtime was created and is absent from the manifest.
fn row_to_json_handles_columns_added_out_of_manifest_order() {
    // Small builder for the non-CRDT string-ish fields used below.
    let field = |name: &'static str, field_type: &'static str, optional: bool, unique: bool| {
        ManifestField {
            name: name.into(),
            field_type: field_type.into(),
            optional,
            unique,
            crdt: None,
        }
    };

    let mut manifest = test_manifest();
    {
        let user = &mut manifest.entities[0];
        user.fields = vec![
            field("email", "string", false, true),
            field("displayName", "string", false, false),
            field("avatarColor", "string", true, false),
            field("createdAt", "datetime", true, false),
        ];
        // CRDT handling is switched off for this entity.
        user.crdt = false;
    }

    let runtime = Runtime::in_memory(manifest).unwrap();
    let payload = serde_json::json!({
        "email": "a@b.com",
        "displayName": "Alice",
        "avatarColor": "#abc",
        "createdAt": "2026-01-01T00:00:00Z",
    });
    let row_id = runtime.insert("User", &payload).unwrap();

    {
        // Add and populate a column directly on the connection, bypassing the
        // manifest; the guard is released at the end of this scope.
        let write_conn = runtime.lock_write_conn().unwrap();
        write_conn
            .execute(r#"ALTER TABLE "User" ADD COLUMN "passwordHash" TEXT"#, [])
            .unwrap();
        write_conn
            .execute(
                r#"UPDATE "User" SET "passwordHash" = ?1 WHERE "id" = ?2"#,
                rusqlite::params!["hashed-password", &row_id],
            )
            .unwrap();
    }

    let stored = runtime.get_by_id("User", &row_id).unwrap().unwrap();
    let expectations = [
        ("email", "a@b.com"),
        ("displayName", "Alice"),
        ("avatarColor", "#abc"),
        ("createdAt", "2026-01-01T00:00:00Z"),
        ("passwordHash", "hashed-password"),
    ];
    for (column, expected) in expectations {
        assert_eq!(stored[column], expected);
    }
}
2670
#[test]
/// With the unmodified test manifest, inserting a `User` must leave exactly
/// one matching row in the `_pylon_crdt_snapshots` sidecar table, put at least
/// one row in the CRDT store cache, and remain readable through `get_by_id`.
fn crdt_default_writes_through_loro_store() {
    let runtime = Runtime::in_memory(test_manifest()).unwrap();
    let payload = serde_json::json!({"email": "x@y.com", "displayName": "Eric"});
    let row_id = runtime.insert("User", &payload).unwrap();

    {
        let write_conn = runtime.lock_write_conn().unwrap();
        let sidecar_rows: i64 = write_conn
            .query_row(
                "SELECT COUNT(*) FROM _pylon_crdt_snapshots
                 WHERE entity = ?1 AND row_id = ?2",
                rusqlite::params!["User", &row_id],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(sidecar_rows, 1, "sidecar should have one row after insert");

        assert!(runtime.crdt_store().cached_rows() >= 1);
        // The connection guard is released here, before the read below goes
        // back through the runtime API.
    }

    let stored = runtime.get_by_id("User", &row_id).unwrap().unwrap();
    assert_eq!(stored["email"], "x@y.com");
    assert_eq!(stored["displayName"], "Eric");
}
2707
#[test]
/// Updating a row must rewrite its stored snapshot: the bytes held in
/// `_pylon_crdt_snapshots` differ before and after the update, and the row
/// read back shows the changed field alongside the untouched one.
fn crdt_update_persists_new_snapshot() {
    let runtime = Runtime::in_memory(test_manifest()).unwrap();
    let payload = serde_json::json!({"email": "x@y.com", "displayName": "Eric"});
    let row_id = runtime.insert("User", &payload).unwrap();

    // Reads the current snapshot blob for the row; the connection guard only
    // lives for the duration of one call.
    let load_snapshot = || -> Vec<u8> {
        let write_conn = runtime.lock_write_conn().unwrap();
        write_conn
            .query_row(
                "SELECT snapshot FROM _pylon_crdt_snapshots
                 WHERE entity = 'User' AND row_id = ?1",
                rusqlite::params![&row_id],
                |r| r.get(0),
            )
            .unwrap()
    };

    let before_update = load_snapshot();

    let patch = serde_json::json!({"displayName": "Eric C"});
    runtime.update("User", &row_id, &patch).unwrap();

    let after_update = load_snapshot();

    assert_ne!(
        before_update, after_update,
        "snapshot bytes should change after an update"
    );

    let stored = runtime.get_by_id("User", &row_id).unwrap().unwrap();
    assert_eq!(stored["displayName"], "Eric C");
    assert_eq!(stored["email"], "x@y.com");
}
2755
#[test]
/// A failed insert must not leave a snapshot behind: after a second insert
/// with a duplicate email is rejected with `INSERT_FAILED`, the number of
/// `User` rows in `_pylon_crdt_snapshots` is the same as before the attempt.
fn crdt_insert_rolls_back_when_sql_step_fails() {
    let runtime = Runtime::in_memory(test_manifest()).unwrap();

    // Counts the sidecar snapshots for `User`; the connection guard is held
    // only while the query runs.
    let count_user_snapshots = || -> i64 {
        let write_conn = runtime.lock_write_conn().unwrap();
        write_conn
            .query_row(
                "SELECT COUNT(*) FROM _pylon_crdt_snapshots WHERE entity = 'User'",
                [],
                |r| r.get(0),
            )
            .unwrap()
    };

    let first = serde_json::json!({"email": "x@y.com", "displayName": "First"});
    runtime.insert("User", &first).unwrap();

    let baseline = count_user_snapshots();

    // Same email as the first row, so this insert is expected to be rejected.
    let second = serde_json::json!({"email": "x@y.com", "displayName": "Second"});
    let failure = runtime
        .insert("User", &second)
        .expect_err("duplicate email must fail");
    assert_eq!(failure.code, "INSERT_FAILED");

    let after_failure = count_user_snapshots();
    assert_eq!(
        after_failure, baseline,
        "failed insert should not leave a sidecar snapshot behind"
    );
}
2808
#[test]
/// With `crdt` set to `false` on the entity, an insert must write no row to
/// `_pylon_crdt_snapshots` and leave the CRDT store cache empty, while the
/// row itself is still readable through `get_by_id`.
fn crdt_false_skips_loro_store() {
    let manifest = {
        let mut m = test_manifest();
        m.entities[0].crdt = false;
        m
    };
    let runtime = Runtime::in_memory(manifest).unwrap();

    let payload = serde_json::json!({"email": "lww@example.com", "displayName": "Plain"});
    let row_id = runtime.insert("User", &payload).unwrap();

    {
        let write_conn = runtime.lock_write_conn().unwrap();
        let sidecar_rows: i64 = write_conn
            .query_row(
                "SELECT COUNT(*) FROM _pylon_crdt_snapshots
                 WHERE entity = 'User' AND row_id = ?1",
                rusqlite::params![&row_id],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(sidecar_rows, 0, "crdt:false should not touch the sidecar");
        assert_eq!(
            runtime.crdt_store().cached_rows(),
            0,
            "crdt:false should not warm the cache"
        );
        // The connection guard is released here, before the read below goes
        // back through the runtime API.
    }

    let stored = runtime.get_by_id("User", &row_id).unwrap().unwrap();
    assert_eq!(stored["email"], "lww@example.com");
}
2847
#[test]
/// After inserting two distinct users, `list("User")` returns two rows.
fn list_entities() {
    let runtime = Runtime::in_memory(test_manifest()).unwrap();

    let seed = [("a@b.com", "A"), ("b@c.com", "B")];
    for (email, display_name) in seed {
        let payload = serde_json::json!({"email": email, "displayName": display_name});
        runtime.insert("User", &payload).unwrap();
    }

    let listed = runtime.list("User").unwrap();
    assert_eq!(listed.len(), 2);
}
2864
#[test]
/// `update` on an existing row returns `true`, and the changed field is
/// visible on the next `get_by_id`.
fn update_entity() {
    let runtime = Runtime::in_memory(test_manifest()).unwrap();
    let payload = serde_json::json!({"email": "a@b.com", "displayName": "A"});
    let row_id = runtime.insert("User", &payload).unwrap();

    let patch = serde_json::json!({"displayName": "Updated"});
    let changed = runtime.update("User", &row_id, &patch).unwrap();
    assert!(changed);

    let stored = runtime.get_by_id("User", &row_id).unwrap().unwrap();
    assert_eq!(stored["displayName"], "Updated");
}
2881
#[test]
/// `delete` on an existing row returns `true`, after which `get_by_id`
/// yields `None` for that id.
fn delete_entity() {
    let runtime = Runtime::in_memory(test_manifest()).unwrap();
    let payload = serde_json::json!({"email": "a@b.com", "displayName": "A"});
    let row_id = runtime.insert("User", &payload).unwrap();

    let removed = runtime.delete("User", &row_id).unwrap();
    assert!(removed);

    let lookup_after_delete = runtime.get_by_id("User", &row_id).unwrap();
    assert!(lookup_after_delete.is_none());
}
2895
#[test]
/// `lookup` by the `email` field finds the inserted row and returns its
/// other fields.
fn lookup_by_field() {
    let runtime = Runtime::in_memory(test_manifest()).unwrap();
    let payload = serde_json::json!({"email": "a@b.com", "displayName": "A"});
    runtime.insert("User", &payload).unwrap();

    let found = runtime.lookup("User", "email", "a@b.com").unwrap();
    let matched = found.unwrap();
    assert_eq!(matched["displayName"], "A");
}
2907
#[test]
/// Listing an entity name that is not in the manifest fails with
/// `ENTITY_NOT_FOUND`.
fn unknown_entity_returns_error() {
    let runtime = Runtime::in_memory(test_manifest()).unwrap();
    let outcome = runtime.list("Nonexistent");
    let failure = outcome.unwrap_err();
    assert_eq!(failure.code, "ENTITY_NOT_FOUND");
}
2914
#[test]
/// An insert payload carrying a key that is not a known column
/// (`evil_col`) is rejected with `INVALID_COLUMN`.
fn insert_rejects_unknown_column() {
    let runtime = Runtime::in_memory(test_manifest()).unwrap();
    let payload = serde_json::json!({"email": "a@b.com", "displayName": "A", "evil_col": "x"});
    let failure = runtime.insert("User", &payload).unwrap_err();
    assert_eq!(failure.code, "INVALID_COLUMN");
}
2926
#[test]
/// An update patch carrying a key that is not a known column
/// (`bad_field`) is rejected with `INVALID_COLUMN`.
fn update_rejects_unknown_column() {
    let runtime = Runtime::in_memory(test_manifest()).unwrap();
    let payload = serde_json::json!({"email": "a@b.com", "displayName": "A"});
    let row_id = runtime.insert("User", &payload).unwrap();

    let patch = serde_json::json!({"bad_field": "x"});
    let failure = runtime.update("User", &row_id, &patch).unwrap_err();
    assert_eq!(failure.code, "INVALID_COLUMN");
}
2941
#[test]
/// `lookup` on a field name that is not a known column fails with
/// `INVALID_COLUMN`.
fn lookup_rejects_unknown_column() {
    let runtime = Runtime::in_memory(test_manifest()).unwrap();
    let outcome = runtime.lookup("User", "nonexistent", "val");
    let failure = outcome.unwrap_err();
    assert_eq!(failure.code, "INVALID_COLUMN");
}
2948
#[test]
/// A filter that names a column that is not known (`bad_col`) makes
/// `query_filtered` fail with `INVALID_COLUMN`.
fn query_filtered_rejects_unknown_column() {
    let runtime = Runtime::in_memory(test_manifest()).unwrap();
    let filter = serde_json::json!({"bad_col": "x"});
    let failure = runtime.query_filtered("User", &filter).unwrap_err();
    assert_eq!(failure.code, "INVALID_COLUMN");
}
2957
#[test]
/// An `$order` clause that names a column that is not known (`bad_col`)
/// makes `query_filtered` fail with `INVALID_COLUMN`.
fn query_filtered_rejects_unknown_order_column() {
    let runtime = Runtime::in_memory(test_manifest()).unwrap();
    let filter = serde_json::json!({"$order": {"bad_col": "asc"}});
    let failure = runtime.query_filtered("User", &filter).unwrap_err();
    assert_eq!(failure.code, "INVALID_COLUMN");
}
2966
#[test]
/// Passing SQL text (`DROP TABLE User`) as the `$order` direction must not
/// break the query: it still succeeds and returns the single inserted row.
fn query_filtered_sanitizes_order_direction() {
    let runtime = Runtime::in_memory(test_manifest()).unwrap();
    let payload = serde_json::json!({"email": "a@b.com", "displayName": "A"});
    runtime.insert("User", &payload).unwrap();

    let hostile_order = serde_json::json!({"$order": {"email": "DROP TABLE User"}});
    let matched = runtime.query_filtered("User", &hostile_order).unwrap();
    assert_eq!(matched.len(), 1);
}
2984
#[test]
/// An in-memory runtime reports a read pool size of zero.
fn in_memory_has_no_read_pool() {
    let runtime = Runtime::in_memory(test_manifest()).unwrap();
    let pool_size = runtime.read_pool_size();
    assert_eq!(pool_size, 0);
}
2990
#[test]
/// A file-backed runtime reports `READ_POOL_SIZE` read connections, and a row
/// written through `insert` can be read back through `get_by_id`.
fn open_creates_read_pool() {
    // Per-process scratch directory under the system temp dir.
    let dir = std::env::temp_dir().join(format!("pylon_test_{}", std::process::id()));
    std::fs::create_dir_all(&dir).unwrap();
    let db_path = dir.join("test_read_pool.db");

    let rt = Runtime::open(db_path.to_str().unwrap(), test_manifest()).unwrap();
    assert_eq!(rt.read_pool_size(), READ_POOL_SIZE);

    let id = rt
        .insert(
            "User",
            &serde_json::json!({"email": "pool@test.com", "displayName": "Pool"}),
        )
        .unwrap();
    let row = rt.get_by_id("User", &id).unwrap().unwrap();
    assert_eq!(row["email"], "pool@test.com");

    // Close the runtime before removing its files: deleting a directory whose
    // database is still open can fail on some platforms, and because the
    // cleanup result is ignored that failure would silently leak the directory.
    drop(rt);
    let _ = std::fs::remove_dir_all(&dir);
}
3013
#[test]
/// While the write connection guard is held, reader threads calling `list`
/// on a file-backed runtime must still complete and see both rows.
///
/// Readers report completion over a channel and the main thread waits with a
/// deadline, so a regression that makes reads wait on the write guard fails
/// the test instead of hanging it forever in `join()`.
fn concurrent_reads_dont_block_on_write() {
    use std::sync::mpsc;
    use std::sync::Arc;
    use std::time::Duration;

    const READERS: usize = 4;
    // Generous compared with a two-row read; only reached if readers are stuck.
    const READ_DEADLINE: Duration = Duration::from_secs(10);

    let dir = std::env::temp_dir().join(format!("pylon_conc_{}", std::process::id()));
    std::fs::create_dir_all(&dir).unwrap();
    let db_path = dir.join("test_concurrent.db");

    let rt = Arc::new(Runtime::open(db_path.to_str().unwrap(), test_manifest()).unwrap());

    rt.insert(
        "User",
        &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
    )
    .unwrap();
    rt.insert(
        "User",
        &serde_json::json!({"email": "b@c.com", "displayName": "B"}),
    )
    .unwrap();

    // Hold the write connection for the whole time the readers run.
    let write_guard = rt.lock_write_conn().unwrap();

    let (done_tx, done_rx) = mpsc::channel::<()>();
    let mut handles = Vec::with_capacity(READERS);
    for _ in 0..READERS {
        let rt_clone = Arc::clone(&rt);
        let done_tx = done_tx.clone();
        handles.push(std::thread::spawn(move || {
            let rows = rt_clone.list("User").unwrap();
            assert_eq!(rows.len(), 2);
            // The receiver may already be gone if the main thread gave up.
            let _ = done_tx.send(());
        }));
    }
    // Only the readers hold senders now, so a reader that panics before
    // sending shows up as a disconnect rather than an indefinite wait.
    drop(done_tx);

    for _ in 0..READERS {
        done_rx
            .recv_timeout(READ_DEADLINE)
            .expect("reader did not finish while the write connection was held (blocked or panicked)");
    }

    for h in handles {
        h.join().expect("reader thread panicked");
    }

    drop(write_guard);
    // Close the runtime before removing its files.
    drop(rt);

    let _ = std::fs::remove_dir_all(&dir);
}
3059}