1pub mod account_backend;
2pub mod api_key_backend;
3pub mod cache_handlers;
4pub mod cache_server;
5pub mod config;
6pub mod cron;
7pub mod datastore;
8pub mod ip_limit;
9pub mod job_store;
10pub mod jobs;
11pub mod log;
12pub mod loro_store;
13pub mod pg_loro_store;
14pub mod magic_code_backend;
15pub mod metrics;
16pub mod oauth_backend;
17pub mod org_backend;
18pub mod openapi;
19pub mod presence;
20pub mod pubsub;
21pub mod rate_limit;
22pub mod resp;
23pub mod resp_server;
24pub mod rooms;
25pub mod scheduler;
26pub mod server;
27pub mod session_backend;
28pub mod shard_ws;
29pub mod sse;
30pub mod tls;
31pub mod workflow_store;
32pub mod workflows;
33pub mod ws;
34
35use std::collections::HashMap;
36use std::sync::atomic::{AtomicUsize, Ordering};
37use std::sync::Mutex;
38
39use pylon_kernel::{AppManifest, ManifestEntity};
40use rusqlite::Connection;
41
/// Error value produced by runtime operations.
///
/// `code` is a short identifier (for example `"QUERY_FAILED"`) and `message`
/// carries the human-readable detail. `Display` renders the pair as
/// `[code] message`.
#[derive(Debug, Clone)]
pub struct RuntimeError {
    /// Short identifier for the failure category.
    pub code: String,
    /// Human-readable description of the failure.
    pub message: String,
}

impl std::fmt::Display for RuntimeError {
    /// Writes the error as `[code] message`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { code, message } = self;
        write!(f, "[{code}] {message}")
    }
}

// Marker impl so `RuntimeError` can be used as a `std::error::Error`.
impl std::error::Error for RuntimeError {}
59
60impl From<pylon_http::DataError> for RuntimeError {
65 fn from(e: pylon_http::DataError) -> Self {
66 RuntimeError {
67 code: e.code,
68 message: e.message,
69 }
70 }
71}
72
/// Wraps `name` in double quotes for use as a SQL identifier, doubling any
/// embedded `"` so the identifier cannot terminate early.
fn quote_ident(name: &str) -> String {
    let mut quoted = String::with_capacity(name.len() + 2);
    quoted.push('"');
    for ch in name.chars() {
        if ch == '"' {
            // An embedded quote is escaped by writing it twice.
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
82
83fn validate_column_name(name: &str, entity: &ManifestEntity) -> Result<(), RuntimeError> {
87 if name == "id" {
88 return Ok(());
89 }
90 if entity.fields.iter().any(|f| f.name == name) {
91 return Ok(());
92 }
93 Err(RuntimeError {
94 code: "INVALID_COLUMN".into(),
95 message: format!(
96 "Unknown column \"{}\" -- valid columns: id, {}",
97 name,
98 entity
99 .fields
100 .iter()
101 .map(|f| f.name.as_str())
102 .collect::<Vec<_>>()
103 .join(", ")
104 ),
105 })
106}
107
108fn tune_runtime_connection(conn: &Connection, in_memory: bool) {
120 let pragmas: &[(&str, &str)] = if in_memory {
121 &[
122 ("temp_store", "MEMORY"),
123 ("cache_size", "-65536"),
124 ("foreign_keys", "ON"),
125 ]
126 } else {
127 &[
128 ("journal_mode", "WAL"),
129 ("synchronous", "NORMAL"),
130 ("cache_size", "-65536"),
131 ("mmap_size", "268435456"),
132 ("temp_store", "MEMORY"),
133 ("busy_timeout", "5000"),
134 ("foreign_keys", "ON"),
135 ("wal_autocheckpoint", "1000"),
136 ]
137 };
138 for (key, value) in pragmas {
139 let _ = conn.pragma_update(None, key, value);
140 }
141}
142
143enum ReadConnGuard<'a> {
150 Pooled(std::sync::MutexGuard<'a, Connection>),
151 Write(std::sync::MutexGuard<'a, Connection>),
152}
153
154impl<'a> std::ops::Deref for ReadConnGuard<'a> {
155 type Target = Connection;
156 fn deref(&self) -> &Connection {
157 match self {
158 ReadConnGuard::Pooled(g) => g,
159 ReadConnGuard::Write(g) => g,
160 }
161 }
162}
163
164pub struct Runtime {
182 backend: RuntimeBackend,
183 manifest: AppManifest,
184 entities: HashMap<String, ManifestEntity>,
185 is_in_memory: bool,
189}
190
191enum RuntimeBackend {
194 Sqlite(SqliteBackend),
195 Postgres(PgBackend),
196}
197
198struct SqliteBackend {
202 write_conn: Mutex<Connection>,
204 read_pool: Vec<Mutex<Connection>>,
207 read_counter: AtomicUsize,
209 crdt: crate::loro_store::LoroStore,
214}
215
216pub(crate) struct PgBackend {
219 pub(crate) store: pylon_storage::pg_datastore::PostgresDataStore,
220 pub(crate) crdt: std::sync::Arc<crate::pg_loro_store::PgLoroStore>,
225}
226
/// Number of read-only connections opened for an on-disk SQLite database.
const READ_POOL_SIZE: usize = 4;
229
/// Returns `true` when `url` starts with `postgres://` or `postgresql://`,
/// compared ASCII case-insensitively.
fn is_postgres_url(url: &str) -> bool {
    const SCHEMES: [&str; 2] = ["postgres://", "postgresql://"];
    let raw = url.as_bytes();
    SCHEMES.iter().any(|scheme| {
        // Compare only the leading bytes; a URL shorter than the scheme
        // cannot match.
        raw.get(..scheme.len())
            .map_or(false, |head| head.eq_ignore_ascii_case(scheme.as_bytes()))
    })
}
237
238fn data_err_to_runtime(e: pylon_http::DataError) -> RuntimeError {
242 RuntimeError {
243 code: e.code,
244 message: e.message,
245 }
246}
247
248impl Runtime {
249 pub fn open(url: &str, manifest: AppManifest) -> Result<Self, RuntimeError> {
257 if is_postgres_url(url) {
258 Self::open_postgres(url, manifest)
259 } else {
260 let conn = Connection::open(url).map_err(|e| RuntimeError {
261 code: "RUNTIME_OPEN_FAILED".into(),
262 message: format!("Failed to open database: {e}"),
263 })?;
264 Self::from_connection(conn, manifest, false)
265 }
266 }
267
268 pub fn open_postgres(url: &str, manifest: AppManifest) -> Result<Self, RuntimeError> {
278 let store = pylon_storage::pg_datastore::PostgresDataStore::connect(url, manifest.clone())
279 .map_err(data_err_to_runtime)?;
280 store.with_client(|c| crate::pg_loro_store::ensure_sidecar(c)).map_err(|e| {
286 RuntimeError {
287 code: "CRDT_SIDECAR_BOOTSTRAP_FAILED".into(),
288 message: format!("ensure pg crdt sidecar: {e}"),
289 }
290 })?;
291 let entities: HashMap<String, ManifestEntity> = manifest
292 .entities
293 .iter()
294 .map(|e| (e.name.clone(), e.clone()))
295 .collect();
296 Ok(Self {
297 backend: RuntimeBackend::Postgres(PgBackend {
298 store,
299 crdt: std::sync::Arc::new(crate::pg_loro_store::PgLoroStore::new()),
300 }),
301 manifest,
302 entities,
303 is_in_memory: false,
304 })
305 }
306
307 pub fn is_in_memory(&self) -> bool {
318 self.is_in_memory
319 }
320
321 pub fn db_path(&self) -> Option<String> {
327 if self.is_in_memory {
328 return None;
329 }
330 let sb = match &self.backend {
331 RuntimeBackend::Sqlite(sb) => sb,
332 RuntimeBackend::Postgres(_) => return None,
333 };
334 let conn = sb.write_conn.lock().ok()?;
335 conn.path().filter(|p| !p.is_empty()).map(String::from)
336 }
337
338 pub fn reset_for_tests(&self) -> Result<(), RuntimeError> {
345 if !self.is_in_memory() {
346 return Err(RuntimeError {
347 code: "RESET_REFUSED".into(),
348 message: "reset_for_tests is only available on in-memory databases".into(),
349 });
350 }
351 let conn = self.lock_write_conn()?;
352 let entity_names: Vec<String> = self.entities.values().map(|e| e.name.clone()).collect();
353 for name in entity_names {
354 let sql = format!("DELETE FROM {}", quote_ident(&name));
355 let _ = conn.execute(&sql, []);
356 let fts_sql = format!("DELETE FROM {}", quote_ident(&format!("{name}_fts")));
358 let _ = conn.execute(&fts_sql, []);
359 }
360 Ok(())
361 }
362
363 pub fn in_memory(manifest: AppManifest) -> Result<Self, RuntimeError> {
367 let conn = Connection::open_in_memory().map_err(|e| RuntimeError {
368 code: "RUNTIME_OPEN_FAILED".into(),
369 message: format!("Failed to open in-memory database: {e}"),
370 })?;
371 Self::from_connection(conn, manifest, true)
372 }
373
374 fn from_connection(
375 conn: Connection,
376 manifest: AppManifest,
377 is_in_memory: bool,
378 ) -> Result<Self, RuntimeError> {
379 tune_runtime_connection(&conn, is_in_memory);
381
382 let entities: HashMap<String, ManifestEntity> = manifest
384 .entities
385 .iter()
386 .map(|e| (e.name.clone(), e.clone()))
387 .collect();
388
389 for entity in &manifest.entities {
391 let fields: Vec<String> = entity
392 .fields
393 .iter()
394 .map(|f| {
395 let col_type = match f.field_type.as_str() {
396 "int" => "INTEGER",
397 "float" => "REAL",
398 "bool" => "INTEGER",
399 _ => "TEXT",
400 };
401 let not_null = if f.optional { "" } else { " NOT NULL" };
402 let unique = if f.unique { " UNIQUE" } else { "" };
403 format!("{} {col_type}{not_null}{unique}", quote_ident(&f.name))
404 })
405 .collect();
406
407 let mut cols = vec!["\"id\" TEXT PRIMARY KEY NOT NULL".to_string()];
408 cols.extend(fields);
409 let sql = format!(
410 "CREATE TABLE IF NOT EXISTS {} ({})",
411 quote_ident(&entity.name),
412 cols.join(", ")
413 );
414 conn.execute(&sql, []).map_err(|e| RuntimeError {
415 code: "SCHEMA_INIT_FAILED".into(),
416 message: format!("Failed to create table {}: {e}", entity.name),
417 })?;
418
419 for idx in &entity.indexes {
421 let unique_kw = if idx.unique { "UNIQUE " } else { "" };
422 let quoted_fields: Vec<String> =
423 idx.fields.iter().map(|f| quote_ident(f)).collect();
424 let idx_sql = format!(
425 "CREATE {unique_kw}INDEX IF NOT EXISTS {} ON {} ({})",
426 quote_ident(&idx.name),
427 quote_ident(&entity.name),
428 quoted_fields.join(", ")
429 );
430 conn.execute(&idx_sql, []).ok();
431 }
432
433 let text_fields: Vec<&str> = entity
441 .fields
442 .iter()
443 .filter(|f| matches!(f.field_type.as_str(), "string" | "richtext" | "text"))
444 .map(|f| f.name.as_str())
445 .collect();
446 if !text_fields.is_empty() {
447 let fts_name = format!("{}_fts", entity.name);
448 let quoted_cols: Vec<String> = text_fields.iter().map(|f| quote_ident(f)).collect();
449 let fts_sql = format!(
450 "CREATE VIRTUAL TABLE IF NOT EXISTS {} USING fts5({}, content={}, content_rowid='rowid')",
451 quote_ident(&fts_name),
452 quoted_cols.join(", "),
453 quote_ident(&entity.name),
454 );
455 let fts_ok = conn.execute(&fts_sql, []).is_ok();
458
459 if fts_ok {
460 let tbl = quote_ident(&entity.name);
471 let ftb = quote_ident(&fts_name);
472 let cols_list = quoted_cols.join(", ");
473 let new_list: Vec<String> = text_fields
474 .iter()
475 .map(|f| format!("new.{}", quote_ident(f)))
476 .collect();
477 let old_list: Vec<String> = text_fields
478 .iter()
479 .map(|f| format!("old.{}", quote_ident(f)))
480 .collect();
481
482 let trigger_ai = quote_ident(&format!("{}_ai", fts_name));
483 let trigger_ad = quote_ident(&format!("{}_ad", fts_name));
484 let trigger_au = quote_ident(&format!("{}_au", fts_name));
485
486 let trigger_ins = format!(
487 "CREATE TRIGGER IF NOT EXISTS {trigger_ai} AFTER INSERT ON {tbl} BEGIN \
488 INSERT INTO {ftb}(rowid, {cols_list}) VALUES (new.rowid, {new_vals}); END",
489 new_vals = new_list.join(", "),
490 );
491 let trigger_del = format!(
492 "CREATE TRIGGER IF NOT EXISTS {trigger_ad} AFTER DELETE ON {tbl} BEGIN \
493 INSERT INTO {ftb}({ftb}, rowid, {cols_list}) VALUES('delete', old.rowid, {old_vals}); END",
494 old_vals = old_list.join(", "),
495 );
496 let trigger_upd = format!(
497 "CREATE TRIGGER IF NOT EXISTS {trigger_au} AFTER UPDATE ON {tbl} BEGIN \
498 INSERT INTO {ftb}({ftb}, rowid, {cols_list}) VALUES('delete', old.rowid, {old_vals}); \
499 INSERT INTO {ftb}(rowid, {cols_list}) VALUES (new.rowid, {new_vals}); END",
500 new_vals = new_list.join(", "),
501 old_vals = old_list.join(", "),
502 );
503 for (label, sql) in [
506 ("ai", &trigger_ins),
507 ("ad", &trigger_del),
508 ("au", &trigger_upd),
509 ] {
510 if let Err(e) = conn.execute(sql, []) {
511 tracing::warn!(
512 "[fts] failed to create {label} trigger for {}: {e}",
513 entity.name
514 );
515 }
516 }
517 }
518 }
519 }
520
521 let db_path = conn.path().filter(|p| !p.is_empty()).map(|p| p.to_string());
525
526 let read_pool = if let Some(ref path) = db_path {
527 let mut pool = Vec::with_capacity(READ_POOL_SIZE);
528 for _ in 0..READ_POOL_SIZE {
529 let read_conn = Connection::open_with_flags(
530 path,
531 rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
532 | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX,
533 )
534 .map_err(|e| RuntimeError {
535 code: "POOL_OPEN_FAILED".into(),
536 message: format!("Failed to open read connection: {e}"),
537 })?;
538 tune_runtime_connection(&read_conn, false);
539 pool.push(Mutex::new(read_conn));
540 }
541 pool
542 } else {
543 Vec::new()
545 };
546
547 crate::loro_store::ensure_sidecar(&conn).map_err(|e| RuntimeError {
550 code: "CRDT_SIDECAR_FAILED".into(),
551 message: format!("create CRDT sidecar table: {e}"),
552 })?;
553
554 Ok(Self {
555 backend: RuntimeBackend::Sqlite(SqliteBackend {
556 write_conn: Mutex::new(conn),
557 read_pool,
558 read_counter: AtomicUsize::new(0),
559 crdt: crate::loro_store::LoroStore::new(),
560 }),
561 manifest,
562 entities,
563 is_in_memory,
564 })
565 }
566
567 pub fn ensure_search_indexes(&self) -> Result<(), RuntimeError> {
578 if matches!(self.backend, RuntimeBackend::Postgres(_)) {
582 return Ok(());
583 }
584 let conn = self.lock_write_conn()?;
585 conn.execute(pylon_storage::search::create_facet_table_sql(), [])
586 .map_err(|e| RuntimeError {
587 code: "FACET_TABLE_FAILED".into(),
588 message: format!("create _facet_bitmap: {e}"),
589 })?;
590 for entity in &self.manifest.entities {
591 if let Some(cfg) = &entity.search {
592 if let Some(sql) = pylon_storage::search::create_fts_table_sql(&entity.name, cfg) {
593 conn.execute(&sql, []).map_err(|e| RuntimeError {
594 code: "FTS_TABLE_FAILED".into(),
595 message: format!("create FTS table for {}: {e}", entity.name),
596 })?;
597 }
598 for field in &cfg.sortable {
599 let idx_sql = format!(
600 "CREATE INDEX IF NOT EXISTS \"{}_sort_{field}\" ON \"{}\" (\"{field}\")",
601 entity.name, entity.name,
602 );
603 conn.execute(&idx_sql, []).map_err(|e| RuntimeError {
604 code: "SORT_INDEX_FAILED".into(),
605 message: format!("create sort index for {}.{field}: {e}", entity.name),
606 })?;
607 }
608 }
609 }
610 Ok(())
611 }
612
613 pub fn manifest(&self) -> &AppManifest {
615 &self.manifest
616 }
617
618 pub fn lock_conn_pub(&self) -> Result<std::sync::MutexGuard<'_, Connection>, RuntimeError> {
624 self.lock_write_conn()
625 }
626
627 pub fn read_pool_size(&self) -> usize {
632 match &self.backend {
633 RuntimeBackend::Sqlite(sb) => sb.read_pool.len(),
634 RuntimeBackend::Postgres(_) => 0,
635 }
636 }
637
638 pub fn is_postgres(&self) -> bool {
641 matches!(self.backend, RuntimeBackend::Postgres(_))
642 }
643
644 pub(crate) fn crdt_fields_for(
653 &self,
654 ent: &ManifestEntity,
655 ) -> Result<Vec<pylon_crdt::CrdtField>, RuntimeError> {
656 let mut out = Vec::with_capacity(ent.fields.len());
657 for f in &ent.fields {
658 if f.name == "id" {
661 continue;
662 }
663 let kind = pylon_crdt::field_kind(&f.field_type, f.crdt).map_err(|e| RuntimeError {
664 code: "INVALID_CRDT_FIELD".into(),
665 message: format!(
666 "{}.{}: {e} (declared type={}, crdt={:?})",
667 ent.name, f.name, f.field_type, f.crdt
668 ),
669 })?;
670 out.push(pylon_crdt::CrdtField {
671 name: f.name.clone(),
672 kind,
673 });
674 }
675 Ok(out)
676 }
677
678 pub fn crdt_store(&self) -> &crate::loro_store::LoroStore {
686 match &self.backend {
687 RuntimeBackend::Sqlite(sb) => &sb.crdt,
688 RuntimeBackend::Postgres(_) => {
689 panic!("crdt_store() called on Postgres-backed Runtime")
690 }
691 }
692 }
693
694 pub fn insert(&self, entity: &str, data: &serde_json::Value) -> Result<String, RuntimeError> {
707 if let Some(pg) = self.pg_backend() {
708 let ent = self.require_entity(entity)?;
709 if ent.crdt {
715 let crdt_fields = self.crdt_fields_for(ent)?;
716 let id = generate_id();
717 let mut row = data.clone();
720 if let Some(obj) = row.as_object_mut() {
721 obj.insert("id".into(), serde_json::Value::String(id.clone()));
722 }
723 let result = pg.store.with_transaction_raw(|tx| -> Result<(), RuntimeError> {
724 pg.crdt
725 .apply_patch(tx, entity, &id, &crdt_fields, data)
726 .map_err(|e| RuntimeError {
727 code: "CRDT_APPLY_FAILED".into(),
728 message: format!("crdt write {entity}/{id}: {e}"),
729 })?;
730 pylon_storage::pg_tx_store::tx_insert(tx, &self.manifest, entity, &row)
731 .map(|_| ())
732 .map_err(data_err_to_runtime)?;
733 pg.crdt.cache_after_commit(tx, entity, &id);
734 Ok(())
735 });
736 if result.is_err() {
737 pg.crdt.evict(entity, &id);
744 }
745 result?;
746 return Ok(id);
747 }
748 return pylon_http::DataStore::insert(&pg.store, entity, data)
752 .map_err(data_err_to_runtime);
753 }
754 let ent = self.require_entity(entity)?;
755 let conn = self.lock_write_conn()?;
756
757 let id = generate_id();
758
759 let obj = data.as_object().ok_or_else(|| RuntimeError {
760 code: "INVALID_DATA".into(),
761 message: "Insert data must be a JSON object".into(),
762 })?;
763
764 for key in obj.keys() {
767 if key != "id" {
768 validate_column_name(key, ent)?;
769 }
770 }
771
772 let sb = self.sqlite_backend()?;
777
778 with_write_tx(&conn, || {
782 if ent.crdt {
783 let crdt_fields = self.crdt_fields_for(ent)?;
784 sb.crdt
785 .apply_patch(&conn, entity, &id, &crdt_fields, data)
786 .map_err(|e| RuntimeError {
787 code: "CRDT_APPLY_FAILED".into(),
788 message: format!("crdt write {entity}/{id}: {e}"),
789 })?;
790 }
791
792 let mut col_names = vec![quote_ident("id")];
793 let mut placeholders = vec!["?1".to_string()];
794 let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = vec![Box::new(id.clone())];
795
796 let mut idx = 2;
797 for (key, val) in obj {
798 if key == "id" {
799 continue;
800 }
801 col_names.push(quote_ident(key));
802 placeholders.push(format!("?{idx}"));
803 values.push(json_to_sql(val));
804 idx += 1;
805 }
806
807 let sql = format!(
808 "INSERT INTO {} ({}) VALUES ({})",
809 quote_ident(entity),
810 col_names.join(", "),
811 placeholders.join(", ")
812 );
813
814 let params: Vec<&dyn rusqlite::types::ToSql> =
815 values.iter().map(|v| v.as_ref()).collect();
816 conn.execute(&sql, params.as_slice())
817 .map_err(|e| RuntimeError {
818 code: "INSERT_FAILED".into(),
819 message: format!("Insert into {entity} failed: {e}"),
820 })?;
821
822 if let Some(cfg) = ent.search.as_ref() {
826 if !cfg.is_empty() {
827 pylon_storage::search_maintenance::apply_insert(&conn, entity, &id, data, cfg)
828 .map_err(|e| RuntimeError {
829 code: "SEARCH_MAINTENANCE_FAILED".into(),
830 message: format!("search index update on insert {entity}: {e}"),
831 })?;
832 }
833 }
834 Ok(())
835 })?;
836
837 Ok(id)
838 }
839
840 pub fn get_by_id(
842 &self,
843 entity: &str,
844 id: &str,
845 ) -> Result<Option<serde_json::Value>, RuntimeError> {
846 if let Some(pg) = self.pg_backend() {
847 return pylon_http::DataStore::get_by_id(&pg.store, entity, id)
848 .map_err(data_err_to_runtime);
849 }
850 let ent = self.require_entity(entity)?;
851 let conn = self.lock_read_conn()?;
852
853 let sql = format!("SELECT * FROM {} WHERE \"id\" = ?1", quote_ident(entity));
854 let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
855 code: "QUERY_FAILED".into(),
856 message: format!("Failed to prepare query: {e}"),
857 })?;
858
859 let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
860
861 let result = stmt
862 .query_row(rusqlite::params![id], |row| Ok(row_to_json(row, &columns)))
863 .ok();
864
865 Ok(result)
866 }
867
868 pub fn list(&self, entity: &str) -> Result<Vec<serde_json::Value>, RuntimeError> {
870 if let Some(pg) = self.pg_backend() {
871 return pylon_http::DataStore::list(&pg.store, entity).map_err(data_err_to_runtime);
872 }
873 let ent = self.require_entity(entity)?;
874 let conn = self.lock_read_conn()?;
875
876 let sql = format!("SELECT * FROM {} ORDER BY \"id\"", quote_ident(entity));
877 let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
878 code: "QUERY_FAILED".into(),
879 message: format!("Failed to prepare query: {e}"),
880 })?;
881
882 let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
883
884 let rows = stmt
885 .query_map([], |row| Ok(row_to_json(row, &columns)))
886 .map_err(|e| RuntimeError {
887 code: "QUERY_FAILED".into(),
888 message: format!("Query failed: {e}"),
889 })?;
890
891 let mut result = Vec::new();
892 for row in rows {
893 if let Ok(val) = row {
894 result.push(val);
895 }
896 }
897 Ok(result)
898 }
899
900 pub fn list_after(
902 &self,
903 entity: &str,
904 after: Option<&str>,
905 limit: usize,
906 ) -> Result<Vec<serde_json::Value>, RuntimeError> {
907 if let Some(pg) = self.pg_backend() {
908 return pylon_http::DataStore::list_after(&pg.store, entity, after, limit)
909 .map_err(data_err_to_runtime);
910 }
911 let ent = self.require_entity(entity)?;
912 let conn = self.lock_read_conn()?;
913
914 let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
915 let table = quote_ident(entity);
916
917 let (sql, params): (String, Vec<Box<dyn rusqlite::types::ToSql>>) = match after {
918 Some(cursor) => (
919 format!(
920 "SELECT * FROM {} WHERE \"id\" > ?1 ORDER BY \"id\" LIMIT ?2",
921 table
922 ),
923 vec![Box::new(cursor.to_string()), Box::new(limit as i64)],
924 ),
925 None => (
926 format!("SELECT * FROM {} ORDER BY \"id\" LIMIT ?1", table),
927 vec![Box::new(limit as i64)],
928 ),
929 };
930
931 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
932 params.iter().map(|v| v.as_ref()).collect();
933
934 let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
935 code: "QUERY_FAILED".into(),
936 message: format!("Failed to prepare query: {e}"),
937 })?;
938
939 let rows = stmt
940 .query_map(param_refs.as_slice(), |row| Ok(row_to_json(row, &columns)))
941 .map_err(|e| RuntimeError {
942 code: "QUERY_FAILED".into(),
943 message: format!("Query failed: {e}"),
944 })?;
945
946 let mut result = Vec::new();
947 for row in rows {
948 if let Ok(val) = row {
949 result.push(val);
950 }
951 }
952 Ok(result)
953 }
954
955 pub fn update(
961 &self,
962 entity: &str,
963 id: &str,
964 data: &serde_json::Value,
965 ) -> Result<bool, RuntimeError> {
966 if let Some(pg) = self.pg_backend() {
967 let ent = self.require_entity(entity)?;
968 if ent.crdt {
969 let crdt_fields = self.crdt_fields_for(ent)?;
982 let result = pg.store.with_transaction_raw(|tx| -> Result<bool, RuntimeError> {
983 pg.crdt
984 .apply_patch(tx, entity, id, &crdt_fields, data)
985 .map_err(|e| RuntimeError {
986 code: "CRDT_APPLY_FAILED".into(),
987 message: format!("crdt update {entity}/{id}: {e}"),
988 })?;
989 let updated = pylon_storage::pg_tx_store::tx_update(
990 tx,
991 &self.manifest,
992 entity,
993 id,
994 data,
995 )
996 .map_err(data_err_to_runtime)?;
997 if !updated {
998 return Err(RuntimeError {
1001 code: "ENTITY_NOT_FOUND".into(),
1002 message: format!(
1003 "Update on {entity}/{id} found no row — refusing to commit \
1004 a CRDT snapshot that would orphan."
1005 ),
1006 });
1007 }
1008 pg.crdt.cache_after_commit(tx, entity, id);
1012 Ok(updated)
1013 });
1014 if result.is_err() {
1015 pg.crdt.evict(entity, id);
1016 if let Err(ref e) = result {
1022 if e.code == "ENTITY_NOT_FOUND" {
1023 return Ok(false);
1024 }
1025 }
1026 }
1027 return result;
1028 }
1029 return pylon_http::DataStore::update(&pg.store, entity, id, data)
1030 .map_err(data_err_to_runtime);
1031 }
1032 let ent = self.require_entity(entity)?;
1033 let conn = self.lock_write_conn()?;
1034
1035 let obj = data.as_object().ok_or_else(|| RuntimeError {
1036 code: "INVALID_DATA".into(),
1037 message: "Update data must be a JSON object".into(),
1038 })?;
1039
1040 for key in obj.keys() {
1042 if key != "id" {
1043 validate_column_name(key, ent)?;
1044 }
1045 }
1046 let writable_keys: Vec<&String> = obj.keys().filter(|k| *k != "id").collect();
1047 if writable_keys.is_empty() {
1048 return Ok(false);
1049 }
1050
1051 let sb = self.sqlite_backend()?;
1053
1054 let affected = with_write_tx(&conn, || -> Result<i64, RuntimeError> {
1057 if ent.crdt {
1058 let crdt_fields = self.crdt_fields_for(ent)?;
1059 sb.crdt
1060 .apply_patch(&conn, entity, id, &crdt_fields, data)
1061 .map_err(|e| RuntimeError {
1062 code: "CRDT_APPLY_FAILED".into(),
1063 message: format!("crdt write {entity}/{id}: {e}"),
1064 })?;
1065 }
1066
1067 let mut set_clauses = Vec::new();
1068 let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
1069 let mut idx = 1;
1070 for key in &writable_keys {
1071 set_clauses.push(format!("{} = ?{idx}", quote_ident(key)));
1072 values.push(json_to_sql(&obj[key.as_str()]));
1073 idx += 1;
1074 }
1075
1076 let searchable = ent.search.as_ref().map(|c| !c.is_empty()).unwrap_or(false);
1080 let old_row = if searchable {
1081 self.get_by_id_with_conn(&conn, entity, id)?
1082 } else {
1083 None
1084 };
1085
1086 values.push(Box::new(id.to_string()));
1087 let sql = format!(
1088 "UPDATE {} SET {} WHERE \"id\" = ?{idx}",
1089 quote_ident(entity),
1090 set_clauses.join(", ")
1091 );
1092
1093 let params: Vec<&dyn rusqlite::types::ToSql> =
1094 values.iter().map(|v| v.as_ref()).collect();
1095 let affected = conn
1096 .execute(&sql, params.as_slice())
1097 .map_err(|e| RuntimeError {
1098 code: "UPDATE_FAILED".into(),
1099 message: format!("Update {entity}/{id} failed: {e}"),
1100 })? as i64;
1101
1102 if affected > 0 && searchable {
1103 if let (Some(cfg), Some(old)) = (ent.search.as_ref(), old_row) {
1104 pylon_storage::search_maintenance::apply_update(
1105 &conn, entity, id, &old, data, cfg,
1106 )
1107 .map_err(|e| RuntimeError {
1108 code: "SEARCH_MAINTENANCE_FAILED".into(),
1109 message: format!("search index update on update {entity}: {e}"),
1110 })?;
1111 }
1112 }
1113 Ok(affected)
1114 })?;
1115
1116 Ok(affected > 0)
1117 }
1118
1119 pub fn delete(&self, entity: &str, id: &str) -> Result<bool, RuntimeError> {
1121 if let Some(pg) = self.pg_backend() {
1122 let ent = self.require_entity(entity)?;
1123 if ent.crdt {
1124 let result = pg.store.with_transaction_raw(|tx| -> Result<bool, RuntimeError> {
1129 tx.execute(
1130 "DELETE FROM _pylon_crdt_snapshots WHERE entity = $1 AND row_id = $2",
1131 &[&entity, &id],
1132 )
1133 .map_err(|e| RuntimeError {
1134 code: "CRDT_SIDECAR_DELETE_FAILED".into(),
1135 message: format!("delete pg crdt snapshot {entity}/{id}: {e}"),
1136 })?;
1137 pylon_storage::pg_tx_store::tx_delete(tx, &self.manifest, entity, id)
1138 .map_err(data_err_to_runtime)
1139 });
1140 if result.is_ok() {
1146 pg.crdt.evict(entity, id);
1147 }
1148 return result;
1149 }
1150 return pylon_http::DataStore::delete(&pg.store, entity, id)
1151 .map_err(data_err_to_runtime);
1152 }
1153 let ent = self.require_entity(entity)?;
1154 let conn = self.lock_write_conn()?;
1155
1156 let searchable = ent.search.as_ref().map(|c| !c.is_empty()).unwrap_or(false);
1159 if searchable {
1160 if let (Some(cfg), Ok(Some(row))) = (
1161 ent.search.as_ref(),
1162 self.get_by_id_with_conn(&conn, entity, id),
1163 ) {
1164 pylon_storage::search_maintenance::apply_delete(&conn, entity, id, &row, cfg)
1165 .map_err(|e| RuntimeError {
1166 code: "SEARCH_MAINTENANCE_FAILED".into(),
1167 message: format!("search index update on delete {entity}: {e}"),
1168 })?;
1169 }
1170 }
1171
1172 let sql = format!("DELETE FROM {} WHERE \"id\" = ?1", quote_ident(entity));
1173 let affected = conn
1174 .execute(&sql, rusqlite::params![id])
1175 .map_err(|e| RuntimeError {
1176 code: "DELETE_FAILED".into(),
1177 message: format!("Delete {entity}/{id} failed: {e}"),
1178 })?;
1179
1180 Ok(affected > 0)
1181 }
1182
1183 pub fn lookup(
1185 &self,
1186 entity: &str,
1187 field: &str,
1188 value: &str,
1189 ) -> Result<Option<serde_json::Value>, RuntimeError> {
1190 if let Some(pg) = self.pg_backend() {
1191 return pylon_http::DataStore::lookup(&pg.store, entity, field, value)
1192 .map_err(data_err_to_runtime);
1193 }
1194 let ent = self.require_entity(entity)?;
1195 validate_column_name(field, ent)?;
1196 let conn = self.lock_read_conn()?;
1197
1198 let sql = format!(
1199 "SELECT * FROM {} WHERE {} = ?1 LIMIT 1",
1200 quote_ident(entity),
1201 quote_ident(field)
1202 );
1203 let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1204
1205 let result = conn.prepare_cached(&sql).ok().and_then(|mut stmt| {
1206 stmt.query_row(rusqlite::params![value], |row| {
1207 Ok(row_to_json(row, &columns))
1208 })
1209 .ok()
1210 });
1211
1212 Ok(result)
1213 }
1214
1215 pub fn link(
1217 &self,
1218 entity: &str,
1219 id: &str,
1220 relation: &str,
1221 target_id: &str,
1222 ) -> Result<bool, RuntimeError> {
1223 let ent = self.require_entity(entity)?;
1224
1225 let rel = ent
1227 .relations
1228 .iter()
1229 .find(|r| r.name == relation)
1230 .ok_or_else(|| RuntimeError {
1231 code: "RELATION_NOT_FOUND".into(),
1232 message: format!("Relation \"{relation}\" not found on entity \"{entity}\""),
1233 })?;
1234
1235 let data = serde_json::json!({ rel.field.clone(): target_id });
1236 self.update(entity, id, &data)
1237 }
1238
1239 pub fn unlink(&self, entity: &str, id: &str, relation: &str) -> Result<bool, RuntimeError> {
1241 let ent = self.require_entity(entity)?;
1242
1243 let rel = ent
1244 .relations
1245 .iter()
1246 .find(|r| r.name == relation)
1247 .ok_or_else(|| RuntimeError {
1248 code: "RELATION_NOT_FOUND".into(),
1249 message: format!("Relation \"{relation}\" not found on entity \"{entity}\""),
1250 })?;
1251
1252 let data = serde_json::json!({ rel.field.clone(): null });
1253 self.update(entity, id, &data)
1254 }
1255
1256 pub fn query_filtered(
1258 &self,
1259 entity: &str,
1260 filter: &serde_json::Value,
1261 ) -> Result<Vec<serde_json::Value>, RuntimeError> {
1262 if let Some(pg) = self.pg_backend() {
1263 return pylon_http::DataStore::query_filtered(&pg.store, entity, filter)
1264 .map_err(data_err_to_runtime);
1265 }
1266 let ent = self.require_entity(entity)?;
1267 let conn = self.lock_read_conn()?;
1268
1269 let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1270 let obj = filter
1271 .as_object()
1272 .unwrap_or(&serde_json::Map::new())
1273 .clone();
1274
1275 let mut where_clauses = Vec::new();
1276 let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
1277 let mut order_clause = String::new();
1278 let mut limit_clause = String::new();
1279 let mut join_clause = String::new();
1280 let mut fts_order = false;
1281 let mut idx = 1;
1282
1283 for (key, val) in &obj {
1284 match key.as_str() {
1285 "$order" => {
1286 if let Some(order_obj) = val.as_object() {
1287 let mut parts: Vec<String> = Vec::new();
1288 for (col, dir) in order_obj {
1289 validate_column_name(col, ent)?;
1290 let d = match dir.as_str().unwrap_or("asc") {
1291 "desc" | "DESC" => "DESC",
1292 _ => "ASC",
1293 };
1294 parts.push(format!("{} {d}", quote_ident(col)));
1295 }
1296 if !parts.is_empty() {
1297 order_clause = format!(" ORDER BY {}", parts.join(", "));
1298 }
1299 }
1300 }
1301 "$limit" => {
1302 if let Some(n) = val.as_u64() {
1303 limit_clause = format!(" LIMIT {n}");
1304 }
1305 }
1306 "$offset" => {
1307 if let Some(n) = val.as_u64() {
1308 if limit_clause.is_empty() {
1310 limit_clause = " LIMIT -1".into();
1311 }
1312 limit_clause = format!("{limit_clause} OFFSET {n}");
1313 }
1314 }
1315 "$search" => {
1316 if let Some(q) = val.as_str() {
1317 let fts = format!("{}_fts", entity);
1319 join_clause = format!(
1320 " JOIN {fts} ON {ent}.rowid = {fts}.rowid",
1321 fts = quote_ident(&fts),
1322 ent = quote_ident(entity),
1323 );
1324 where_clauses.push(format!("{} MATCH ?{idx}", quote_ident(&fts)));
1325 values.push(Box::new(q.to_string()));
1326 fts_order = true;
1327 idx += 1;
1328 }
1329 }
1330 _ => {
1331 validate_column_name(key, ent)?;
1332 let quoted_key = quote_ident(key);
1333
1334 if let Some(op_obj) = val.as_object() {
1335 for (op, op_val) in op_obj {
1336 match op.as_str() {
1337 "$not" => {
1338 where_clauses.push(format!("{quoted_key} != ?{idx}"));
1339 values.push(json_to_sql(op_val));
1340 idx += 1;
1341 }
1342 "$gt" => {
1343 where_clauses.push(format!("{quoted_key} > ?{idx}"));
1344 values.push(json_to_sql(op_val));
1345 idx += 1;
1346 }
1347 "$gte" => {
1348 where_clauses.push(format!("{quoted_key} >= ?{idx}"));
1349 values.push(json_to_sql(op_val));
1350 idx += 1;
1351 }
1352 "$lt" => {
1353 where_clauses.push(format!("{quoted_key} < ?{idx}"));
1354 values.push(json_to_sql(op_val));
1355 idx += 1;
1356 }
1357 "$lte" => {
1358 where_clauses.push(format!("{quoted_key} <= ?{idx}"));
1359 values.push(json_to_sql(op_val));
1360 idx += 1;
1361 }
1362 "$like" => {
1363 where_clauses.push(format!("{quoted_key} LIKE ?{idx}"));
1364 let pattern = format!("%{}%", op_val.as_str().unwrap_or(""));
1365 values.push(Box::new(pattern));
1366 idx += 1;
1367 }
1368 "$in" => {
1369 if let Some(arr) = op_val.as_array() {
1370 if arr.is_empty() {
1371 where_clauses.push("0".into());
1380 } else {
1381 let placeholders: Vec<String> = arr
1382 .iter()
1383 .map(|v| {
1384 let p = format!("?{idx}");
1385 values.push(json_to_sql(v));
1386 idx += 1;
1387 p
1388 })
1389 .collect();
1390 where_clauses.push(format!(
1391 "{quoted_key} IN ({})",
1392 placeholders.join(", ")
1393 ));
1394 }
1395 }
1396 }
1397 _ => {}
1398 }
1399 }
1400 } else {
1401 where_clauses.push(format!("{quoted_key} = ?{idx}"));
1403 values.push(json_to_sql(val));
1404 idx += 1;
1405 }
1406 }
1407 }
1408 }
1409
1410 let where_sql = if where_clauses.is_empty() {
1411 String::new()
1412 } else {
1413 format!(" WHERE {}", where_clauses.join(" AND "))
1414 };
1415
1416 if order_clause.is_empty() {
1417 order_clause = if fts_order {
1418 " ORDER BY bm25(".to_string() + "e_ident(&format!("{}_fts", entity)) + ")"
1420 } else {
1421 format!(" ORDER BY {}.\"id\"", quote_ident(entity))
1422 };
1423 }
1424
1425 let select_prefix = format!("{}.*", quote_ident(entity));
1426 let sql = format!(
1427 "SELECT {} FROM {}{}{}{}{}",
1428 select_prefix,
1429 quote_ident(entity),
1430 join_clause,
1431 where_sql,
1432 order_clause,
1433 limit_clause
1434 );
1435 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1436 values.iter().map(|v| v.as_ref()).collect();
1437
1438 let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
1439 code: "QUERY_FAILED".into(),
1440 message: format!("Failed to prepare filtered query: {e}"),
1441 })?;
1442
1443 let rows = stmt
1444 .query_map(param_refs.as_slice(), |row| Ok(row_to_json(row, &columns)))
1445 .map_err(|e| RuntimeError {
1446 code: "QUERY_FAILED".into(),
1447 message: format!("Filtered query failed: {e}"),
1448 })?;
1449
1450 let mut result = Vec::new();
1451 for row in rows {
1452 if let Ok(val) = row {
1453 result.push(val);
1454 }
1455 }
1456 Ok(result)
1457 }
1458
1459 pub fn query_graph(
1464 &self,
1465 query: &serde_json::Value,
1466 ) -> Result<serde_json::Value, RuntimeError> {
1467 let obj = query.as_object().ok_or_else(|| RuntimeError {
1468 code: "INVALID_QUERY".into(),
1469 message: "Graph query must be a JSON object".into(),
1470 })?;
1471
1472 let mut results = serde_json::Map::new();
1473
1474 for (entity_name, query_opts) in obj {
1475 let _ent = self.require_entity(entity_name)?;
1476
1477 let filter = query_opts
1479 .get("where")
1480 .cloned()
1481 .unwrap_or(serde_json::json!({}));
1482 let rows = self.query_filtered(entity_name, &filter)?;
1483
1484 let rows = if let Some(include) = query_opts.get("include").and_then(|v| v.as_object())
1486 {
1487 let ent = self.entities.get(entity_name).ok_or_else(|| RuntimeError {
1493 code: "INVARIANT_BROKEN".into(),
1494 message: format!(
1495 "entity \"{entity_name}\" missing from registry during include expansion"
1496 ),
1497 })?;
1498 rows.into_iter()
1499 .map(|mut row| {
1500 for (rel_name, _sub_query) in include {
1501 if let Some(rel) = ent.relations.iter().find(|r| r.name == *rel_name) {
1502 let fk_value = row
1503 .get(&rel.field)
1504 .and_then(|v| v.as_str())
1505 .map(|s| s.to_string());
1506 if let Some(fk) = fk_value {
1507 if rel.many {
1508 let sub_filter = serde_json::json!({ &rel.field: &fk });
1510 if let Ok(related) =
1511 self.query_filtered(&rel.target, &sub_filter)
1512 {
1513 row[rel_name] = serde_json::json!(related);
1514 }
1515 } else {
1516 if let Ok(Some(related)) = self.get_by_id(&rel.target, &fk)
1518 {
1519 row[rel_name] = related;
1520 }
1521 }
1522 }
1523 }
1524 }
1525 row
1526 })
1527 .collect()
1528 } else {
1529 rows
1530 };
1531
1532 let rows = if let Some(limit) = query_opts.get("limit").and_then(|v| v.as_u64()) {
1534 rows.into_iter().take(limit as usize).collect()
1535 } else {
1536 rows
1537 };
1538
1539 results.insert(entity_name.clone(), serde_json::json!(rows));
1540 }
1541
1542 Ok(serde_json::Value::Object(results))
1543 }
1544
1545 pub fn insert_with_conn(
1551 &self,
1552 conn: &Connection,
1553 entity: &str,
1554 data: &serde_json::Value,
1555 ) -> Result<String, RuntimeError> {
1556 let ent = self.require_entity(entity)?;
1557 let id = generate_id();
1558 let obj = data.as_object().ok_or_else(|| RuntimeError {
1559 code: "INVALID_DATA".into(),
1560 message: "Insert data must be a JSON object".into(),
1561 })?;
1562
1563 let mut col_names = vec![quote_ident("id")];
1564 let mut placeholders = vec!["?1".to_string()];
1565 let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = vec![Box::new(id.clone())];
1566 let mut idx = 2;
1567 for (key, val) in obj {
1568 if key == "id" {
1569 continue;
1570 }
1571 validate_column_name(key, ent)?;
1572 col_names.push(quote_ident(key));
1573 placeholders.push(format!("?{idx}"));
1574 values.push(json_to_sql(val));
1575 idx += 1;
1576 }
1577
1578 let sql = format!(
1579 "INSERT INTO {} ({}) VALUES ({})",
1580 quote_ident(entity),
1581 col_names.join(", "),
1582 placeholders.join(", ")
1583 );
1584 let params: Vec<&dyn rusqlite::types::ToSql> = values.iter().map(|v| v.as_ref()).collect();
1585 conn.execute(&sql, params.as_slice())
1586 .map_err(|e| RuntimeError {
1587 code: "INSERT_FAILED".into(),
1588 message: format!("Insert into {entity} failed: {e}"),
1589 })?;
1590
1591 if let Some(cfg) = ent.search.as_ref() {
1594 if !cfg.is_empty() {
1595 pylon_storage::search_maintenance::apply_insert(conn, entity, &id, data, cfg)
1596 .map_err(|e| RuntimeError {
1597 code: "SEARCH_MAINTENANCE_FAILED".into(),
1598 message: format!("search index update on insert {entity}: {e}"),
1599 })?;
1600 }
1601 }
1602
1603 Ok(id)
1604 }
1605
1606 pub fn update_with_conn(
1608 &self,
1609 conn: &Connection,
1610 entity: &str,
1611 id: &str,
1612 data: &serde_json::Value,
1613 ) -> Result<bool, RuntimeError> {
1614 let ent = self.require_entity(entity)?;
1615 let obj = data.as_object().ok_or_else(|| RuntimeError {
1616 code: "INVALID_DATA".into(),
1617 message: "Update data must be a JSON object".into(),
1618 })?;
1619
1620 let mut set_clauses = Vec::new();
1621 let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
1622 let mut idx = 1;
1623 for (key, val) in obj {
1624 if key == "id" {
1625 continue;
1626 }
1627 validate_column_name(key, ent)?;
1628 set_clauses.push(format!("{} = ?{idx}", quote_ident(key)));
1629 values.push(json_to_sql(val));
1630 idx += 1;
1631 }
1632 if set_clauses.is_empty() {
1633 return Ok(false);
1634 }
1635
1636 let searchable = ent.search.as_ref().map(|c| !c.is_empty()).unwrap_or(false);
1641 let old_row = if searchable {
1642 self.get_by_id_with_conn(conn, entity, id)?
1643 } else {
1644 None
1645 };
1646
1647 values.push(Box::new(id.to_string()));
1648 let sql = format!(
1649 "UPDATE {} SET {} WHERE \"id\" = ?{idx}",
1650 quote_ident(entity),
1651 set_clauses.join(", ")
1652 );
1653 let params: Vec<&dyn rusqlite::types::ToSql> = values.iter().map(|v| v.as_ref()).collect();
1654 let affected = conn
1655 .execute(&sql, params.as_slice())
1656 .map_err(|e| RuntimeError {
1657 code: "UPDATE_FAILED".into(),
1658 message: format!("Update {entity}/{id} failed: {e}"),
1659 })?;
1660
1661 if affected > 0 && searchable {
1662 if let (Some(cfg), Some(old)) = (ent.search.as_ref(), old_row) {
1663 pylon_storage::search_maintenance::apply_update(conn, entity, id, &old, data, cfg)
1664 .map_err(|e| RuntimeError {
1665 code: "SEARCH_MAINTENANCE_FAILED".into(),
1666 message: format!("search index update on update {entity}: {e}"),
1667 })?;
1668 }
1669 }
1670
1671 Ok(affected > 0)
1672 }
1673
1674 pub fn delete_with_conn(
1676 &self,
1677 conn: &Connection,
1678 entity: &str,
1679 id: &str,
1680 ) -> Result<bool, RuntimeError> {
1681 let ent = self.require_entity(entity)?;
1682
1683 let searchable = ent.search.as_ref().map(|c| !c.is_empty()).unwrap_or(false);
1686 if searchable {
1687 if let (Some(cfg), Ok(Some(row))) = (
1688 ent.search.as_ref(),
1689 self.get_by_id_with_conn(conn, entity, id),
1690 ) {
1691 pylon_storage::search_maintenance::apply_delete(conn, entity, id, &row, cfg)
1692 .map_err(|e| RuntimeError {
1693 code: "SEARCH_MAINTENANCE_FAILED".into(),
1694 message: format!("search index update on delete {entity}: {e}"),
1695 })?;
1696 }
1697 }
1698
1699 let sql = format!("DELETE FROM {} WHERE \"id\" = ?1", quote_ident(entity));
1700 let affected = conn
1701 .execute(&sql, rusqlite::params![id])
1702 .map_err(|e| RuntimeError {
1703 code: "DELETE_FAILED".into(),
1704 message: format!("Delete {entity}/{id} failed: {e}"),
1705 })?;
1706 Ok(affected > 0)
1707 }
1708
1709 pub fn get_by_id_with_conn(
1711 &self,
1712 conn: &Connection,
1713 entity: &str,
1714 id: &str,
1715 ) -> Result<Option<serde_json::Value>, RuntimeError> {
1716 let ent = self.require_entity(entity)?;
1717 let sql = format!("SELECT * FROM {} WHERE \"id\" = ?1", quote_ident(entity));
1718 let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
1719 code: "QUERY_FAILED".into(),
1720 message: format!("Failed to prepare query: {e}"),
1721 })?;
1722 let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1723 Ok(stmt
1724 .query_row(rusqlite::params![id], |row| Ok(row_to_json(row, &columns)))
1725 .ok())
1726 }
1727
1728 pub fn list_with_conn(
1730 &self,
1731 conn: &Connection,
1732 entity: &str,
1733 ) -> Result<Vec<serde_json::Value>, RuntimeError> {
1734 let ent = self.require_entity(entity)?;
1735 let sql = format!("SELECT * FROM {} ORDER BY \"id\"", quote_ident(entity));
1736 let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
1737 code: "QUERY_FAILED".into(),
1738 message: format!("Failed to prepare query: {e}"),
1739 })?;
1740 let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1741 let rows = stmt
1742 .query_map([], |row| Ok(row_to_json(row, &columns)))
1743 .map_err(|e| RuntimeError {
1744 code: "QUERY_FAILED".into(),
1745 message: format!("Query failed: {e}"),
1746 })?;
1747 Ok(rows.flatten().collect())
1748 }
1749
1750 pub fn list_after_with_conn(
1752 &self,
1753 conn: &Connection,
1754 entity: &str,
1755 after: Option<&str>,
1756 limit: usize,
1757 ) -> Result<Vec<serde_json::Value>, RuntimeError> {
1758 let ent = self.require_entity(entity)?;
1759 let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1760 let table = quote_ident(entity);
1761 let (sql, params): (String, Vec<Box<dyn rusqlite::types::ToSql>>) = match after {
1762 Some(cursor) => (
1763 format!("SELECT * FROM {table} WHERE \"id\" > ?1 ORDER BY \"id\" LIMIT ?2"),
1764 vec![Box::new(cursor.to_string()), Box::new(limit as i64)],
1765 ),
1766 None => (
1767 format!("SELECT * FROM {table} ORDER BY \"id\" LIMIT ?1"),
1768 vec![Box::new(limit as i64)],
1769 ),
1770 };
1771 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1772 params.iter().map(|v| v.as_ref()).collect();
1773 let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
1774 code: "QUERY_FAILED".into(),
1775 message: format!("Failed to prepare: {e}"),
1776 })?;
1777 let rows = stmt
1778 .query_map(param_refs.as_slice(), |row| Ok(row_to_json(row, &columns)))
1779 .map_err(|e| RuntimeError {
1780 code: "QUERY_FAILED".into(),
1781 message: format!("Query failed: {e}"),
1782 })?;
1783 Ok(rows.flatten().collect())
1784 }
1785
1786 pub fn lookup_with_conn(
1788 &self,
1789 conn: &Connection,
1790 entity: &str,
1791 field: &str,
1792 value: &str,
1793 ) -> Result<Option<serde_json::Value>, RuntimeError> {
1794 let ent = self.require_entity(entity)?;
1795 validate_column_name(field, ent)?;
1796 let sql = format!(
1797 "SELECT * FROM {} WHERE {} = ?1 LIMIT 1",
1798 quote_ident(entity),
1799 quote_ident(field)
1800 );
1801 let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1802 Ok(conn.prepare_cached(&sql).ok().and_then(|mut stmt| {
1803 stmt.query_row(rusqlite::params![value], |row| {
1804 Ok(row_to_json(row, &columns))
1805 })
1806 .ok()
1807 }))
1808 }
1809
1810 pub fn link_with_conn(
1812 &self,
1813 conn: &Connection,
1814 entity: &str,
1815 id: &str,
1816 relation: &str,
1817 target_id: &str,
1818 ) -> Result<bool, RuntimeError> {
1819 let ent = self.require_entity(entity)?;
1820 let rel = ent
1821 .relations
1822 .iter()
1823 .find(|r| r.name == relation)
1824 .ok_or_else(|| RuntimeError {
1825 code: "RELATION_NOT_FOUND".into(),
1826 message: format!("Relation \"{relation}\" not found on \"{entity}\""),
1827 })?;
1828 let data = serde_json::json!({ rel.field.clone(): target_id });
1829 self.update_with_conn(conn, entity, id, &data)
1830 }
1831
1832 pub fn unlink_with_conn(
1834 &self,
1835 conn: &Connection,
1836 entity: &str,
1837 id: &str,
1838 relation: &str,
1839 ) -> Result<bool, RuntimeError> {
1840 let ent = self.require_entity(entity)?;
1841 let rel = ent
1842 .relations
1843 .iter()
1844 .find(|r| r.name == relation)
1845 .ok_or_else(|| RuntimeError {
1846 code: "RELATION_NOT_FOUND".into(),
1847 message: format!("Relation \"{relation}\" not found on \"{entity}\""),
1848 })?;
1849 let data = serde_json::json!({ rel.field.clone(): serde_json::Value::Null });
1850 self.update_with_conn(conn, entity, id, &data)
1851 }
1852
1853 pub fn query_filtered_with_conn(
1858 &self,
1859 conn: &Connection,
1860 entity: &str,
1861 filter: &serde_json::Value,
1862 ) -> Result<Vec<serde_json::Value>, RuntimeError> {
1863 let ent = self.require_entity(entity)?;
1864 let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1865 let empty = serde_json::Map::new();
1866 let obj = filter.as_object().unwrap_or(&empty);
1867
1868 let mut where_clauses = Vec::new();
1869 let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
1870 let mut order_clause = String::new();
1871 let mut limit_clause = String::new();
1872 let mut idx = 1;
1873
1874 for (key, val) in obj {
1875 match key.as_str() {
1876 "$order" => {
1877 if let Some(o) = val.as_object() {
1878 let mut parts: Vec<String> = Vec::new();
1879 for (col, dir) in o {
1880 validate_column_name(col, ent)?;
1881 let d = match dir.as_str().unwrap_or("asc") {
1882 "desc" | "DESC" => "DESC",
1883 _ => "ASC",
1884 };
1885 parts.push(format!("{} {d}", quote_ident(col)));
1886 }
1887 if !parts.is_empty() {
1888 order_clause = format!(" ORDER BY {}", parts.join(", "));
1889 }
1890 }
1891 }
1892 "$limit" => {
1893 if let Some(n) = val.as_u64() {
1894 limit_clause = format!(" LIMIT {n}");
1895 }
1896 }
1897 "$offset" => {
1898 if let Some(n) = val.as_u64() {
1899 if limit_clause.is_empty() {
1900 limit_clause = " LIMIT -1".into();
1901 }
1902 limit_clause = format!("{limit_clause} OFFSET {n}");
1903 }
1904 }
1905 _ => {
1906 validate_column_name(key, ent)?;
1907 let qk = quote_ident(key);
1908 if let Some(op_obj) = val.as_object() {
1909 for (op, op_val) in op_obj {
1910 match op.as_str() {
1911 "$not" => {
1912 where_clauses.push(format!("{qk} != ?{idx}"));
1913 values.push(json_to_sql(op_val));
1914 idx += 1;
1915 }
1916 "$gt" => {
1917 where_clauses.push(format!("{qk} > ?{idx}"));
1918 values.push(json_to_sql(op_val));
1919 idx += 1;
1920 }
1921 "$gte" => {
1922 where_clauses.push(format!("{qk} >= ?{idx}"));
1923 values.push(json_to_sql(op_val));
1924 idx += 1;
1925 }
1926 "$lt" => {
1927 where_clauses.push(format!("{qk} < ?{idx}"));
1928 values.push(json_to_sql(op_val));
1929 idx += 1;
1930 }
1931 "$lte" => {
1932 where_clauses.push(format!("{qk} <= ?{idx}"));
1933 values.push(json_to_sql(op_val));
1934 idx += 1;
1935 }
1936 "$like" => {
1937 where_clauses.push(format!("{qk} LIKE ?{idx}"));
1938 let p = format!("%{}%", op_val.as_str().unwrap_or(""));
1939 values.push(Box::new(p));
1940 idx += 1;
1941 }
1942 "$in" => {
1943 if let Some(arr) = op_val.as_array() {
1944 let ph: Vec<String> = arr
1945 .iter()
1946 .map(|v| {
1947 let p = format!("?{idx}");
1948 values.push(json_to_sql(v));
1949 idx += 1;
1950 p
1951 })
1952 .collect();
1953 if !ph.is_empty() {
1954 where_clauses
1955 .push(format!("{qk} IN ({})", ph.join(", ")));
1956 }
1957 }
1958 }
1959 _ => {}
1960 }
1961 }
1962 } else {
1963 where_clauses.push(format!("{qk} = ?{idx}"));
1964 values.push(json_to_sql(val));
1965 idx += 1;
1966 }
1967 }
1968 }
1969 }
1970
1971 let where_sql = if where_clauses.is_empty() {
1972 String::new()
1973 } else {
1974 format!(" WHERE {}", where_clauses.join(" AND "))
1975 };
1976 if order_clause.is_empty() {
1977 order_clause = " ORDER BY \"id\"".into();
1978 }
1979
1980 let sql = format!(
1981 "SELECT * FROM {}{}{}{}",
1982 quote_ident(entity),
1983 where_sql,
1984 order_clause,
1985 limit_clause
1986 );
1987 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1988 values.iter().map(|v| v.as_ref()).collect();
1989 let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
1990 code: "QUERY_FAILED".into(),
1991 message: format!("Failed to prepare: {e}"),
1992 })?;
1993 let rows = stmt
1994 .query_map(param_refs.as_slice(), |row| Ok(row_to_json(row, &columns)))
1995 .map_err(|e| RuntimeError {
1996 code: "QUERY_FAILED".into(),
1997 message: format!("Query failed: {e}"),
1998 })?;
1999 Ok(rows.flatten().collect())
2000 }
2001
2002 pub fn query_graph_with_conn(
2004 &self,
2005 conn: &Connection,
2006 query: &serde_json::Value,
2007 ) -> Result<serde_json::Value, RuntimeError> {
2008 let obj = query.as_object().ok_or_else(|| RuntimeError {
2009 code: "INVALID_QUERY".into(),
2010 message: "Graph query must be a JSON object".into(),
2011 })?;
2012 let mut results = serde_json::Map::new();
2013 for (entity_name, query_opts) in obj {
2014 let _ent = self.require_entity(entity_name)?;
2015 let filter = query_opts
2016 .get("where")
2017 .cloned()
2018 .unwrap_or(serde_json::json!({}));
2019 let rows = self.query_filtered_with_conn(conn, entity_name, &filter)?;
2020 results.insert(entity_name.clone(), serde_json::json!(rows));
2021 }
2022 Ok(serde_json::Value::Object(results))
2023 }
2024
    /// Computes aggregates over `entity` as described by the JSON object `spec`.
    ///
    /// On a Postgres-backed runtime the call is delegated to
    /// `pylon_http::DataStore::aggregate`. Otherwise a single SQL statement is
    /// built and run on a read connection.
    ///
    /// Keys read from `spec`:
    /// - `count`: `"*"` yields `COUNT(*)` as `count`; any other string is a
    ///   column name and yields `count_<field>`. Non-string values are ignored.
    /// - `sum` / `avg` / `min` / `max`: arrays of column names, yielding
    ///   `<fn>_<field>`. Non-string array entries are skipped.
    /// - `countDistinct`: array of column names, yielding
    ///   `count_distinct_<field>`.
    /// - `groupBy`: array whose entries are either a column name, or an object
    ///   `{ "field": ..., "bucket": ... }` that groups by a `strftime` bucket
    ///   (`hour`, `day`, `week`, `month`, `year`; `day` when omitted) and is
    ///   reported as `<field>_<bucket>`.
    /// - `where`: object of `column: value` pairs, each an equality test,
    ///   combined with `AND`.
    ///
    /// Returns `{ "rows": [...] }`, one object per result row.
    ///
    /// # Errors
    /// `INVALID_QUERY` when `spec` is not an object, selects nothing, or has a
    /// malformed `groupBy` entry; `INVALID_COLUMN` for an unknown column;
    /// `QUERY_FAILED` when the statement cannot be prepared or started; plus
    /// errors from `require_entity` / `lock_read_conn`.
    pub fn aggregate(
        &self,
        entity: &str,
        spec: &serde_json::Value,
    ) -> Result<serde_json::Value, RuntimeError> {
        // Postgres backend: hand the whole spec to the Postgres data store.
        if let Some(pg) = self.pg_backend() {
            return pylon_http::DataStore::aggregate(&pg.store, entity, spec)
                .map_err(data_err_to_runtime);
        }
        let ent = self.require_entity(entity)?;
        let conn = self.lock_read_conn()?;
        let obj = spec.as_object().ok_or_else(|| RuntimeError {
            code: "INVALID_QUERY".into(),
            message: "aggregate spec must be an object".into(),
        })?;

        // `select_parts` holds the aggregate SQL expressions; `result_fields`
        // holds the matching JSON key for each, in the same order.
        let mut select_parts: Vec<String> = Vec::new();
        let mut result_fields: Vec<String> = Vec::new();

        if let Some(count) = obj.get("count") {
            match count {
                serde_json::Value::String(s) if s == "*" => {
                    select_parts.push("COUNT(*) AS count".into());
                    result_fields.push("count".into());
                }
                serde_json::Value::String(field) => {
                    validate_column_name(field, ent)?;
                    let alias = format!("count_{field}");
                    select_parts.push(format!(
                        "COUNT({}) AS {}",
                        quote_ident(field),
                        quote_ident(&alias)
                    ));
                    result_fields.push(alias);
                }
                // Any non-string `count` value is ignored.
                _ => {}
            }
        }

        // The four simple aggregates share one shape: FN("col") AS "fn_col".
        for (fn_name, alias_prefix) in [
            ("sum", "sum_"),
            ("avg", "avg_"),
            ("min", "min_"),
            ("max", "max_"),
        ] {
            if let Some(fields) = obj.get(fn_name).and_then(|v| v.as_array()) {
                for field in fields {
                    if let Some(f) = field.as_str() {
                        validate_column_name(f, ent)?;
                        let alias = format!("{alias_prefix}{f}");
                        let sql_fn = fn_name.to_uppercase();
                        select_parts.push(format!(
                            "{}({}) AS {}",
                            sql_fn,
                            quote_ident(f),
                            quote_ident(&alias)
                        ));
                        result_fields.push(alias);
                    }
                }
            }
        }

        if let Some(fields) = obj.get("countDistinct").and_then(|v| v.as_array()) {
            for field in fields {
                if let Some(f) = field.as_str() {
                    validate_column_name(f, ent)?;
                    let alias = format!("count_distinct_{f}");
                    select_parts.push(format!(
                        "COUNT(DISTINCT {}) AS {}",
                        quote_ident(f),
                        quote_ident(&alias)
                    ));
                    result_fields.push(alias);
                }
            }
        }

        // Three parallel lists for grouping: the GROUP BY expressions, the
        // SELECT expressions (with alias for buckets), and the JSON key names.
        let mut group_by: Vec<String> = Vec::new();
        let mut group_select: Vec<String> = Vec::new();
        let mut group_field_names: Vec<String> = Vec::new();
        if let Some(groups) = obj.get("groupBy").and_then(|v| v.as_array()) {
            for g in groups {
                if let Some(f) = g.as_str() {
                    // Plain column grouping.
                    validate_column_name(f, ent)?;
                    let quoted = quote_ident(f);
                    group_by.push(quoted.clone());
                    group_select.push(quoted);
                    group_field_names.push(f.to_string());
                } else if let Some(spec) = g.as_object() {
                    // Time-bucket grouping: { field, bucket }.
                    let field =
                        spec.get("field")
                            .and_then(|v| v.as_str())
                            .ok_or_else(|| RuntimeError {
                                code: "INVALID_QUERY".into(),
                                message: "groupBy object spec requires `field`".into(),
                            })?;
                    validate_column_name(field, ent)?;
                    let bucket = spec.get("bucket").and_then(|v| v.as_str()).unwrap_or("day");
                    // The format string comes from this fixed table, never
                    // from the caller, so it is safe to splice into the SQL.
                    let fmt = match bucket {
                        "hour" => "%Y-%m-%d %H:00:00",
                        "day" => "%Y-%m-%d",
                        "month" => "%Y-%m",
                        "year" => "%Y",
                        "week" => "%Y-W%W",
                        _ => {
                            return Err(RuntimeError {
                                code: "INVALID_QUERY".into(),
                                message: format!(
                                    "bucket must be one of hour/day/week/month/year, got {bucket}"
                                ),
                            });
                        }
                    };
                    let alias = format!("{field}_{bucket}");
                    let expr = format!("strftime('{}', {})", fmt, quote_ident(field));
                    group_by.push(expr.clone());
                    group_select.push(format!("{} AS {}", expr, quote_ident(&alias)));
                    group_field_names.push(alias);
                }
            }
        }
        // Group columns come first in the SELECT list, then the aggregates;
        // `column_names` below is built in the same order.
        let mut full_select = group_select.clone();
        full_select.extend(select_parts.iter().cloned());
        if full_select.is_empty() {
            // NOTE(review): `countDistinct` is also accepted above but is not
            // listed in this message.
            return Err(RuntimeError {
                code: "INVALID_QUERY".into(),
                message: "aggregate spec must include count/sum/avg/min/max/groupBy".into(),
            });
        }

        // Equality-only WHERE clause with numbered placeholders.
        let mut where_clauses = Vec::new();
        let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
        let mut idx = 1;
        if let Some(where_obj) = obj.get("where").and_then(|v| v.as_object()) {
            for (k, v) in where_obj {
                validate_column_name(k, ent)?;
                where_clauses.push(format!("{} = ?{idx}", quote_ident(k)));
                values.push(json_to_sql(v));
                idx += 1;
            }
        }
        let where_sql = if where_clauses.is_empty() {
            String::new()
        } else {
            format!(" WHERE {}", where_clauses.join(" AND "))
        };

        let group_sql = if group_by.is_empty() {
            String::new()
        } else {
            format!(" GROUP BY {}", group_by.join(", "))
        };

        let sql = format!(
            "SELECT {} FROM {}{}{}",
            full_select.join(", "),
            quote_ident(entity),
            where_sql,
            group_sql
        );

        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
            values.iter().map(|v| v.as_ref()).collect();
        let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
            code: "QUERY_FAILED".into(),
            message: format!("Failed to prepare aggregate: {e}"),
        })?;

        // JSON keys by result-column position (group fields, then aggregates).
        let column_names: Vec<String> = {
            let mut v = group_field_names.clone();
            v.extend(result_fields.iter().cloned());
            v
        };

        let rows = stmt
            .query_map(param_refs.as_slice(), |row| {
                let mut obj = serde_json::Map::new();
                for (i, name) in column_names.iter().enumerate() {
                    // Try integer, then float, then text; anything else
                    // (including a float that is not representable as a JSON
                    // number) becomes JSON null.
                    if let Ok(n) = row.get::<_, i64>(i) {
                        obj.insert(name.clone(), serde_json::Value::Number(n.into()));
                    } else if let Ok(f) = row.get::<_, f64>(i) {
                        if let Some(num) = serde_json::Number::from_f64(f) {
                            obj.insert(name.clone(), serde_json::Value::Number(num));
                        } else {
                            obj.insert(name.clone(), serde_json::Value::Null);
                        }
                    } else if let Ok(s) = row.get::<_, String>(i) {
                        obj.insert(name.clone(), serde_json::Value::String(s));
                    } else {
                        obj.insert(name.clone(), serde_json::Value::Null);
                    }
                }
                Ok(serde_json::Value::Object(obj))
            })
            .map_err(|e| RuntimeError {
                code: "QUERY_FAILED".into(),
                message: format!("Aggregate failed: {e}"),
            })?;

        // Rows that fail to read are skipped rather than reported.
        let mut result = Vec::new();
        for row in rows {
            if let Ok(val) = row {
                result.push(val);
            }
        }
        Ok(serde_json::json!({ "rows": result }))
    }
2251
2252 fn require_entity(&self, name: &str) -> Result<&ManifestEntity, RuntimeError> {
2257 self.entities.get(name).ok_or_else(|| RuntimeError {
2258 code: "ENTITY_NOT_FOUND".into(),
2259 message: format!("Unknown entity: \"{name}\""),
2260 })
2261 }
2262
2263 fn lock_write_conn(&self) -> Result<std::sync::MutexGuard<'_, Connection>, RuntimeError> {
2269 let sb = self.sqlite_backend()?;
2270 sb.write_conn.lock().map_err(|e| RuntimeError {
2271 code: "LOCK_FAILED".into(),
2272 message: format!("Failed to acquire write connection lock: {e}"),
2273 })
2274 }
2275
2276 fn lock_read_conn(&self) -> Result<ReadConnGuard<'_>, RuntimeError> {
2280 let sb = self.sqlite_backend()?;
2281 if !sb.read_pool.is_empty() {
2282 let idx = sb.read_counter.fetch_add(1, Ordering::Relaxed) % sb.read_pool.len();
2283 let guard = sb.read_pool[idx].lock().map_err(|e| RuntimeError {
2284 code: "LOCK_FAILED".into(),
2285 message: format!("Failed to acquire read connection: {e}"),
2286 })?;
2287 Ok(ReadConnGuard::Pooled(guard))
2288 } else {
2289 let guard = sb.write_conn.lock().map_err(|e| RuntimeError {
2291 code: "LOCK_FAILED".into(),
2292 message: format!("Failed to acquire connection: {e}"),
2293 })?;
2294 Ok(ReadConnGuard::Write(guard))
2295 }
2296 }
2297
    /// Returns the SQLite backend of this runtime.
    ///
    /// # Errors
    /// `NOT_SQLITE_BACKEND` when the runtime is Postgres-backed.
    fn sqlite_backend(&self) -> Result<&SqliteBackend, RuntimeError> {
        match &self.backend {
            RuntimeBackend::Sqlite(sb) => Ok(sb),
            RuntimeBackend::Postgres(_) => Err(RuntimeError {
                code: "NOT_SQLITE_BACKEND".into(),
                message: "this operation requires a SQLite-backed Runtime".into(),
            }),
        }
    }
2310
    /// Returns the Postgres backend, or `None` when the runtime is
    /// SQLite-backed.
    pub(crate) fn pg_backend(&self) -> Option<&PgBackend> {
        match &self.backend {
            RuntimeBackend::Sqlite(_) => None,
            RuntimeBackend::Postgres(pg) => Some(pg),
        }
    }
2321
    /// Public accessor that forwards to `pg_data_store`: the Postgres data
    /// store, or `None` when the runtime is not Postgres-backed.
    pub fn pg_data_store_pub(&self) -> Option<&pylon_storage::pg_datastore::PostgresDataStore> {
        self.pg_data_store()
    }
2334
    #[doc(hidden)]
    /// Test helper: returns the Postgres data store.
    ///
    /// # Panics
    /// Panics with "pg backend" when the runtime is not Postgres-backed.
    pub fn pg_data_store_for_tests(&self) -> &pylon_storage::pg_datastore::PostgresDataStore {
        self.pg_data_store().expect("pg backend")
    }
2339
    #[doc(hidden)]
    /// Test helper: runs `body` through the Postgres store's
    /// `with_transaction_crdt`, passing a `PgCrdtHookImpl` built from this
    /// runtime's CRDT handle and a clone of its manifest.
    ///
    /// The result of `with_transaction_crdt` is returned as-is.
    ///
    /// # Panics
    /// Panics with "pg backend" when the runtime is not Postgres-backed.
    pub fn run_in_pg_mutation_tx_for_tests<F, T, E>(&self, body: F) -> Result<T, E>
    where
        F: FnOnce(&dyn pylon_http::DataStore) -> Result<T, E>,
        E: From<pylon_http::DataError>,
    {
        let pg_backend = self.pg_backend().expect("pg backend");
        // The hook shares the backend's CRDT handle (Arc clone) and carries
        // its own copy of the manifest.
        let crdt_hook: std::sync::Arc<dyn pylon_storage::pg_tx_store::PgCrdtHook> =
            std::sync::Arc::new(crate::pg_loro_store::PgCrdtHookImpl {
                crdt: std::sync::Arc::clone(&pg_backend.crdt),
                manifest: std::sync::Arc::new(self.manifest.clone()),
            });
        pg_backend
            .store
            .with_transaction_crdt(crdt_hook, body)
    }
2360
    /// Returns the Postgres data store (`store` of the Postgres backend), or
    /// `None` when the runtime is not Postgres-backed.
    pub(crate) fn pg_data_store(&self) -> Option<&pylon_storage::pg_datastore::PostgresDataStore> {
        self.pg_backend().map(|pg| &pg.store)
    }
2364}
2365
2366fn with_write_tx<T, F>(conn: &rusqlite::Connection, body: F) -> Result<T, RuntimeError>
2394where
2395 F: FnOnce() -> Result<T, RuntimeError>,
2396{
2397 conn.execute("BEGIN IMMEDIATE", [])
2398 .map_err(|e| RuntimeError {
2399 code: "TX_BEGIN_FAILED".into(),
2400 message: format!("BEGIN: {e}"),
2401 })?;
2402 match body() {
2403 Ok(v) => {
2404 conn.execute("COMMIT", []).map_err(|e| RuntimeError {
2405 code: "TX_COMMIT_FAILED".into(),
2406 message: format!("COMMIT: {e}"),
2407 })?;
2408 Ok(v)
2409 }
2410 Err(e) => {
2411 let _ = conn.execute("ROLLBACK", []);
2414 Err(e)
2415 }
2416 }
2417}
2418
/// Produces a 40-character lowercase hexadecimal id.
///
/// The first 32 digits are the nanoseconds since the Unix epoch (zero when the
/// system clock reads earlier than the epoch); the last 8 digits are a
/// process-wide counter that increments on every call and wraps on overflow.
fn generate_id() -> String {
    static NEXT_SEQ: std::sync::atomic::AtomicU32 = std::sync::atomic::AtomicU32::new(0);

    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    let seq = NEXT_SEQ.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    format!("{:032x}{:08x}", since_epoch.as_nanos(), seq)
}
2430
2431fn json_to_sql(val: &serde_json::Value) -> Box<dyn rusqlite::types::ToSql> {
2433 match val {
2434 serde_json::Value::Null => Box::new(rusqlite::types::Null),
2435 serde_json::Value::Bool(b) => Box::new(*b as i32),
2436 serde_json::Value::Number(n) => {
2437 if let Some(i) = n.as_i64() {
2438 Box::new(i)
2439 } else if let Some(f) = n.as_f64() {
2440 Box::new(f)
2441 } else {
2442 Box::new(n.to_string())
2443 }
2444 }
2445 serde_json::Value::String(s) => Box::new(s.clone()),
2446 other => Box::new(other.to_string()),
2447 }
2448}
2449
2450fn row_to_json(row: &rusqlite::Row<'_>, _field_names: &[String]) -> serde_json::Value {
2465 let mut obj = serde_json::Map::new();
2466
2467 let stmt = row.as_ref();
2468 let count = stmt.column_count();
2469 for i in 0..count {
2470 let name = match stmt.column_name(i) {
2474 Ok(n) => n.to_string(),
2475 Err(_) => continue,
2476 };
2477 let value = if let Ok(s) = row.get::<_, String>(i) {
2478 serde_json::Value::String(s)
2479 } else if let Ok(n) = row.get::<_, i64>(i) {
2480 serde_json::Value::Number(serde_json::Number::from(n))
2481 } else if let Ok(f) = row.get::<_, f64>(i) {
2482 serde_json::Number::from_f64(f)
2483 .map(serde_json::Value::Number)
2484 .unwrap_or(serde_json::Value::Null)
2485 } else {
2486 serde_json::Value::Null
2487 };
2488 obj.insert(name, value);
2489 }
2490
2491 serde_json::Value::Object(obj)
2492}
2493
2494#[cfg(test)]
2495mod tests {
2496 use super::*;
2497 use pylon_kernel::{ManifestField, ManifestIndex};
2498
2499 fn test_manifest() -> AppManifest {
2500 AppManifest {
2501 manifest_version: 1,
2502 name: "Test".into(),
2503 version: "0.1.0".into(),
2504 entities: vec![pylon_kernel::ManifestEntity {
2505 name: "User".into(),
2506 fields: vec![
2507 ManifestField {
2508 name: "email".into(),
2509 field_type: "string".into(),
2510 optional: false,
2511 unique: true,
2512 crdt: None,
2513 },
2514 ManifestField {
2515 name: "displayName".into(),
2516 field_type: "string".into(),
2517 optional: false,
2518 unique: false,
2519 crdt: None,
2520 },
2521 ],
2522 indexes: vec![ManifestIndex {
2523 name: "user_email".into(),
2524 fields: vec!["email".into()],
2525 unique: true,
2526 }],
2527 relations: vec![],
2528 search: None,
2529 crdt: true,
2530 }],
2531 routes: vec![],
2532 queries: vec![],
2533 actions: vec![],
2534 policies: vec![],
2535 auth: Default::default(),
2536 }
2537 }
2538
2539 #[test]
2540 fn reset_for_tests_wipes_in_memory() {
2541 let rt = Runtime::in_memory(test_manifest()).unwrap();
2542 rt.insert(
2543 "User",
2544 &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
2545 )
2546 .unwrap();
2547 assert_eq!(rt.list("User").unwrap().len(), 1);
2548 rt.reset_for_tests().unwrap();
2549 assert_eq!(rt.list("User").unwrap().len(), 0);
2550 }
2551
    #[test]
    /// `reset_for_tests` on a file-backed runtime fails with `RESET_REFUSED`.
    fn reset_for_tests_refuses_file_db() {
        // Start from a clean database file in a dedicated temp directory.
        let dir = std::env::temp_dir().join("pylon-reset-refuse");
        let _ = std::fs::create_dir_all(&dir);
        let db_path = dir.join("db.sqlite");
        let _ = std::fs::remove_file(&db_path);
        let rt = Runtime::open(db_path.to_str().unwrap(), test_manifest()).unwrap();
        let err = rt.reset_for_tests().unwrap_err();
        assert_eq!(err.code, "RESET_REFUSED");
        // Best-effort cleanup of the database file.
        let _ = std::fs::remove_file(&db_path);
    }
2563
2564 #[test]
2565 fn insert_and_get() {
2566 let rt = Runtime::in_memory(test_manifest()).unwrap();
2567 let id = rt
2568 .insert(
2569 "User",
2570 &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
2571 )
2572 .unwrap();
2573 let row = rt.get_by_id("User", &id).unwrap().unwrap();
2574 assert_eq!(row["email"], "a@b.com");
2575 }
2576
    #[test]
    /// A column added to the table behind the manifest's back (via
    /// `ALTER TABLE`) is still returned under its own name, and the
    /// manifest-declared columns keep their correct values.
    fn row_to_json_handles_columns_added_out_of_manifest_order() {
        let mut manifest = test_manifest();
        // Four manifest fields; `passwordHash` is deliberately NOT among them.
        manifest.entities[0].fields = vec![
            ManifestField {
                name: "email".into(),
                field_type: "string".into(),
                optional: false,
                unique: true,
                crdt: None,
            },
            ManifestField {
                name: "displayName".into(),
                field_type: "string".into(),
                optional: false,
                unique: false,
                crdt: None,
            },
            ManifestField {
                name: "avatarColor".into(),
                field_type: "string".into(),
                optional: true,
                unique: false,
                crdt: None,
            },
            ManifestField {
                name: "createdAt".into(),
                field_type: "datetime".into(),
                optional: true,
                unique: false,
                crdt: None,
            },
        ];
        // This test runs with the entity's CRDT flag turned off.
        manifest.entities[0].crdt = false;
        let rt = Runtime::in_memory(manifest).unwrap();
        let id = rt
            .insert(
                "User",
                &serde_json::json!({
                    "email": "a@b.com",
                    "displayName": "Alice",
                    "avatarColor": "#abc",
                    "createdAt": "2026-01-01T00:00:00Z",
                }),
            )
            .unwrap();

        {
            // Add and populate a column directly in SQL; the scope releases
            // the write connection before the read below.
            let conn = rt.lock_write_conn().unwrap();
            conn.execute("ALTER TABLE \"User\" ADD COLUMN \"passwordHash\" TEXT", [])
                .unwrap();
            conn.execute(
                "UPDATE \"User\" SET \"passwordHash\" = ?1 WHERE \"id\" = ?2",
                rusqlite::params!["hashed-password", &id],
            )
            .unwrap();
        }
        let row = rt.get_by_id("User", &id).unwrap().unwrap();
        assert_eq!(row["email"], "a@b.com");
        assert_eq!(row["displayName"], "Alice");
        assert_eq!(row["avatarColor"], "#abc");
        assert_eq!(row["createdAt"], "2026-01-01T00:00:00Z");
        assert_eq!(row["passwordHash"], "hashed-password");
    }
2669
#[test]
// With the unmodified test manifest, an insert must leave exactly one row in
// `_pylon_crdt_snapshots` for the new id and at least one cached row in the
// CRDT store, while the row itself stays readable through `get_by_id`.
fn crdt_default_writes_through_loro_store() {
    let runtime = Runtime::in_memory(test_manifest()).unwrap();
    let payload = serde_json::json!({"email": "x@y.com", "displayName": "Eric"});
    let row_id = runtime.insert("User", &payload).unwrap();

    {
        // The guard lives only for this block, so it is released before the
        // read below.
        let conn = runtime.lock_write_conn().unwrap();
        let sidecar_rows: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM _pylon_crdt_snapshots
                 WHERE entity = ?1 AND row_id = ?2",
                rusqlite::params!["User", &row_id],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(sidecar_rows, 1, "sidecar should have one row after insert");

        assert!(runtime.crdt_store().cached_rows() >= 1);
    }

    let fetched = runtime.get_by_id("User", &row_id).unwrap();
    let row = fetched.unwrap();
    assert_eq!(row["email"], "x@y.com");
    assert_eq!(row["displayName"], "Eric");
}
2706
#[test]
// An update must rewrite the row's stored snapshot: the bytes read from
// `_pylon_crdt_snapshots` before and after the update have to differ, and the
// row must show the new value while keeping the untouched field.
fn crdt_update_persists_new_snapshot() {
    let runtime = Runtime::in_memory(test_manifest()).unwrap();
    let seed = serde_json::json!({"email": "x@y.com", "displayName": "Eric"});
    let row_id = runtime.insert("User", &seed).unwrap();

    // Reads the current snapshot blob for the row. The connection guard is
    // taken and released on every call, so it is never held across `update`.
    let load_snapshot = || -> Vec<u8> {
        let conn = runtime.lock_write_conn().unwrap();
        conn.query_row(
            "SELECT snapshot FROM _pylon_crdt_snapshots
             WHERE entity = 'User' AND row_id = ?1",
            rusqlite::params![&row_id],
            |r| r.get(0),
        )
        .unwrap()
    };

    let before_update = load_snapshot();

    let patch = serde_json::json!({"displayName": "Eric C"});
    runtime.update("User", &row_id, &patch).unwrap();

    let after_update = load_snapshot();

    assert_ne!(
        before_update, after_update,
        "snapshot bytes should change after an update"
    );

    let fetched = runtime.get_by_id("User", &row_id).unwrap();
    let row = fetched.unwrap();
    assert_eq!(row["displayName"], "Eric C");
    assert_eq!(row["email"], "x@y.com");
}
2754
#[test]
// A failed insert must not leave a snapshot behind. The second insert reuses
// the first row's email and is expected to fail with `INSERT_FAILED`; the
// number of `User` rows in `_pylon_crdt_snapshots` must be unchanged
// afterwards.
fn crdt_insert_rolls_back_when_sql_step_fails() {
    let runtime = Runtime::in_memory(test_manifest()).unwrap();

    // Counts sidecar snapshots for `User`; takes and releases the connection
    // guard on each call.
    let sidecar_count = || -> i64 {
        let conn = runtime.lock_write_conn().unwrap();
        conn.query_row(
            "SELECT COUNT(*) FROM _pylon_crdt_snapshots WHERE entity = 'User'",
            [],
            |r| r.get(0),
        )
        .unwrap()
    };

    let first = serde_json::json!({"email": "x@y.com", "displayName": "First"});
    runtime.insert("User", &first).unwrap();

    let count_before = sidecar_count();

    let duplicate = serde_json::json!({"email": "x@y.com", "displayName": "Second"});
    let err = runtime
        .insert("User", &duplicate)
        .expect_err("duplicate email must fail");
    assert_eq!(err.code, "INSERT_FAILED");

    let count_after = sidecar_count();
    assert_eq!(
        count_after, count_before,
        "failed insert should not leave a sidecar snapshot behind"
    );
}
2807
#[test]
// With `crdt` switched off on the first entity, an insert must write no row
// to `_pylon_crdt_snapshots` and leave the CRDT store's cache empty, while
// the row itself is still readable through `get_by_id`.
fn crdt_false_skips_loro_store() {
    let manifest = {
        let mut m = test_manifest();
        m.entities[0].crdt = false;
        m
    };
    let runtime = Runtime::in_memory(manifest).unwrap();

    let payload = serde_json::json!({"email": "lww@example.com", "displayName": "Plain"});
    let row_id = runtime.insert("User", &payload).unwrap();

    {
        // The guard lives only for this block, so it is released before the
        // read below.
        let conn = runtime.lock_write_conn().unwrap();
        let sidecar_rows: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM _pylon_crdt_snapshots
                 WHERE entity = 'User' AND row_id = ?1",
                rusqlite::params![&row_id],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(sidecar_rows, 0, "crdt:false should not touch the sidecar");
        assert_eq!(
            runtime.crdt_store().cached_rows(),
            0,
            "crdt:false should not warm the cache"
        );
    }

    let fetched = runtime.get_by_id("User", &row_id).unwrap();
    let row = fetched.unwrap();
    assert_eq!(row["email"], "lww@example.com");
}
2846
#[test]
// `list` returns every row of the entity: two inserts, two rows back.
fn list_entities() {
    let runtime = Runtime::in_memory(test_manifest()).unwrap();

    for &(email, display_name) in &[("a@b.com", "A"), ("b@c.com", "B")] {
        let payload = serde_json::json!({"email": email, "displayName": display_name});
        runtime.insert("User", &payload).unwrap();
    }

    let all_users = runtime.list("User").unwrap();
    assert_eq!(all_users.len(), 2);
}
2863
#[test]
// `update` reports `true` for an existing row, and the new value is visible
// on the next read.
fn update_entity() {
    let runtime = Runtime::in_memory(test_manifest()).unwrap();
    let seed = serde_json::json!({"email": "a@b.com", "displayName": "A"});
    let row_id = runtime.insert("User", &seed).unwrap();

    let patch = serde_json::json!({"displayName": "Updated"});
    let changed = runtime.update("User", &row_id, &patch).unwrap();
    assert!(changed);

    let fetched = runtime.get_by_id("User", &row_id).unwrap();
    let row = fetched.unwrap();
    assert_eq!(row["displayName"], "Updated");
}
2880
#[test]
// `delete` reports `true` for an existing row, after which `get_by_id` finds
// nothing under that id.
fn delete_entity() {
    let runtime = Runtime::in_memory(test_manifest()).unwrap();
    let seed = serde_json::json!({"email": "a@b.com", "displayName": "A"});
    let row_id = runtime.insert("User", &seed).unwrap();

    let removed = runtime.delete("User", &row_id).unwrap();
    assert!(removed);

    let after_delete = runtime.get_by_id("User", &row_id).unwrap();
    assert!(after_delete.is_none());
}
2894
#[test]
// `lookup` finds a row by the value of a named field (here `email`).
fn lookup_by_field() {
    let runtime = Runtime::in_memory(test_manifest()).unwrap();
    let seed = serde_json::json!({"email": "a@b.com", "displayName": "A"});
    runtime.insert("User", &seed).unwrap();

    let found = runtime.lookup("User", "email", "a@b.com").unwrap();
    let row = found.unwrap();
    assert_eq!(row["displayName"], "A");
}
2906
#[test]
// Listing an entity name that is not in the manifest fails with
// `ENTITY_NOT_FOUND`.
fn unknown_entity_returns_error() {
    let runtime = Runtime::in_memory(test_manifest()).unwrap();
    let outcome = runtime.list("Nonexistent");
    let err = outcome.unwrap_err();
    assert_eq!(err.code, "ENTITY_NOT_FOUND");
}
2913
#[test]
// An insert payload carrying a key that is not a known column (`evil_col`)
// is rejected with `INVALID_COLUMN`.
fn insert_rejects_unknown_column() {
    let runtime = Runtime::in_memory(test_manifest()).unwrap();
    let payload = serde_json::json!({"email": "a@b.com", "displayName": "A", "evil_col": "x"});

    let outcome = runtime.insert("User", &payload);
    let err = outcome.unwrap_err();
    assert_eq!(err.code, "INVALID_COLUMN");
}
2925
#[test]
// An update patch carrying a key that is not a known column (`bad_field`)
// is rejected with `INVALID_COLUMN`.
fn update_rejects_unknown_column() {
    let runtime = Runtime::in_memory(test_manifest()).unwrap();
    let seed = serde_json::json!({"email": "a@b.com", "displayName": "A"});
    let row_id = runtime.insert("User", &seed).unwrap();

    let patch = serde_json::json!({"bad_field": "x"});
    let outcome = runtime.update("User", &row_id, &patch);
    let err = outcome.unwrap_err();
    assert_eq!(err.code, "INVALID_COLUMN");
}
2940
#[test]
// Looking up by a field name that is not a known column fails with
// `INVALID_COLUMN`.
fn lookup_rejects_unknown_column() {
    let runtime = Runtime::in_memory(test_manifest()).unwrap();
    let outcome = runtime.lookup("User", "nonexistent", "val");
    let err = outcome.unwrap_err();
    assert_eq!(err.code, "INVALID_COLUMN");
}
2947
#[test]
// A filter keyed on a name that is not a known column (`bad_col`) is
// rejected with `INVALID_COLUMN`.
fn query_filtered_rejects_unknown_column() {
    let runtime = Runtime::in_memory(test_manifest()).unwrap();
    let filter = serde_json::json!({"bad_col": "x"});

    let outcome = runtime.query_filtered("User", &filter);
    let err = outcome.unwrap_err();
    assert_eq!(err.code, "INVALID_COLUMN");
}
2956
#[test]
// An `$order` clause naming a column that is not known (`bad_col`) is
// rejected with `INVALID_COLUMN`.
fn query_filtered_rejects_unknown_order_column() {
    let runtime = Runtime::in_memory(test_manifest()).unwrap();
    let filter = serde_json::json!({"$order": {"bad_col": "asc"}});

    let outcome = runtime.query_filtered("User", &filter);
    let err = outcome.unwrap_err();
    assert_eq!(err.code, "INVALID_COLUMN");
}
2965
#[test]
// A hostile `$order` direction string must not break the query: with
// "DROP TABLE User" supplied as the direction, the call still succeeds and
// returns the single inserted row.
fn query_filtered_sanitizes_order_direction() {
    let runtime = Runtime::in_memory(test_manifest()).unwrap();
    let seed = serde_json::json!({"email": "a@b.com", "displayName": "A"});
    runtime.insert("User", &seed).unwrap();

    let filter = serde_json::json!({"$order": {"email": "DROP TABLE User"}});
    let matched = runtime.query_filtered("User", &filter).unwrap();
    assert_eq!(matched.len(), 1);
}
2983
#[test]
// An in-memory runtime reports a read pool size of zero.
fn in_memory_has_no_read_pool() {
    let runtime = Runtime::in_memory(test_manifest()).unwrap();
    let pool_size = runtime.read_pool_size();
    assert_eq!(pool_size, 0);
}
2989
#[test]
// A file-backed runtime reports a read pool of `READ_POOL_SIZE`, and a row
// written through it can be read back.
fn open_creates_read_pool() {
    // Directory name is specific to this test and this process, so removing
    // it cannot interfere with another test's database.
    let dir = std::env::temp_dir().join(format!("pylon_test_read_pool_{}", std::process::id()));
    // Start clean: a directory leaked by an earlier run (same PID reused)
    // would still contain the row below and make the unique-email insert fail.
    let _ = std::fs::remove_dir_all(&dir);
    std::fs::create_dir_all(&dir).unwrap();
    let db_path = dir.join("test_read_pool.db");

    let rt = Runtime::open(db_path.to_str().unwrap(), test_manifest()).unwrap();
    assert_eq!(rt.read_pool_size(), READ_POOL_SIZE);

    // Write, then read the row back through the runtime.
    let id = rt
        .insert(
            "User",
            &serde_json::json!({"email": "pool@test.com", "displayName": "Pool"}),
        )
        .unwrap();
    let row = rt.get_by_id("User", &id).unwrap().unwrap();
    assert_eq!(row["email"], "pool@test.com");

    // Close the runtime before deleting its files; on platforms that refuse
    // to delete open files the cleanup would otherwise silently fail.
    drop(rt);
    let _ = std::fs::remove_dir_all(&dir);
}
3012
#[test]
// While the test thread holds the write-connection guard, several reader
// threads must still be able to `list` rows on a file-backed runtime.
//
// Readers report their row count over a channel and the main thread waits
// with a timeout. If reads ever start blocking on the write lock, the test
// fails with a clear message instead of deadlocking (the main thread would
// otherwise wait for readers that are in turn waiting for the guard it holds).
fn concurrent_reads_dont_block_on_write() {
    use std::sync::Arc;

    const READER_COUNT: usize = 4;
    // Generous upper bound; the reads normally complete almost immediately.
    let read_timeout = std::time::Duration::from_secs(10);

    let dir = std::env::temp_dir().join(format!("pylon_conc_{}", std::process::id()));
    // Start clean so rows from a leaked earlier run cannot collide with the
    // unique emails inserted below.
    let _ = std::fs::remove_dir_all(&dir);
    std::fs::create_dir_all(&dir).unwrap();
    let db_path = dir.join("test_concurrent.db");

    let rt = Arc::new(Runtime::open(db_path.to_str().unwrap(), test_manifest()).unwrap());

    rt.insert(
        "User",
        &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
    )
    .unwrap();
    rt.insert(
        "User",
        &serde_json::json!({"email": "b@c.com", "displayName": "B"}),
    )
    .unwrap();

    // Hold the write connection for the whole time the readers run.
    let write_guard = rt.lock_write_conn().unwrap();

    let (tx, rx) = std::sync::mpsc::channel();
    let handles: Vec<_> = (0..READER_COUNT)
        .map(|_| {
            let rt_clone = Arc::clone(&rt);
            let tx = tx.clone();
            std::thread::spawn(move || {
                let rows = rt_clone.list("User").unwrap();
                // The receiver may already be gone if the main thread timed
                // out; nothing useful can be done about that here.
                let _ = tx.send(rows.len());
            })
        })
        .collect();
    // Drop the original sender so the channel disconnects once every reader
    // has finished (or panicked) instead of waiting for the full timeout.
    drop(tx);

    for _ in 0..READER_COUNT {
        let row_count = rx
            .recv_timeout(read_timeout)
            .expect("reader blocked on the write lock or panicked");
        assert_eq!(row_count, 2);
    }

    drop(write_guard);

    for h in handles {
        h.join().expect("reader thread panicked");
    }

    // Close the runtime before deleting its files.
    drop(rt);
    let _ = std::fs::remove_dir_all(&dir);
}
3058}