1pub mod account_backend;
2pub mod api_key_backend;
3pub mod audit_backend;
4pub mod cache_handlers;
5pub mod cache_server;
6pub mod config;
7pub mod cron;
8pub mod datastore;
9pub mod ip_limit;
10pub mod job_store;
11pub mod jobs;
12pub mod log;
13pub mod loro_store;
14pub mod magic_code_backend;
15pub mod metrics;
16pub mod oauth_backend;
17pub mod openapi;
18pub mod org_backend;
19pub mod pg_loro_store;
20pub mod presence;
21pub mod pubsub;
22pub mod rate_limit;
23pub mod resp;
24pub mod resp_server;
25pub mod rooms;
26pub mod scheduler;
27pub mod server;
28pub mod session_backend;
29pub mod shard_ws;
30pub mod sse;
31pub mod tls;
32pub mod verification_backend;
33pub mod workflow_store;
34pub mod workflows;
35pub mod ws;
36
37use std::collections::HashMap;
38use std::sync::atomic::{AtomicUsize, Ordering};
39use std::sync::Mutex;
40
41use pylon_kernel::{AppManifest, ManifestEntity};
42use rusqlite::Connection;
43
/// Error type returned by the fallible runtime operations in this module.
///
/// Pairs a short machine-readable `code` (for example `"QUERY_FAILED"`)
/// with a human-readable `message`.
#[derive(Clone, Debug)]
pub struct RuntimeError {
    /// Stable identifier for the failure category.
    pub code: String,
    /// Free-form description of what went wrong.
    pub message: String,
}
53
54impl std::fmt::Display for RuntimeError {
55 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56 write!(f, "[{}] {}", self.code, self.message)
57 }
58}
59
60impl std::error::Error for RuntimeError {}
61
62impl From<pylon_http::DataError> for RuntimeError {
67 fn from(e: pylon_http::DataError) -> Self {
68 RuntimeError {
69 code: e.code,
70 message: e.message,
71 }
72 }
73}
74
/// Wraps `name` in double quotes for use as an SQL identifier, doubling any
/// embedded `"` so the identifier cannot terminate early.
fn quote_ident(name: &str) -> String {
    let mut quoted = String::with_capacity(name.len() + 2);
    quoted.push('"');
    for ch in name.chars() {
        if ch == '"' {
            // An embedded quote is escaped by writing it twice.
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
84
85fn validate_column_name(name: &str, entity: &ManifestEntity) -> Result<(), RuntimeError> {
89 if name == "id" {
90 return Ok(());
91 }
92 if entity.fields.iter().any(|f| f.name == name) {
93 return Ok(());
94 }
95 Err(RuntimeError {
96 code: "INVALID_COLUMN".into(),
97 message: format!(
98 "Unknown column \"{}\" -- valid columns: id, {}",
99 name,
100 entity
101 .fields
102 .iter()
103 .map(|f| f.name.as_str())
104 .collect::<Vec<_>>()
105 .join(", ")
106 ),
107 })
108}
109
110fn tune_runtime_connection(conn: &Connection, in_memory: bool) {
122 let pragmas: &[(&str, &str)] = if in_memory {
123 &[
124 ("temp_store", "MEMORY"),
125 ("cache_size", "-65536"),
126 ("foreign_keys", "ON"),
127 ]
128 } else {
129 &[
130 ("journal_mode", "WAL"),
131 ("synchronous", "NORMAL"),
132 ("cache_size", "-65536"),
133 ("mmap_size", "268435456"),
134 ("temp_store", "MEMORY"),
135 ("busy_timeout", "5000"),
136 ("foreign_keys", "ON"),
137 ("wal_autocheckpoint", "1000"),
138 ]
139 };
140 for (key, value) in pragmas {
141 let _ = conn.pragma_update(None, key, value);
142 }
143}
144
145enum ReadConnGuard<'a> {
152 Pooled(std::sync::MutexGuard<'a, Connection>),
153 Write(std::sync::MutexGuard<'a, Connection>),
154}
155
156impl<'a> std::ops::Deref for ReadConnGuard<'a> {
157 type Target = Connection;
158 fn deref(&self) -> &Connection {
159 match self {
160 ReadConnGuard::Pooled(g) => g,
161 ReadConnGuard::Write(g) => g,
162 }
163 }
164}
165
/// Data runtime for one application manifest, backed by either SQLite or
/// Postgres.
pub struct Runtime {
    /// Storage backend selected when the runtime was opened.
    backend: RuntimeBackend,
    /// The manifest this runtime was opened with.
    manifest: AppManifest,
    /// Manifest entities indexed by entity name for direct lookup.
    entities: HashMap<String, ManifestEntity>,
    /// `true` only for runtimes created through `Runtime::in_memory`.
    is_in_memory: bool,
}
192
/// The storage engine behind a [`Runtime`].
enum RuntimeBackend {
    /// Embedded SQLite database (file-backed or in-memory).
    Sqlite(SqliteBackend),
    /// Remote Postgres database.
    Postgres(PgBackend),
}
199
/// State owned by a SQLite-backed runtime.
struct SqliteBackend {
    /// The connection the database was opened with; used for all writes.
    write_conn: Mutex<Connection>,
    /// Read-only connections to the same database file. Empty when the
    /// database has no file path (for example in-memory databases).
    read_pool: Vec<Mutex<Connection>>,
    /// Counter starting at zero.
    /// NOTE(review): presumably drives round-robin selection from
    /// `read_pool` — confirm against `lock_read_conn`.
    read_counter: AtomicUsize,
    /// CRDT document store for entities declared with `crdt`.
    crdt: crate::loro_store::LoroStore,
}
217
/// State owned by a Postgres-backed runtime.
pub(crate) struct PgBackend {
    /// Postgres data store that executes entity reads and writes.
    pub(crate) store: pylon_storage::pg_datastore::PostgresDataStore,
    /// Shared CRDT store used for entities declared with `crdt`.
    pub(crate) crdt: std::sync::Arc<crate::pg_loro_store::PgLoroStore>,
}
228
/// Number of read-only connections opened for a file-backed SQLite database.
const READ_POOL_SIZE: usize = 4;
231
/// Returns `true` when `url` starts with the `postgres://` or `postgresql://`
/// scheme, compared ASCII case-insensitively.
fn is_postgres_url(url: &str) -> bool {
    ["postgres://", "postgresql://"].iter().any(|scheme| {
        // `get` yields `None` when `url` is shorter than the scheme or the cut
        // would split a multi-byte character; neither case can match.
        url.get(..scheme.len())
            .map_or(false, |head| head.eq_ignore_ascii_case(scheme))
    })
}
239
240fn data_err_to_runtime(e: pylon_http::DataError) -> RuntimeError {
244 RuntimeError {
245 code: e.code,
246 message: e.message,
247 }
248}
249
250impl Runtime {
251 pub fn open(url: &str, manifest: AppManifest) -> Result<Self, RuntimeError> {
259 if is_postgres_url(url) {
260 Self::open_postgres(url, manifest)
261 } else {
262 let conn = Connection::open(url).map_err(|e| RuntimeError {
263 code: "RUNTIME_OPEN_FAILED".into(),
264 message: format!("Failed to open database: {e}"),
265 })?;
266 Self::from_connection(conn, manifest, false)
267 }
268 }
269
270 pub fn open_postgres(url: &str, manifest: AppManifest) -> Result<Self, RuntimeError> {
280 let store = pylon_storage::pg_datastore::PostgresDataStore::connect(url, manifest.clone())
281 .map_err(data_err_to_runtime)?;
282 store
288 .with_client(|c| crate::pg_loro_store::ensure_sidecar(c))
289 .map_err(|e| RuntimeError {
290 code: "CRDT_SIDECAR_BOOTSTRAP_FAILED".into(),
291 message: format!("ensure pg crdt sidecar: {e}"),
292 })?;
293 let entities: HashMap<String, ManifestEntity> = manifest
294 .entities
295 .iter()
296 .map(|e| (e.name.clone(), e.clone()))
297 .collect();
298 Ok(Self {
299 backend: RuntimeBackend::Postgres(PgBackend {
300 store,
301 crdt: std::sync::Arc::new(crate::pg_loro_store::PgLoroStore::new()),
302 }),
303 manifest,
304 entities,
305 is_in_memory: false,
306 })
307 }
308
    /// Returns `true` when this runtime was created as an in-memory database.
    pub fn is_in_memory(&self) -> bool {
        self.is_in_memory
    }
322
323 pub fn db_path(&self) -> Option<String> {
329 if self.is_in_memory {
330 return None;
331 }
332 let sb = match &self.backend {
333 RuntimeBackend::Sqlite(sb) => sb,
334 RuntimeBackend::Postgres(_) => return None,
335 };
336 let conn = sb.write_conn.lock().ok()?;
337 conn.path().filter(|p| !p.is_empty()).map(String::from)
338 }
339
340 pub fn reset_for_tests(&self) -> Result<(), RuntimeError> {
347 if !self.is_in_memory() {
348 return Err(RuntimeError {
349 code: "RESET_REFUSED".into(),
350 message: "reset_for_tests is only available on in-memory databases".into(),
351 });
352 }
353 let conn = self.lock_write_conn()?;
354 let entity_names: Vec<String> = self.entities.values().map(|e| e.name.clone()).collect();
355 for name in entity_names {
356 let sql = format!("DELETE FROM {}", quote_ident(&name));
357 let _ = conn.execute(&sql, []);
358 let fts_sql = format!("DELETE FROM {}", quote_ident(&format!("{name}_fts")));
360 let _ = conn.execute(&fts_sql, []);
361 }
362 Ok(())
363 }
364
365 pub fn in_memory(manifest: AppManifest) -> Result<Self, RuntimeError> {
369 let conn = Connection::open_in_memory().map_err(|e| RuntimeError {
370 code: "RUNTIME_OPEN_FAILED".into(),
371 message: format!("Failed to open in-memory database: {e}"),
372 })?;
373 Self::from_connection(conn, manifest, true)
374 }
375
376 fn from_connection(
377 conn: Connection,
378 manifest: AppManifest,
379 is_in_memory: bool,
380 ) -> Result<Self, RuntimeError> {
381 tune_runtime_connection(&conn, is_in_memory);
383
384 let entities: HashMap<String, ManifestEntity> = manifest
386 .entities
387 .iter()
388 .map(|e| (e.name.clone(), e.clone()))
389 .collect();
390
391 for entity in &manifest.entities {
393 let fields: Vec<String> = entity
394 .fields
395 .iter()
396 .map(|f| {
397 let col_type = match f.field_type.as_str() {
398 "int" => "INTEGER",
399 "float" => "REAL",
400 "bool" => "INTEGER",
401 _ => "TEXT",
402 };
403 let not_null = if f.optional { "" } else { " NOT NULL" };
404 let unique = if f.unique { " UNIQUE" } else { "" };
405 format!("{} {col_type}{not_null}{unique}", quote_ident(&f.name))
406 })
407 .collect();
408
409 let mut cols = vec!["\"id\" TEXT PRIMARY KEY NOT NULL".to_string()];
410 cols.extend(fields);
411 let sql = format!(
412 "CREATE TABLE IF NOT EXISTS {} ({})",
413 quote_ident(&entity.name),
414 cols.join(", ")
415 );
416 conn.execute(&sql, []).map_err(|e| RuntimeError {
417 code: "SCHEMA_INIT_FAILED".into(),
418 message: format!("Failed to create table {}: {e}", entity.name),
419 })?;
420
421 for idx in &entity.indexes {
423 let unique_kw = if idx.unique { "UNIQUE " } else { "" };
424 let quoted_fields: Vec<String> =
425 idx.fields.iter().map(|f| quote_ident(f)).collect();
426 let idx_sql = format!(
427 "CREATE {unique_kw}INDEX IF NOT EXISTS {} ON {} ({})",
428 quote_ident(&idx.name),
429 quote_ident(&entity.name),
430 quoted_fields.join(", ")
431 );
432 conn.execute(&idx_sql, []).ok();
433 }
434
435 let text_fields: Vec<&str> = entity
443 .fields
444 .iter()
445 .filter(|f| matches!(f.field_type.as_str(), "string" | "richtext" | "text"))
446 .map(|f| f.name.as_str())
447 .collect();
448 if !text_fields.is_empty() {
449 let fts_name = format!("{}_fts", entity.name);
450 let quoted_cols: Vec<String> = text_fields.iter().map(|f| quote_ident(f)).collect();
451 let fts_sql = format!(
452 "CREATE VIRTUAL TABLE IF NOT EXISTS {} USING fts5({}, content={}, content_rowid='rowid')",
453 quote_ident(&fts_name),
454 quoted_cols.join(", "),
455 quote_ident(&entity.name),
456 );
457 let fts_ok = conn.execute(&fts_sql, []).is_ok();
460
461 if fts_ok {
462 let tbl = quote_ident(&entity.name);
473 let ftb = quote_ident(&fts_name);
474 let cols_list = quoted_cols.join(", ");
475 let new_list: Vec<String> = text_fields
476 .iter()
477 .map(|f| format!("new.{}", quote_ident(f)))
478 .collect();
479 let old_list: Vec<String> = text_fields
480 .iter()
481 .map(|f| format!("old.{}", quote_ident(f)))
482 .collect();
483
484 let trigger_ai = quote_ident(&format!("{}_ai", fts_name));
485 let trigger_ad = quote_ident(&format!("{}_ad", fts_name));
486 let trigger_au = quote_ident(&format!("{}_au", fts_name));
487
488 let trigger_ins = format!(
489 "CREATE TRIGGER IF NOT EXISTS {trigger_ai} AFTER INSERT ON {tbl} BEGIN \
490 INSERT INTO {ftb}(rowid, {cols_list}) VALUES (new.rowid, {new_vals}); END",
491 new_vals = new_list.join(", "),
492 );
493 let trigger_del = format!(
494 "CREATE TRIGGER IF NOT EXISTS {trigger_ad} AFTER DELETE ON {tbl} BEGIN \
495 INSERT INTO {ftb}({ftb}, rowid, {cols_list}) VALUES('delete', old.rowid, {old_vals}); END",
496 old_vals = old_list.join(", "),
497 );
498 let trigger_upd = format!(
499 "CREATE TRIGGER IF NOT EXISTS {trigger_au} AFTER UPDATE ON {tbl} BEGIN \
500 INSERT INTO {ftb}({ftb}, rowid, {cols_list}) VALUES('delete', old.rowid, {old_vals}); \
501 INSERT INTO {ftb}(rowid, {cols_list}) VALUES (new.rowid, {new_vals}); END",
502 new_vals = new_list.join(", "),
503 old_vals = old_list.join(", "),
504 );
505 for (label, sql) in [
508 ("ai", &trigger_ins),
509 ("ad", &trigger_del),
510 ("au", &trigger_upd),
511 ] {
512 if let Err(e) = conn.execute(sql, []) {
513 tracing::warn!(
514 "[fts] failed to create {label} trigger for {}: {e}",
515 entity.name
516 );
517 }
518 }
519 }
520 }
521 }
522
523 let db_path = conn.path().filter(|p| !p.is_empty()).map(|p| p.to_string());
527
528 let read_pool = if let Some(ref path) = db_path {
529 let mut pool = Vec::with_capacity(READ_POOL_SIZE);
530 for _ in 0..READ_POOL_SIZE {
531 let read_conn = Connection::open_with_flags(
532 path,
533 rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
534 | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX,
535 )
536 .map_err(|e| RuntimeError {
537 code: "POOL_OPEN_FAILED".into(),
538 message: format!("Failed to open read connection: {e}"),
539 })?;
540 tune_runtime_connection(&read_conn, false);
541 pool.push(Mutex::new(read_conn));
542 }
543 pool
544 } else {
545 Vec::new()
547 };
548
549 crate::loro_store::ensure_sidecar(&conn).map_err(|e| RuntimeError {
552 code: "CRDT_SIDECAR_FAILED".into(),
553 message: format!("create CRDT sidecar table: {e}"),
554 })?;
555
556 Ok(Self {
557 backend: RuntimeBackend::Sqlite(SqliteBackend {
558 write_conn: Mutex::new(conn),
559 read_pool,
560 read_counter: AtomicUsize::new(0),
561 crdt: crate::loro_store::LoroStore::new(),
562 }),
563 manifest,
564 entities,
565 is_in_memory,
566 })
567 }
568
569 pub fn ensure_search_indexes(&self) -> Result<(), RuntimeError> {
580 if matches!(self.backend, RuntimeBackend::Postgres(_)) {
584 return Ok(());
585 }
586 let conn = self.lock_write_conn()?;
587 conn.execute(pylon_storage::search::create_facet_table_sql(), [])
588 .map_err(|e| RuntimeError {
589 code: "FACET_TABLE_FAILED".into(),
590 message: format!("create _facet_bitmap: {e}"),
591 })?;
592 for entity in &self.manifest.entities {
593 if let Some(cfg) = &entity.search {
594 if let Some(sql) = pylon_storage::search::create_fts_table_sql(&entity.name, cfg) {
595 conn.execute(&sql, []).map_err(|e| RuntimeError {
596 code: "FTS_TABLE_FAILED".into(),
597 message: format!("create FTS table for {}: {e}", entity.name),
598 })?;
599 }
600 for field in &cfg.sortable {
601 let idx_sql = format!(
602 "CREATE INDEX IF NOT EXISTS \"{}_sort_{field}\" ON \"{}\" (\"{field}\")",
603 entity.name, entity.name,
604 );
605 conn.execute(&idx_sql, []).map_err(|e| RuntimeError {
606 code: "SORT_INDEX_FAILED".into(),
607 message: format!("create sort index for {}.{field}: {e}", entity.name),
608 })?;
609 }
610 }
611 }
612 Ok(())
613 }
614
    /// Returns the application manifest this runtime was opened with.
    pub fn manifest(&self) -> &AppManifest {
        &self.manifest
    }
619
    /// Public entry point to the guard returned by `lock_write_conn`.
    ///
    /// # Errors
    ///
    /// Propagates whatever error `lock_write_conn` reports.
    pub fn lock_conn_pub(&self) -> Result<std::sync::MutexGuard<'_, Connection>, RuntimeError> {
        self.lock_write_conn()
    }
628
629 pub fn read_pool_size(&self) -> usize {
634 match &self.backend {
635 RuntimeBackend::Sqlite(sb) => sb.read_pool.len(),
636 RuntimeBackend::Postgres(_) => 0,
637 }
638 }
639
640 pub fn is_postgres(&self) -> bool {
643 matches!(self.backend, RuntimeBackend::Postgres(_))
644 }
645
646 pub(crate) fn crdt_fields_for(
655 &self,
656 ent: &ManifestEntity,
657 ) -> Result<Vec<pylon_crdt::CrdtField>, RuntimeError> {
658 let mut out = Vec::with_capacity(ent.fields.len());
659 for f in &ent.fields {
660 if f.name == "id" {
663 continue;
664 }
665 let kind = pylon_crdt::field_kind(&f.field_type, f.crdt).map_err(|e| RuntimeError {
666 code: "INVALID_CRDT_FIELD".into(),
667 message: format!(
668 "{}.{}: {e} (declared type={}, crdt={:?})",
669 ent.name, f.name, f.field_type, f.crdt
670 ),
671 })?;
672 out.push(pylon_crdt::CrdtField {
673 name: f.name.clone(),
674 kind,
675 });
676 }
677 Ok(out)
678 }
679
680 pub fn crdt_store(&self) -> &crate::loro_store::LoroStore {
688 match &self.backend {
689 RuntimeBackend::Sqlite(sb) => &sb.crdt,
690 RuntimeBackend::Postgres(_) => {
691 panic!("crdt_store() called on Postgres-backed Runtime")
692 }
693 }
694 }
695
696 pub fn insert(&self, entity: &str, data: &serde_json::Value) -> Result<String, RuntimeError> {
709 if let Some(pg) = self.pg_backend() {
710 let ent = self.require_entity(entity)?;
711 if ent.crdt {
717 let crdt_fields = self.crdt_fields_for(ent)?;
718 let id = generate_id();
719 let mut row = data.clone();
722 if let Some(obj) = row.as_object_mut() {
723 obj.insert("id".into(), serde_json::Value::String(id.clone()));
724 }
725 let result = pg
726 .store
727 .with_transaction_raw(|tx| -> Result<(), RuntimeError> {
728 pg.crdt
729 .apply_patch(tx, entity, &id, &crdt_fields, data)
730 .map_err(|e| RuntimeError {
731 code: "CRDT_APPLY_FAILED".into(),
732 message: format!("crdt write {entity}/{id}: {e}"),
733 })?;
734 pylon_storage::pg_tx_store::tx_insert(tx, &self.manifest, entity, &row)
735 .map(|_| ())
736 .map_err(data_err_to_runtime)?;
737 pg.crdt.cache_after_commit(tx, entity, &id);
738 Ok(())
739 });
740 if result.is_err() {
741 pg.crdt.evict(entity, &id);
748 }
749 result?;
750 return Ok(id);
751 }
752 return pylon_http::DataStore::insert(&pg.store, entity, data)
756 .map_err(data_err_to_runtime);
757 }
758 let ent = self.require_entity(entity)?;
759 let conn = self.lock_write_conn()?;
760
761 let id = generate_id();
762
763 let obj = data.as_object().ok_or_else(|| RuntimeError {
764 code: "INVALID_DATA".into(),
765 message: "Insert data must be a JSON object".into(),
766 })?;
767
768 for key in obj.keys() {
771 if key != "id" {
772 validate_column_name(key, ent)?;
773 }
774 }
775
776 let sb = self.sqlite_backend()?;
781
782 with_write_tx(&conn, || {
786 if ent.crdt {
787 let crdt_fields = self.crdt_fields_for(ent)?;
788 sb.crdt
789 .apply_patch(&conn, entity, &id, &crdt_fields, data)
790 .map_err(|e| RuntimeError {
791 code: "CRDT_APPLY_FAILED".into(),
792 message: format!("crdt write {entity}/{id}: {e}"),
793 })?;
794 }
795
796 let mut col_names = vec![quote_ident("id")];
797 let mut placeholders = vec!["?1".to_string()];
798 let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = vec![Box::new(id.clone())];
799
800 let mut idx = 2;
801 for (key, val) in obj {
802 if key == "id" {
803 continue;
804 }
805 col_names.push(quote_ident(key));
806 placeholders.push(format!("?{idx}"));
807 values.push(json_to_sql(val));
808 idx += 1;
809 }
810
811 let sql = format!(
812 "INSERT INTO {} ({}) VALUES ({})",
813 quote_ident(entity),
814 col_names.join(", "),
815 placeholders.join(", ")
816 );
817
818 let params: Vec<&dyn rusqlite::types::ToSql> =
819 values.iter().map(|v| v.as_ref()).collect();
820 conn.execute(&sql, params.as_slice())
821 .map_err(|e| RuntimeError {
822 code: "INSERT_FAILED".into(),
823 message: format!("Insert into {entity} failed: {e}"),
824 })?;
825
826 if let Some(cfg) = ent.search.as_ref() {
830 if !cfg.is_empty() {
831 pylon_storage::search_maintenance::apply_insert(&conn, entity, &id, data, cfg)
832 .map_err(|e| RuntimeError {
833 code: "SEARCH_MAINTENANCE_FAILED".into(),
834 message: format!("search index update on insert {entity}: {e}"),
835 })?;
836 }
837 }
838 Ok(())
839 })?;
840
841 Ok(id)
842 }
843
844 pub fn get_by_id(
846 &self,
847 entity: &str,
848 id: &str,
849 ) -> Result<Option<serde_json::Value>, RuntimeError> {
850 if let Some(pg) = self.pg_backend() {
851 return pylon_http::DataStore::get_by_id(&pg.store, entity, id)
852 .map_err(data_err_to_runtime);
853 }
854 let ent = self.require_entity(entity)?;
855 let conn = self.lock_read_conn()?;
856
857 let sql = format!("SELECT * FROM {} WHERE \"id\" = ?1", quote_ident(entity));
858 let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
859 code: "QUERY_FAILED".into(),
860 message: format!("Failed to prepare query: {e}"),
861 })?;
862
863 let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
864
865 let result = stmt
866 .query_row(rusqlite::params![id], |row| Ok(row_to_json(row, &columns)))
867 .ok();
868
869 Ok(result)
870 }
871
872 pub fn list(&self, entity: &str) -> Result<Vec<serde_json::Value>, RuntimeError> {
874 if let Some(pg) = self.pg_backend() {
875 return pylon_http::DataStore::list(&pg.store, entity).map_err(data_err_to_runtime);
876 }
877 let ent = self.require_entity(entity)?;
878 let conn = self.lock_read_conn()?;
879
880 let sql = format!("SELECT * FROM {} ORDER BY \"id\"", quote_ident(entity));
881 let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
882 code: "QUERY_FAILED".into(),
883 message: format!("Failed to prepare query: {e}"),
884 })?;
885
886 let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
887
888 let rows = stmt
889 .query_map([], |row| Ok(row_to_json(row, &columns)))
890 .map_err(|e| RuntimeError {
891 code: "QUERY_FAILED".into(),
892 message: format!("Query failed: {e}"),
893 })?;
894
895 let mut result = Vec::new();
896 for row in rows {
897 if let Ok(val) = row {
898 result.push(val);
899 }
900 }
901 Ok(result)
902 }
903
904 pub fn list_after(
906 &self,
907 entity: &str,
908 after: Option<&str>,
909 limit: usize,
910 ) -> Result<Vec<serde_json::Value>, RuntimeError> {
911 if let Some(pg) = self.pg_backend() {
912 return pylon_http::DataStore::list_after(&pg.store, entity, after, limit)
913 .map_err(data_err_to_runtime);
914 }
915 let ent = self.require_entity(entity)?;
916 let conn = self.lock_read_conn()?;
917
918 let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
919 let table = quote_ident(entity);
920
921 let (sql, params): (String, Vec<Box<dyn rusqlite::types::ToSql>>) = match after {
922 Some(cursor) => (
923 format!(
924 "SELECT * FROM {} WHERE \"id\" > ?1 ORDER BY \"id\" LIMIT ?2",
925 table
926 ),
927 vec![Box::new(cursor.to_string()), Box::new(limit as i64)],
928 ),
929 None => (
930 format!("SELECT * FROM {} ORDER BY \"id\" LIMIT ?1", table),
931 vec![Box::new(limit as i64)],
932 ),
933 };
934
935 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
936 params.iter().map(|v| v.as_ref()).collect();
937
938 let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
939 code: "QUERY_FAILED".into(),
940 message: format!("Failed to prepare query: {e}"),
941 })?;
942
943 let rows = stmt
944 .query_map(param_refs.as_slice(), |row| Ok(row_to_json(row, &columns)))
945 .map_err(|e| RuntimeError {
946 code: "QUERY_FAILED".into(),
947 message: format!("Query failed: {e}"),
948 })?;
949
950 let mut result = Vec::new();
951 for row in rows {
952 if let Ok(val) = row {
953 result.push(val);
954 }
955 }
956 Ok(result)
957 }
958
959 pub fn update(
965 &self,
966 entity: &str,
967 id: &str,
968 data: &serde_json::Value,
969 ) -> Result<bool, RuntimeError> {
970 if let Some(pg) = self.pg_backend() {
971 let ent = self.require_entity(entity)?;
972 if ent.crdt {
973 let crdt_fields = self.crdt_fields_for(ent)?;
986 let result = pg
987 .store
988 .with_transaction_raw(|tx| -> Result<bool, RuntimeError> {
989 pg.crdt
990 .apply_patch(tx, entity, id, &crdt_fields, data)
991 .map_err(|e| RuntimeError {
992 code: "CRDT_APPLY_FAILED".into(),
993 message: format!("crdt update {entity}/{id}: {e}"),
994 })?;
995 let updated = pylon_storage::pg_tx_store::tx_update(
996 tx,
997 &self.manifest,
998 entity,
999 id,
1000 data,
1001 )
1002 .map_err(data_err_to_runtime)?;
1003 if !updated {
1004 return Err(RuntimeError {
1007 code: "ENTITY_NOT_FOUND".into(),
1008 message: format!(
1009 "Update on {entity}/{id} found no row — refusing to commit \
1010 a CRDT snapshot that would orphan."
1011 ),
1012 });
1013 }
1014 pg.crdt.cache_after_commit(tx, entity, id);
1018 Ok(updated)
1019 });
1020 if result.is_err() {
1021 pg.crdt.evict(entity, id);
1022 if let Err(ref e) = result {
1028 if e.code == "ENTITY_NOT_FOUND" {
1029 return Ok(false);
1030 }
1031 }
1032 }
1033 return result;
1034 }
1035 return pylon_http::DataStore::update(&pg.store, entity, id, data)
1036 .map_err(data_err_to_runtime);
1037 }
1038 let ent = self.require_entity(entity)?;
1039 let conn = self.lock_write_conn()?;
1040
1041 let obj = data.as_object().ok_or_else(|| RuntimeError {
1042 code: "INVALID_DATA".into(),
1043 message: "Update data must be a JSON object".into(),
1044 })?;
1045
1046 for key in obj.keys() {
1048 if key != "id" {
1049 validate_column_name(key, ent)?;
1050 }
1051 }
1052 let writable_keys: Vec<&String> = obj.keys().filter(|k| *k != "id").collect();
1053 if writable_keys.is_empty() {
1054 return Ok(false);
1055 }
1056
1057 let sb = self.sqlite_backend()?;
1059
1060 let affected = with_write_tx(&conn, || -> Result<i64, RuntimeError> {
1063 if ent.crdt {
1064 let crdt_fields = self.crdt_fields_for(ent)?;
1065 sb.crdt
1066 .apply_patch(&conn, entity, id, &crdt_fields, data)
1067 .map_err(|e| RuntimeError {
1068 code: "CRDT_APPLY_FAILED".into(),
1069 message: format!("crdt write {entity}/{id}: {e}"),
1070 })?;
1071 }
1072
1073 let mut set_clauses = Vec::new();
1074 let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
1075 let mut idx = 1;
1076 for key in &writable_keys {
1077 set_clauses.push(format!("{} = ?{idx}", quote_ident(key)));
1078 values.push(json_to_sql(&obj[key.as_str()]));
1079 idx += 1;
1080 }
1081
1082 let searchable = ent.search.as_ref().map(|c| !c.is_empty()).unwrap_or(false);
1086 let old_row = if searchable {
1087 self.get_by_id_with_conn(&conn, entity, id)?
1088 } else {
1089 None
1090 };
1091
1092 values.push(Box::new(id.to_string()));
1093 let sql = format!(
1094 "UPDATE {} SET {} WHERE \"id\" = ?{idx}",
1095 quote_ident(entity),
1096 set_clauses.join(", ")
1097 );
1098
1099 let params: Vec<&dyn rusqlite::types::ToSql> =
1100 values.iter().map(|v| v.as_ref()).collect();
1101 let affected = conn
1102 .execute(&sql, params.as_slice())
1103 .map_err(|e| RuntimeError {
1104 code: "UPDATE_FAILED".into(),
1105 message: format!("Update {entity}/{id} failed: {e}"),
1106 })? as i64;
1107
1108 if affected > 0 && searchable {
1109 if let (Some(cfg), Some(old)) = (ent.search.as_ref(), old_row) {
1110 pylon_storage::search_maintenance::apply_update(
1111 &conn, entity, id, &old, data, cfg,
1112 )
1113 .map_err(|e| RuntimeError {
1114 code: "SEARCH_MAINTENANCE_FAILED".into(),
1115 message: format!("search index update on update {entity}: {e}"),
1116 })?;
1117 }
1118 }
1119 Ok(affected)
1120 })?;
1121
1122 Ok(affected > 0)
1123 }
1124
1125 pub fn delete(&self, entity: &str, id: &str) -> Result<bool, RuntimeError> {
1127 if let Some(pg) = self.pg_backend() {
1128 let ent = self.require_entity(entity)?;
1129 if ent.crdt {
1130 let result = pg
1135 .store
1136 .with_transaction_raw(|tx| -> Result<bool, RuntimeError> {
1137 tx.execute(
1138 "DELETE FROM _pylon_crdt_snapshots WHERE entity = $1 AND row_id = $2",
1139 &[&entity, &id],
1140 )
1141 .map_err(|e| RuntimeError {
1142 code: "CRDT_SIDECAR_DELETE_FAILED".into(),
1143 message: format!("delete pg crdt snapshot {entity}/{id}: {e}"),
1144 })?;
1145 pylon_storage::pg_tx_store::tx_delete(tx, &self.manifest, entity, id)
1146 .map_err(data_err_to_runtime)
1147 });
1148 if result.is_ok() {
1154 pg.crdt.evict(entity, id);
1155 }
1156 return result;
1157 }
1158 return pylon_http::DataStore::delete(&pg.store, entity, id)
1159 .map_err(data_err_to_runtime);
1160 }
1161 let ent = self.require_entity(entity)?;
1162 let conn = self.lock_write_conn()?;
1163
1164 let searchable = ent.search.as_ref().map(|c| !c.is_empty()).unwrap_or(false);
1167 if searchable {
1168 if let (Some(cfg), Ok(Some(row))) = (
1169 ent.search.as_ref(),
1170 self.get_by_id_with_conn(&conn, entity, id),
1171 ) {
1172 pylon_storage::search_maintenance::apply_delete(&conn, entity, id, &row, cfg)
1173 .map_err(|e| RuntimeError {
1174 code: "SEARCH_MAINTENANCE_FAILED".into(),
1175 message: format!("search index update on delete {entity}: {e}"),
1176 })?;
1177 }
1178 }
1179
1180 let sql = format!("DELETE FROM {} WHERE \"id\" = ?1", quote_ident(entity));
1181 let affected = conn
1182 .execute(&sql, rusqlite::params![id])
1183 .map_err(|e| RuntimeError {
1184 code: "DELETE_FAILED".into(),
1185 message: format!("Delete {entity}/{id} failed: {e}"),
1186 })?;
1187
1188 Ok(affected > 0)
1189 }
1190
1191 pub fn lookup(
1193 &self,
1194 entity: &str,
1195 field: &str,
1196 value: &str,
1197 ) -> Result<Option<serde_json::Value>, RuntimeError> {
1198 if let Some(pg) = self.pg_backend() {
1199 return pylon_http::DataStore::lookup(&pg.store, entity, field, value)
1200 .map_err(data_err_to_runtime);
1201 }
1202 let ent = self.require_entity(entity)?;
1203 validate_column_name(field, ent)?;
1204 let conn = self.lock_read_conn()?;
1205
1206 let sql = format!(
1207 "SELECT * FROM {} WHERE {} = ?1 LIMIT 1",
1208 quote_ident(entity),
1209 quote_ident(field)
1210 );
1211 let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1212
1213 let result = conn.prepare_cached(&sql).ok().and_then(|mut stmt| {
1214 stmt.query_row(rusqlite::params![value], |row| {
1215 Ok(row_to_json(row, &columns))
1216 })
1217 .ok()
1218 });
1219
1220 Ok(result)
1221 }
1222
1223 pub fn link(
1225 &self,
1226 entity: &str,
1227 id: &str,
1228 relation: &str,
1229 target_id: &str,
1230 ) -> Result<bool, RuntimeError> {
1231 let ent = self.require_entity(entity)?;
1232
1233 let rel = ent
1235 .relations
1236 .iter()
1237 .find(|r| r.name == relation)
1238 .ok_or_else(|| RuntimeError {
1239 code: "RELATION_NOT_FOUND".into(),
1240 message: format!("Relation \"{relation}\" not found on entity \"{entity}\""),
1241 })?;
1242
1243 let data = serde_json::json!({ rel.field.clone(): target_id });
1244 self.update(entity, id, &data)
1245 }
1246
1247 pub fn unlink(&self, entity: &str, id: &str, relation: &str) -> Result<bool, RuntimeError> {
1249 let ent = self.require_entity(entity)?;
1250
1251 let rel = ent
1252 .relations
1253 .iter()
1254 .find(|r| r.name == relation)
1255 .ok_or_else(|| RuntimeError {
1256 code: "RELATION_NOT_FOUND".into(),
1257 message: format!("Relation \"{relation}\" not found on entity \"{entity}\""),
1258 })?;
1259
1260 let data = serde_json::json!({ rel.field.clone(): null });
1261 self.update(entity, id, &data)
1262 }
1263
1264 pub fn query_filtered(
1266 &self,
1267 entity: &str,
1268 filter: &serde_json::Value,
1269 ) -> Result<Vec<serde_json::Value>, RuntimeError> {
1270 if let Some(pg) = self.pg_backend() {
1271 return pylon_http::DataStore::query_filtered(&pg.store, entity, filter)
1272 .map_err(data_err_to_runtime);
1273 }
1274 let ent = self.require_entity(entity)?;
1275 let conn = self.lock_read_conn()?;
1276
1277 let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1278 let obj = filter
1279 .as_object()
1280 .unwrap_or(&serde_json::Map::new())
1281 .clone();
1282
1283 let mut where_clauses = Vec::new();
1284 let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
1285 let mut order_clause = String::new();
1286 let mut limit_clause = String::new();
1287 let mut join_clause = String::new();
1288 let mut fts_order = false;
1289 let mut idx = 1;
1290
1291 for (key, val) in &obj {
1292 match key.as_str() {
1293 "$order" => {
1294 if let Some(order_obj) = val.as_object() {
1295 let mut parts: Vec<String> = Vec::new();
1296 for (col, dir) in order_obj {
1297 validate_column_name(col, ent)?;
1298 let d = match dir.as_str().unwrap_or("asc") {
1299 "desc" | "DESC" => "DESC",
1300 _ => "ASC",
1301 };
1302 parts.push(format!("{} {d}", quote_ident(col)));
1303 }
1304 if !parts.is_empty() {
1305 order_clause = format!(" ORDER BY {}", parts.join(", "));
1306 }
1307 }
1308 }
1309 "$limit" => {
1310 if let Some(n) = val.as_u64() {
1311 limit_clause = format!(" LIMIT {n}");
1312 }
1313 }
1314 "$offset" => {
1315 if let Some(n) = val.as_u64() {
1316 if limit_clause.is_empty() {
1318 limit_clause = " LIMIT -1".into();
1319 }
1320 limit_clause = format!("{limit_clause} OFFSET {n}");
1321 }
1322 }
1323 "$search" => {
1324 if let Some(q) = val.as_str() {
1325 let fts = format!("{}_fts", entity);
1327 join_clause = format!(
1328 " JOIN {fts} ON {ent}.rowid = {fts}.rowid",
1329 fts = quote_ident(&fts),
1330 ent = quote_ident(entity),
1331 );
1332 where_clauses.push(format!("{} MATCH ?{idx}", quote_ident(&fts)));
1333 values.push(Box::new(q.to_string()));
1334 fts_order = true;
1335 idx += 1;
1336 }
1337 }
1338 _ => {
1339 validate_column_name(key, ent)?;
1340 let quoted_key = quote_ident(key);
1341
1342 if let Some(op_obj) = val.as_object() {
1343 for (op, op_val) in op_obj {
1344 match op.as_str() {
1345 "$not" => {
1346 where_clauses.push(format!("{quoted_key} != ?{idx}"));
1347 values.push(json_to_sql(op_val));
1348 idx += 1;
1349 }
1350 "$gt" => {
1351 where_clauses.push(format!("{quoted_key} > ?{idx}"));
1352 values.push(json_to_sql(op_val));
1353 idx += 1;
1354 }
1355 "$gte" => {
1356 where_clauses.push(format!("{quoted_key} >= ?{idx}"));
1357 values.push(json_to_sql(op_val));
1358 idx += 1;
1359 }
1360 "$lt" => {
1361 where_clauses.push(format!("{quoted_key} < ?{idx}"));
1362 values.push(json_to_sql(op_val));
1363 idx += 1;
1364 }
1365 "$lte" => {
1366 where_clauses.push(format!("{quoted_key} <= ?{idx}"));
1367 values.push(json_to_sql(op_val));
1368 idx += 1;
1369 }
1370 "$like" => {
1371 where_clauses.push(format!("{quoted_key} LIKE ?{idx}"));
1372 let pattern = format!("%{}%", op_val.as_str().unwrap_or(""));
1373 values.push(Box::new(pattern));
1374 idx += 1;
1375 }
1376 "$in" => {
1377 if let Some(arr) = op_val.as_array() {
1378 if arr.is_empty() {
1379 where_clauses.push("0".into());
1388 } else {
1389 let placeholders: Vec<String> = arr
1390 .iter()
1391 .map(|v| {
1392 let p = format!("?{idx}");
1393 values.push(json_to_sql(v));
1394 idx += 1;
1395 p
1396 })
1397 .collect();
1398 where_clauses.push(format!(
1399 "{quoted_key} IN ({})",
1400 placeholders.join(", ")
1401 ));
1402 }
1403 }
1404 }
1405 _ => {}
1406 }
1407 }
1408 } else {
1409 where_clauses.push(format!("{quoted_key} = ?{idx}"));
1411 values.push(json_to_sql(val));
1412 idx += 1;
1413 }
1414 }
1415 }
1416 }
1417
1418 let where_sql = if where_clauses.is_empty() {
1419 String::new()
1420 } else {
1421 format!(" WHERE {}", where_clauses.join(" AND "))
1422 };
1423
1424 if order_clause.is_empty() {
1425 order_clause = if fts_order {
1426 " ORDER BY bm25(".to_string() + "e_ident(&format!("{}_fts", entity)) + ")"
1428 } else {
1429 format!(" ORDER BY {}.\"id\"", quote_ident(entity))
1430 };
1431 }
1432
1433 let select_prefix = format!("{}.*", quote_ident(entity));
1434 let sql = format!(
1435 "SELECT {} FROM {}{}{}{}{}",
1436 select_prefix,
1437 quote_ident(entity),
1438 join_clause,
1439 where_sql,
1440 order_clause,
1441 limit_clause
1442 );
1443 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1444 values.iter().map(|v| v.as_ref()).collect();
1445
1446 let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
1447 code: "QUERY_FAILED".into(),
1448 message: format!("Failed to prepare filtered query: {e}"),
1449 })?;
1450
1451 let rows = stmt
1452 .query_map(param_refs.as_slice(), |row| Ok(row_to_json(row, &columns)))
1453 .map_err(|e| RuntimeError {
1454 code: "QUERY_FAILED".into(),
1455 message: format!("Filtered query failed: {e}"),
1456 })?;
1457
1458 let mut result = Vec::new();
1459 for row in rows {
1460 if let Ok(val) = row {
1461 result.push(val);
1462 }
1463 }
1464 Ok(result)
1465 }
1466
1467 pub fn query_graph(
1472 &self,
1473 query: &serde_json::Value,
1474 ) -> Result<serde_json::Value, RuntimeError> {
1475 let obj = query.as_object().ok_or_else(|| RuntimeError {
1476 code: "INVALID_QUERY".into(),
1477 message: "Graph query must be a JSON object".into(),
1478 })?;
1479
1480 let mut results = serde_json::Map::new();
1481
1482 for (entity_name, query_opts) in obj {
1483 let _ent = self.require_entity(entity_name)?;
1484
1485 let filter = query_opts
1487 .get("where")
1488 .cloned()
1489 .unwrap_or(serde_json::json!({}));
1490 let rows = self.query_filtered(entity_name, &filter)?;
1491
1492 let rows = if let Some(include) = query_opts.get("include").and_then(|v| v.as_object())
1494 {
1495 let ent = self.entities.get(entity_name).ok_or_else(|| RuntimeError {
1501 code: "INVARIANT_BROKEN".into(),
1502 message: format!(
1503 "entity \"{entity_name}\" missing from registry during include expansion"
1504 ),
1505 })?;
1506 rows.into_iter()
1507 .map(|mut row| {
1508 for (rel_name, _sub_query) in include {
1509 if let Some(rel) = ent.relations.iter().find(|r| r.name == *rel_name) {
1510 let fk_value = row
1511 .get(&rel.field)
1512 .and_then(|v| v.as_str())
1513 .map(|s| s.to_string());
1514 if let Some(fk) = fk_value {
1515 if rel.many {
1516 let sub_filter = serde_json::json!({ &rel.field: &fk });
1518 if let Ok(related) =
1519 self.query_filtered(&rel.target, &sub_filter)
1520 {
1521 row[rel_name] = serde_json::json!(related);
1522 }
1523 } else {
1524 if let Ok(Some(related)) = self.get_by_id(&rel.target, &fk)
1526 {
1527 row[rel_name] = related;
1528 }
1529 }
1530 }
1531 }
1532 }
1533 row
1534 })
1535 .collect()
1536 } else {
1537 rows
1538 };
1539
1540 let rows = if let Some(limit) = query_opts.get("limit").and_then(|v| v.as_u64()) {
1542 rows.into_iter().take(limit as usize).collect()
1543 } else {
1544 rows
1545 };
1546
1547 results.insert(entity_name.clone(), serde_json::json!(rows));
1548 }
1549
1550 Ok(serde_json::Value::Object(results))
1551 }
1552
1553 pub fn insert_with_conn(
1559 &self,
1560 conn: &Connection,
1561 entity: &str,
1562 data: &serde_json::Value,
1563 ) -> Result<String, RuntimeError> {
1564 let ent = self.require_entity(entity)?;
1565 let id = generate_id();
1566 let obj = data.as_object().ok_or_else(|| RuntimeError {
1567 code: "INVALID_DATA".into(),
1568 message: "Insert data must be a JSON object".into(),
1569 })?;
1570
1571 let mut col_names = vec![quote_ident("id")];
1572 let mut placeholders = vec!["?1".to_string()];
1573 let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = vec![Box::new(id.clone())];
1574 let mut idx = 2;
1575 for (key, val) in obj {
1576 if key == "id" {
1577 continue;
1578 }
1579 validate_column_name(key, ent)?;
1580 col_names.push(quote_ident(key));
1581 placeholders.push(format!("?{idx}"));
1582 values.push(json_to_sql(val));
1583 idx += 1;
1584 }
1585
1586 let sql = format!(
1587 "INSERT INTO {} ({}) VALUES ({})",
1588 quote_ident(entity),
1589 col_names.join(", "),
1590 placeholders.join(", ")
1591 );
1592 let params: Vec<&dyn rusqlite::types::ToSql> = values.iter().map(|v| v.as_ref()).collect();
1593 conn.execute(&sql, params.as_slice())
1594 .map_err(|e| RuntimeError {
1595 code: "INSERT_FAILED".into(),
1596 message: format!("Insert into {entity} failed: {e}"),
1597 })?;
1598
1599 if let Some(cfg) = ent.search.as_ref() {
1602 if !cfg.is_empty() {
1603 pylon_storage::search_maintenance::apply_insert(conn, entity, &id, data, cfg)
1604 .map_err(|e| RuntimeError {
1605 code: "SEARCH_MAINTENANCE_FAILED".into(),
1606 message: format!("search index update on insert {entity}: {e}"),
1607 })?;
1608 }
1609 }
1610
1611 Ok(id)
1612 }
1613
1614 pub fn update_with_conn(
1616 &self,
1617 conn: &Connection,
1618 entity: &str,
1619 id: &str,
1620 data: &serde_json::Value,
1621 ) -> Result<bool, RuntimeError> {
1622 let ent = self.require_entity(entity)?;
1623 let obj = data.as_object().ok_or_else(|| RuntimeError {
1624 code: "INVALID_DATA".into(),
1625 message: "Update data must be a JSON object".into(),
1626 })?;
1627
1628 let mut set_clauses = Vec::new();
1629 let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
1630 let mut idx = 1;
1631 for (key, val) in obj {
1632 if key == "id" {
1633 continue;
1634 }
1635 validate_column_name(key, ent)?;
1636 set_clauses.push(format!("{} = ?{idx}", quote_ident(key)));
1637 values.push(json_to_sql(val));
1638 idx += 1;
1639 }
1640 if set_clauses.is_empty() {
1641 return Ok(false);
1642 }
1643
1644 let searchable = ent.search.as_ref().map(|c| !c.is_empty()).unwrap_or(false);
1649 let old_row = if searchable {
1650 self.get_by_id_with_conn(conn, entity, id)?
1651 } else {
1652 None
1653 };
1654
1655 values.push(Box::new(id.to_string()));
1656 let sql = format!(
1657 "UPDATE {} SET {} WHERE \"id\" = ?{idx}",
1658 quote_ident(entity),
1659 set_clauses.join(", ")
1660 );
1661 let params: Vec<&dyn rusqlite::types::ToSql> = values.iter().map(|v| v.as_ref()).collect();
1662 let affected = conn
1663 .execute(&sql, params.as_slice())
1664 .map_err(|e| RuntimeError {
1665 code: "UPDATE_FAILED".into(),
1666 message: format!("Update {entity}/{id} failed: {e}"),
1667 })?;
1668
1669 if affected > 0 && searchable {
1670 if let (Some(cfg), Some(old)) = (ent.search.as_ref(), old_row) {
1671 pylon_storage::search_maintenance::apply_update(conn, entity, id, &old, data, cfg)
1672 .map_err(|e| RuntimeError {
1673 code: "SEARCH_MAINTENANCE_FAILED".into(),
1674 message: format!("search index update on update {entity}: {e}"),
1675 })?;
1676 }
1677 }
1678
1679 Ok(affected > 0)
1680 }
1681
1682 pub fn delete_with_conn(
1684 &self,
1685 conn: &Connection,
1686 entity: &str,
1687 id: &str,
1688 ) -> Result<bool, RuntimeError> {
1689 let ent = self.require_entity(entity)?;
1690
1691 let searchable = ent.search.as_ref().map(|c| !c.is_empty()).unwrap_or(false);
1694 if searchable {
1695 if let (Some(cfg), Ok(Some(row))) = (
1696 ent.search.as_ref(),
1697 self.get_by_id_with_conn(conn, entity, id),
1698 ) {
1699 pylon_storage::search_maintenance::apply_delete(conn, entity, id, &row, cfg)
1700 .map_err(|e| RuntimeError {
1701 code: "SEARCH_MAINTENANCE_FAILED".into(),
1702 message: format!("search index update on delete {entity}: {e}"),
1703 })?;
1704 }
1705 }
1706
1707 let sql = format!("DELETE FROM {} WHERE \"id\" = ?1", quote_ident(entity));
1708 let affected = conn
1709 .execute(&sql, rusqlite::params![id])
1710 .map_err(|e| RuntimeError {
1711 code: "DELETE_FAILED".into(),
1712 message: format!("Delete {entity}/{id} failed: {e}"),
1713 })?;
1714 Ok(affected > 0)
1715 }
1716
1717 pub fn get_by_id_with_conn(
1719 &self,
1720 conn: &Connection,
1721 entity: &str,
1722 id: &str,
1723 ) -> Result<Option<serde_json::Value>, RuntimeError> {
1724 let ent = self.require_entity(entity)?;
1725 let sql = format!("SELECT * FROM {} WHERE \"id\" = ?1", quote_ident(entity));
1726 let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
1727 code: "QUERY_FAILED".into(),
1728 message: format!("Failed to prepare query: {e}"),
1729 })?;
1730 let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1731 Ok(stmt
1732 .query_row(rusqlite::params![id], |row| Ok(row_to_json(row, &columns)))
1733 .ok())
1734 }
1735
1736 pub fn list_with_conn(
1738 &self,
1739 conn: &Connection,
1740 entity: &str,
1741 ) -> Result<Vec<serde_json::Value>, RuntimeError> {
1742 let ent = self.require_entity(entity)?;
1743 let sql = format!("SELECT * FROM {} ORDER BY \"id\"", quote_ident(entity));
1744 let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
1745 code: "QUERY_FAILED".into(),
1746 message: format!("Failed to prepare query: {e}"),
1747 })?;
1748 let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1749 let rows = stmt
1750 .query_map([], |row| Ok(row_to_json(row, &columns)))
1751 .map_err(|e| RuntimeError {
1752 code: "QUERY_FAILED".into(),
1753 message: format!("Query failed: {e}"),
1754 })?;
1755 Ok(rows.flatten().collect())
1756 }
1757
1758 pub fn list_after_with_conn(
1760 &self,
1761 conn: &Connection,
1762 entity: &str,
1763 after: Option<&str>,
1764 limit: usize,
1765 ) -> Result<Vec<serde_json::Value>, RuntimeError> {
1766 let ent = self.require_entity(entity)?;
1767 let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1768 let table = quote_ident(entity);
1769 let (sql, params): (String, Vec<Box<dyn rusqlite::types::ToSql>>) = match after {
1770 Some(cursor) => (
1771 format!("SELECT * FROM {table} WHERE \"id\" > ?1 ORDER BY \"id\" LIMIT ?2"),
1772 vec![Box::new(cursor.to_string()), Box::new(limit as i64)],
1773 ),
1774 None => (
1775 format!("SELECT * FROM {table} ORDER BY \"id\" LIMIT ?1"),
1776 vec![Box::new(limit as i64)],
1777 ),
1778 };
1779 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1780 params.iter().map(|v| v.as_ref()).collect();
1781 let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
1782 code: "QUERY_FAILED".into(),
1783 message: format!("Failed to prepare: {e}"),
1784 })?;
1785 let rows = stmt
1786 .query_map(param_refs.as_slice(), |row| Ok(row_to_json(row, &columns)))
1787 .map_err(|e| RuntimeError {
1788 code: "QUERY_FAILED".into(),
1789 message: format!("Query failed: {e}"),
1790 })?;
1791 Ok(rows.flatten().collect())
1792 }
1793
1794 pub fn lookup_with_conn(
1796 &self,
1797 conn: &Connection,
1798 entity: &str,
1799 field: &str,
1800 value: &str,
1801 ) -> Result<Option<serde_json::Value>, RuntimeError> {
1802 let ent = self.require_entity(entity)?;
1803 validate_column_name(field, ent)?;
1804 let sql = format!(
1805 "SELECT * FROM {} WHERE {} = ?1 LIMIT 1",
1806 quote_ident(entity),
1807 quote_ident(field)
1808 );
1809 let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1810 Ok(conn.prepare_cached(&sql).ok().and_then(|mut stmt| {
1811 stmt.query_row(rusqlite::params![value], |row| {
1812 Ok(row_to_json(row, &columns))
1813 })
1814 .ok()
1815 }))
1816 }
1817
1818 pub fn link_with_conn(
1820 &self,
1821 conn: &Connection,
1822 entity: &str,
1823 id: &str,
1824 relation: &str,
1825 target_id: &str,
1826 ) -> Result<bool, RuntimeError> {
1827 let ent = self.require_entity(entity)?;
1828 let rel = ent
1829 .relations
1830 .iter()
1831 .find(|r| r.name == relation)
1832 .ok_or_else(|| RuntimeError {
1833 code: "RELATION_NOT_FOUND".into(),
1834 message: format!("Relation \"{relation}\" not found on \"{entity}\""),
1835 })?;
1836 let data = serde_json::json!({ rel.field.clone(): target_id });
1837 self.update_with_conn(conn, entity, id, &data)
1838 }
1839
1840 pub fn unlink_with_conn(
1842 &self,
1843 conn: &Connection,
1844 entity: &str,
1845 id: &str,
1846 relation: &str,
1847 ) -> Result<bool, RuntimeError> {
1848 let ent = self.require_entity(entity)?;
1849 let rel = ent
1850 .relations
1851 .iter()
1852 .find(|r| r.name == relation)
1853 .ok_or_else(|| RuntimeError {
1854 code: "RELATION_NOT_FOUND".into(),
1855 message: format!("Relation \"{relation}\" not found on \"{entity}\""),
1856 })?;
1857 let data = serde_json::json!({ rel.field.clone(): serde_json::Value::Null });
1858 self.update_with_conn(conn, entity, id, &data)
1859 }
1860
1861 pub fn query_filtered_with_conn(
1866 &self,
1867 conn: &Connection,
1868 entity: &str,
1869 filter: &serde_json::Value,
1870 ) -> Result<Vec<serde_json::Value>, RuntimeError> {
1871 let ent = self.require_entity(entity)?;
1872 let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1873 let empty = serde_json::Map::new();
1874 let obj = filter.as_object().unwrap_or(&empty);
1875
1876 let mut where_clauses = Vec::new();
1877 let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
1878 let mut order_clause = String::new();
1879 let mut limit_clause = String::new();
1880 let mut idx = 1;
1881
1882 for (key, val) in obj {
1883 match key.as_str() {
1884 "$order" => {
1885 if let Some(o) = val.as_object() {
1886 let mut parts: Vec<String> = Vec::new();
1887 for (col, dir) in o {
1888 validate_column_name(col, ent)?;
1889 let d = match dir.as_str().unwrap_or("asc") {
1890 "desc" | "DESC" => "DESC",
1891 _ => "ASC",
1892 };
1893 parts.push(format!("{} {d}", quote_ident(col)));
1894 }
1895 if !parts.is_empty() {
1896 order_clause = format!(" ORDER BY {}", parts.join(", "));
1897 }
1898 }
1899 }
1900 "$limit" => {
1901 if let Some(n) = val.as_u64() {
1902 limit_clause = format!(" LIMIT {n}");
1903 }
1904 }
1905 "$offset" => {
1906 if let Some(n) = val.as_u64() {
1907 if limit_clause.is_empty() {
1908 limit_clause = " LIMIT -1".into();
1909 }
1910 limit_clause = format!("{limit_clause} OFFSET {n}");
1911 }
1912 }
1913 _ => {
1914 validate_column_name(key, ent)?;
1915 let qk = quote_ident(key);
1916 if let Some(op_obj) = val.as_object() {
1917 for (op, op_val) in op_obj {
1918 match op.as_str() {
1919 "$not" => {
1920 where_clauses.push(format!("{qk} != ?{idx}"));
1921 values.push(json_to_sql(op_val));
1922 idx += 1;
1923 }
1924 "$gt" => {
1925 where_clauses.push(format!("{qk} > ?{idx}"));
1926 values.push(json_to_sql(op_val));
1927 idx += 1;
1928 }
1929 "$gte" => {
1930 where_clauses.push(format!("{qk} >= ?{idx}"));
1931 values.push(json_to_sql(op_val));
1932 idx += 1;
1933 }
1934 "$lt" => {
1935 where_clauses.push(format!("{qk} < ?{idx}"));
1936 values.push(json_to_sql(op_val));
1937 idx += 1;
1938 }
1939 "$lte" => {
1940 where_clauses.push(format!("{qk} <= ?{idx}"));
1941 values.push(json_to_sql(op_val));
1942 idx += 1;
1943 }
1944 "$like" => {
1945 where_clauses.push(format!("{qk} LIKE ?{idx}"));
1946 let p = format!("%{}%", op_val.as_str().unwrap_or(""));
1947 values.push(Box::new(p));
1948 idx += 1;
1949 }
1950 "$in" => {
1951 if let Some(arr) = op_val.as_array() {
1952 let ph: Vec<String> = arr
1953 .iter()
1954 .map(|v| {
1955 let p = format!("?{idx}");
1956 values.push(json_to_sql(v));
1957 idx += 1;
1958 p
1959 })
1960 .collect();
1961 if !ph.is_empty() {
1962 where_clauses
1963 .push(format!("{qk} IN ({})", ph.join(", ")));
1964 }
1965 }
1966 }
1967 _ => {}
1968 }
1969 }
1970 } else {
1971 where_clauses.push(format!("{qk} = ?{idx}"));
1972 values.push(json_to_sql(val));
1973 idx += 1;
1974 }
1975 }
1976 }
1977 }
1978
1979 let where_sql = if where_clauses.is_empty() {
1980 String::new()
1981 } else {
1982 format!(" WHERE {}", where_clauses.join(" AND "))
1983 };
1984 if order_clause.is_empty() {
1985 order_clause = " ORDER BY \"id\"".into();
1986 }
1987
1988 let sql = format!(
1989 "SELECT * FROM {}{}{}{}",
1990 quote_ident(entity),
1991 where_sql,
1992 order_clause,
1993 limit_clause
1994 );
1995 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1996 values.iter().map(|v| v.as_ref()).collect();
1997 let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
1998 code: "QUERY_FAILED".into(),
1999 message: format!("Failed to prepare: {e}"),
2000 })?;
2001 let rows = stmt
2002 .query_map(param_refs.as_slice(), |row| Ok(row_to_json(row, &columns)))
2003 .map_err(|e| RuntimeError {
2004 code: "QUERY_FAILED".into(),
2005 message: format!("Query failed: {e}"),
2006 })?;
2007 Ok(rows.flatten().collect())
2008 }
2009
2010 pub fn query_graph_with_conn(
2012 &self,
2013 conn: &Connection,
2014 query: &serde_json::Value,
2015 ) -> Result<serde_json::Value, RuntimeError> {
2016 let obj = query.as_object().ok_or_else(|| RuntimeError {
2017 code: "INVALID_QUERY".into(),
2018 message: "Graph query must be a JSON object".into(),
2019 })?;
2020 let mut results = serde_json::Map::new();
2021 for (entity_name, query_opts) in obj {
2022 let _ent = self.require_entity(entity_name)?;
2023 let filter = query_opts
2024 .get("where")
2025 .cloned()
2026 .unwrap_or(serde_json::json!({}));
2027 let rows = self.query_filtered_with_conn(conn, entity_name, &filter)?;
2028 results.insert(entity_name.clone(), serde_json::json!(rows));
2029 }
2030 Ok(serde_json::Value::Object(results))
2031 }
2032
    /// Runs an aggregate query over `entity` and returns `{"rows": [...]}`.
    ///
    /// On a Postgres-backed runtime the call is delegated to
    /// `pylon_http::DataStore::aggregate` on the Postgres store. Otherwise the query
    /// is built and executed against a SQLite read connection.
    ///
    /// Recognised keys of the `spec` object:
    /// - `count`: `"*"` (result key `count`) or a field name (result key `count_<field>`),
    /// - `sum` / `avg` / `min` / `max`: arrays of field names (result key `<fn>_<field>`),
    /// - `countDistinct`: array of field names (result key `count_distinct_<field>`),
    /// - `groupBy`: array whose items are either a field name or an object
    ///   `{ "field": ..., "bucket": "hour"|"day"|"week"|"month"|"year" }` (bucket
    ///   defaults to `"day"`; result key `<field>_<bucket>`),
    /// - `where`: object of column/value pairs, each compared with `=` and ANDed.
    ///
    /// # Errors
    /// `INVALID_QUERY` when `spec` is not an object, selects nothing, has a `groupBy`
    /// object without `field`, or names an unsupported bucket; `QUERY_FAILED` when the
    /// statement cannot be prepared or executed; plus errors from `require_entity`,
    /// `validate_column_name` and `lock_read_conn`.
    pub fn aggregate(
        &self,
        entity: &str,
        spec: &serde_json::Value,
    ) -> Result<serde_json::Value, RuntimeError> {
        // Postgres backend: hand the whole request to the Postgres data store.
        if let Some(pg) = self.pg_backend() {
            return pylon_http::DataStore::aggregate(&pg.store, entity, spec)
                .map_err(data_err_to_runtime);
        }
        let ent = self.require_entity(entity)?;
        let conn = self.lock_read_conn()?;
        let obj = spec.as_object().ok_or_else(|| RuntimeError {
            code: "INVALID_QUERY".into(),
            message: "aggregate spec must be an object".into(),
        })?;

        // `select_parts` holds the aggregate SQL expressions; `result_fields` holds
        // the matching JSON keys, in the same order.
        let mut select_parts: Vec<String> = Vec::new();
        let mut result_fields: Vec<String> = Vec::new();

        // count: "*" counts rows, a field name counts that column.
        if let Some(count) = obj.get("count") {
            match count {
                serde_json::Value::String(s) if s == "*" => {
                    select_parts.push("COUNT(*) AS count".into());
                    result_fields.push("count".into());
                }
                serde_json::Value::String(field) => {
                    validate_column_name(field, ent)?;
                    let alias = format!("count_{field}");
                    select_parts.push(format!(
                        "COUNT({}) AS {}",
                        quote_ident(field),
                        quote_ident(&alias)
                    ));
                    result_fields.push(alias);
                }
                // Any non-string `count` value is ignored.
                _ => {}
            }
        }

        // sum / avg / min / max: one SQL aggregate per listed field. Non-string
        // array entries are skipped.
        for (fn_name, alias_prefix) in [
            ("sum", "sum_"),
            ("avg", "avg_"),
            ("min", "min_"),
            ("max", "max_"),
        ] {
            if let Some(fields) = obj.get(fn_name).and_then(|v| v.as_array()) {
                for field in fields {
                    if let Some(f) = field.as_str() {
                        validate_column_name(f, ent)?;
                        let alias = format!("{alias_prefix}{f}");
                        let sql_fn = fn_name.to_uppercase();
                        select_parts.push(format!(
                            "{}({}) AS {}",
                            sql_fn,
                            quote_ident(f),
                            quote_ident(&alias)
                        ));
                        result_fields.push(alias);
                    }
                }
            }
        }

        // countDistinct: COUNT(DISTINCT col) per listed field.
        if let Some(fields) = obj.get("countDistinct").and_then(|v| v.as_array()) {
            for field in fields {
                if let Some(f) = field.as_str() {
                    validate_column_name(f, ent)?;
                    let alias = format!("count_distinct_{f}");
                    select_parts.push(format!(
                        "COUNT(DISTINCT {}) AS {}",
                        quote_ident(f),
                        quote_ident(&alias)
                    ));
                    result_fields.push(alias);
                }
            }
        }

        // groupBy: `group_by` feeds the GROUP BY clause, `group_select` the SELECT
        // list, and `group_field_names` the JSON keys -- all three stay index-aligned.
        let mut group_by: Vec<String> = Vec::new();
        let mut group_select: Vec<String> = Vec::new();
        let mut group_field_names: Vec<String> = Vec::new();
        if let Some(groups) = obj.get("groupBy").and_then(|v| v.as_array()) {
            for g in groups {
                if let Some(f) = g.as_str() {
                    // Plain column grouping.
                    validate_column_name(f, ent)?;
                    let quoted = quote_ident(f);
                    group_by.push(quoted.clone());
                    group_select.push(quoted);
                    group_field_names.push(f.to_string());
                } else if let Some(spec) = g.as_object() {
                    // Time-bucket grouping; note `spec` shadows the outer argument
                    // inside this branch.
                    let field =
                        spec.get("field")
                            .and_then(|v| v.as_str())
                            .ok_or_else(|| RuntimeError {
                                code: "INVALID_QUERY".into(),
                                message: "groupBy object spec requires `field`".into(),
                            })?;
                    validate_column_name(field, ent)?;
                    let bucket = spec.get("bucket").and_then(|v| v.as_str()).unwrap_or("day");
                    // strftime format string used to bucket the column value.
                    let fmt = match bucket {
                        "hour" => "%Y-%m-%d %H:00:00",
                        "day" => "%Y-%m-%d",
                        "month" => "%Y-%m",
                        "year" => "%Y",
                        "week" => "%Y-W%W",
                        _ => {
                            return Err(RuntimeError {
                                code: "INVALID_QUERY".into(),
                                message: format!(
                                    "bucket must be one of hour/day/week/month/year, got {bucket}"
                                ),
                            });
                        }
                    };
                    let alias = format!("{field}_{bucket}");
                    let expr = format!("strftime('{}', {})", fmt, quote_ident(field));
                    group_by.push(expr.clone());
                    group_select.push(format!("{} AS {}", expr, quote_ident(&alias)));
                    group_field_names.push(alias);
                }
            }
        }
        // Group columns come first in the SELECT list, aggregates after them.
        let mut full_select = group_select.clone();
        full_select.extend(select_parts.iter().cloned());
        if full_select.is_empty() {
            return Err(RuntimeError {
                code: "INVALID_QUERY".into(),
                message: "aggregate spec must include count/sum/avg/min/max/groupBy".into(),
            });
        }

        // where: equality-only conditions, bound as positional parameters.
        let mut where_clauses = Vec::new();
        let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
        let mut idx = 1;
        if let Some(where_obj) = obj.get("where").and_then(|v| v.as_object()) {
            for (k, v) in where_obj {
                validate_column_name(k, ent)?;
                where_clauses.push(format!("{} = ?{idx}", quote_ident(k)));
                values.push(json_to_sql(v));
                idx += 1;
            }
        }
        let where_sql = if where_clauses.is_empty() {
            String::new()
        } else {
            format!(" WHERE {}", where_clauses.join(" AND "))
        };

        let group_sql = if group_by.is_empty() {
            String::new()
        } else {
            format!(" GROUP BY {}", group_by.join(", "))
        };

        let sql = format!(
            "SELECT {} FROM {}{}{}",
            full_select.join(", "),
            quote_ident(entity),
            where_sql,
            group_sql
        );

        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
            values.iter().map(|v| v.as_ref()).collect();
        let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
            code: "QUERY_FAILED".into(),
            message: format!("Failed to prepare aggregate: {e}"),
        })?;

        // JSON keys in the same positional order as the SELECT list built above.
        let column_names: Vec<String> = {
            let mut v = group_field_names.clone();
            v.extend(result_fields.iter().cloned());
            v
        };

        let rows = stmt
            .query_map(param_refs.as_slice(), |row| {
                let mut obj = serde_json::Map::new();
                for (i, name) in column_names.iter().enumerate() {
                    // Try integer, then float, then text; anything else becomes null.
                    if let Ok(n) = row.get::<_, i64>(i) {
                        obj.insert(name.clone(), serde_json::Value::Number(n.into()));
                    } else if let Ok(f) = row.get::<_, f64>(i) {
                        // `from_f64` returns None for values it cannot represent;
                        // those are emitted as null.
                        if let Some(num) = serde_json::Number::from_f64(f) {
                            obj.insert(name.clone(), serde_json::Value::Number(num));
                        } else {
                            obj.insert(name.clone(), serde_json::Value::Null);
                        }
                    } else if let Ok(s) = row.get::<_, String>(i) {
                        obj.insert(name.clone(), serde_json::Value::String(s));
                    } else {
                        obj.insert(name.clone(), serde_json::Value::Null);
                    }
                }
                Ok(serde_json::Value::Object(obj))
            })
            .map_err(|e| RuntimeError {
                code: "QUERY_FAILED".into(),
                message: format!("Aggregate failed: {e}"),
            })?;

        // Rows that fail to read are skipped rather than failing the whole call.
        let mut result = Vec::new();
        for row in rows {
            if let Ok(val) = row {
                result.push(val);
            }
        }
        Ok(serde_json::json!({ "rows": result }))
    }
2259
2260 fn require_entity(&self, name: &str) -> Result<&ManifestEntity, RuntimeError> {
2265 self.entities.get(name).ok_or_else(|| RuntimeError {
2266 code: "ENTITY_NOT_FOUND".into(),
2267 message: format!("Unknown entity: \"{name}\""),
2268 })
2269 }
2270
2271 fn lock_write_conn(&self) -> Result<std::sync::MutexGuard<'_, Connection>, RuntimeError> {
2277 let sb = self.sqlite_backend()?;
2278 sb.write_conn.lock().map_err(|e| RuntimeError {
2279 code: "LOCK_FAILED".into(),
2280 message: format!("Failed to acquire write connection lock: {e}"),
2281 })
2282 }
2283
2284 fn lock_read_conn(&self) -> Result<ReadConnGuard<'_>, RuntimeError> {
2288 let sb = self.sqlite_backend()?;
2289 if !sb.read_pool.is_empty() {
2290 let idx = sb.read_counter.fetch_add(1, Ordering::Relaxed) % sb.read_pool.len();
2291 let guard = sb.read_pool[idx].lock().map_err(|e| RuntimeError {
2292 code: "LOCK_FAILED".into(),
2293 message: format!("Failed to acquire read connection: {e}"),
2294 })?;
2295 Ok(ReadConnGuard::Pooled(guard))
2296 } else {
2297 let guard = sb.write_conn.lock().map_err(|e| RuntimeError {
2299 code: "LOCK_FAILED".into(),
2300 message: format!("Failed to acquire connection: {e}"),
2301 })?;
2302 Ok(ReadConnGuard::Write(guard))
2303 }
2304 }
2305
2306 fn sqlite_backend(&self) -> Result<&SqliteBackend, RuntimeError> {
2310 match &self.backend {
2311 RuntimeBackend::Sqlite(sb) => Ok(sb),
2312 RuntimeBackend::Postgres(_) => Err(RuntimeError {
2313 code: "NOT_SQLITE_BACKEND".into(),
2314 message: "this operation requires a SQLite-backed Runtime".into(),
2315 }),
2316 }
2317 }
2318
2319 pub(crate) fn pg_backend(&self) -> Option<&PgBackend> {
2324 match &self.backend {
2325 RuntimeBackend::Sqlite(_) => None,
2326 RuntimeBackend::Postgres(pg) => Some(pg),
2327 }
2328 }
2329
    /// Public accessor for the Postgres data store; forwards to `pg_data_store`.
    ///
    /// Returns `None` when the runtime is not Postgres-backed.
    pub fn pg_data_store_pub(&self) -> Option<&pylon_storage::pg_datastore::PostgresDataStore> {
        self.pg_data_store()
    }
2342
    /// Test-only accessor for the Postgres data store.
    ///
    /// # Panics
    /// Panics with "pg backend" when the runtime is not Postgres-backed.
    #[doc(hidden)]
    pub fn pg_data_store_for_tests(&self) -> &pylon_storage::pg_datastore::PostgresDataStore {
        self.pg_data_store().expect("pg backend")
    }
2347
2348 #[doc(hidden)]
2353 pub fn run_in_pg_mutation_tx_for_tests<F, T, E>(&self, body: F) -> Result<T, E>
2354 where
2355 F: FnOnce(&dyn pylon_http::DataStore) -> Result<T, E>,
2356 E: From<pylon_http::DataError>,
2357 {
2358 let pg_backend = self.pg_backend().expect("pg backend");
2359 let crdt_hook: std::sync::Arc<dyn pylon_storage::pg_tx_store::PgCrdtHook> =
2360 std::sync::Arc::new(crate::pg_loro_store::PgCrdtHookImpl {
2361 crdt: std::sync::Arc::clone(&pg_backend.crdt),
2362 manifest: std::sync::Arc::new(self.manifest.clone()),
2363 });
2364 pg_backend.store.with_transaction_crdt(crdt_hook, body)
2365 }
2366
    /// Returns the Postgres data store (`store` of the Postgres backend), or `None`
    /// when the runtime is not Postgres-backed.
    pub(crate) fn pg_data_store(&self) -> Option<&pylon_storage::pg_datastore::PostgresDataStore> {
        self.pg_backend().map(|pg| &pg.store)
    }
2370}
2371
2372fn with_write_tx<T, F>(conn: &rusqlite::Connection, body: F) -> Result<T, RuntimeError>
2400where
2401 F: FnOnce() -> Result<T, RuntimeError>,
2402{
2403 conn.execute("BEGIN IMMEDIATE", [])
2404 .map_err(|e| RuntimeError {
2405 code: "TX_BEGIN_FAILED".into(),
2406 message: format!("BEGIN: {e}"),
2407 })?;
2408 match body() {
2409 Ok(v) => {
2410 conn.execute("COMMIT", []).map_err(|e| RuntimeError {
2411 code: "TX_COMMIT_FAILED".into(),
2412 message: format!("COMMIT: {e}"),
2413 })?;
2414 Ok(v)
2415 }
2416 Err(e) => {
2417 let _ = conn.execute("ROLLBACK", []);
2420 Err(e)
2421 }
2422 }
2423}
2424
/// Builds a 40-character lowercase hex id: 32 digits of the current Unix time in
/// nanoseconds followed by 8 digits of a process-wide counter.
///
/// A clock reading before the Unix epoch is treated as zero nanoseconds. The
/// counter is incremented on every call.
fn generate_id() -> String {
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static COUNTER: AtomicU32 = AtomicU32::new(0);

    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    let timestamp = since_epoch.as_nanos();
    let sequence = COUNTER.fetch_add(1, Ordering::Relaxed);
    format!("{:032x}{:08x}", timestamp, sequence)
}
2436
2437fn json_to_sql(val: &serde_json::Value) -> Box<dyn rusqlite::types::ToSql> {
2439 match val {
2440 serde_json::Value::Null => Box::new(rusqlite::types::Null),
2441 serde_json::Value::Bool(b) => Box::new(*b as i32),
2442 serde_json::Value::Number(n) => {
2443 if let Some(i) = n.as_i64() {
2444 Box::new(i)
2445 } else if let Some(f) = n.as_f64() {
2446 Box::new(f)
2447 } else {
2448 Box::new(n.to_string())
2449 }
2450 }
2451 serde_json::Value::String(s) => Box::new(s.clone()),
2452 other => Box::new(other.to_string()),
2453 }
2454}
2455
2456fn row_to_json(row: &rusqlite::Row<'_>, _field_names: &[String]) -> serde_json::Value {
2471 let mut obj = serde_json::Map::new();
2472
2473 let stmt = row.as_ref();
2474 let count = stmt.column_count();
2475 for i in 0..count {
2476 let name = match stmt.column_name(i) {
2480 Ok(n) => n.to_string(),
2481 Err(_) => continue,
2482 };
2483 let value = if let Ok(s) = row.get::<_, String>(i) {
2484 serde_json::Value::String(s)
2485 } else if let Ok(n) = row.get::<_, i64>(i) {
2486 serde_json::Value::Number(serde_json::Number::from(n))
2487 } else if let Ok(f) = row.get::<_, f64>(i) {
2488 serde_json::Number::from_f64(f)
2489 .map(serde_json::Value::Number)
2490 .unwrap_or(serde_json::Value::Null)
2491 } else {
2492 serde_json::Value::Null
2493 };
2494 obj.insert(name, value);
2495 }
2496
2497 serde_json::Value::Object(obj)
2498}
2499
2500#[cfg(test)]
2501mod tests {
2502 use super::*;
2503 use pylon_kernel::{ManifestField, ManifestIndex};
2504
    /// Builds the manifest shared by the tests in this module: a single CRDT-enabled
    /// `User` entity with `email` (unique) and `displayName` string fields, one unique
    /// index on `email`, no relations and no search configuration.
    fn test_manifest() -> AppManifest {
        AppManifest {
            manifest_version: 1,
            name: "Test".into(),
            version: "0.1.0".into(),
            entities: vec![pylon_kernel::ManifestEntity {
                name: "User".into(),
                fields: vec![
                    ManifestField {
                        name: "email".into(),
                        field_type: "string".into(),
                        optional: false,
                        unique: true,
                        crdt: None,
                    },
                    ManifestField {
                        name: "displayName".into(),
                        field_type: "string".into(),
                        optional: false,
                        unique: false,
                        crdt: None,
                    },
                ],
                indexes: vec![ManifestIndex {
                    name: "user_email".into(),
                    fields: vec!["email".into()],
                    unique: true,
                }],
                relations: vec![],
                search: None,
                crdt: true,
            }],
            routes: vec![],
            queries: vec![],
            actions: vec![],
            policies: vec![],
            auth: Default::default(),
        }
    }
2544
2545 #[test]
2546 fn reset_for_tests_wipes_in_memory() {
2547 let rt = Runtime::in_memory(test_manifest()).unwrap();
2548 rt.insert(
2549 "User",
2550 &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
2551 )
2552 .unwrap();
2553 assert_eq!(rt.list("User").unwrap().len(), 1);
2554 rt.reset_for_tests().unwrap();
2555 assert_eq!(rt.list("User").unwrap().len(), 0);
2556 }
2557
2558 #[test]
2559 fn reset_for_tests_refuses_file_db() {
2560 let dir = std::env::temp_dir().join("pylon-reset-refuse");
2561 let _ = std::fs::create_dir_all(&dir);
2562 let db_path = dir.join("db.sqlite");
2563 let _ = std::fs::remove_file(&db_path);
2564 let rt = Runtime::open(db_path.to_str().unwrap(), test_manifest()).unwrap();
2565 let err = rt.reset_for_tests().unwrap_err();
2566 assert_eq!(err.code, "RESET_REFUSED");
2567 let _ = std::fs::remove_file(&db_path);
2568 }
2569
2570 #[test]
2571 fn insert_and_get() {
2572 let rt = Runtime::in_memory(test_manifest()).unwrap();
2573 let id = rt
2574 .insert(
2575 "User",
2576 &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
2577 )
2578 .unwrap();
2579 let row = rt.get_by_id("User", &id).unwrap().unwrap();
2580 assert_eq!(row["email"], "a@b.com");
2581 }
2582
    /// Rows are decoded by the table's actual column names, so a column added to the
    /// table outside the manifest (here `passwordHash`, via `ALTER TABLE`) shows up
    /// under its own name and does not disturb the manifest-declared fields.
    #[test]
    fn row_to_json_handles_columns_added_out_of_manifest_order() {
        let mut manifest = test_manifest();
        // Widen the User entity to four manifest fields.
        manifest.entities[0].fields = vec![
            ManifestField {
                name: "email".into(),
                field_type: "string".into(),
                optional: false,
                unique: true,
                crdt: None,
            },
            ManifestField {
                name: "displayName".into(),
                field_type: "string".into(),
                optional: false,
                unique: false,
                crdt: None,
            },
            ManifestField {
                name: "avatarColor".into(),
                field_type: "string".into(),
                optional: true,
                unique: false,
                crdt: None,
            },
            ManifestField {
                name: "createdAt".into(),
                field_type: "datetime".into(),
                optional: true,
                unique: false,
                crdt: None,
            },
        ];
        // CRDT is switched off for this entity in this test.
        manifest.entities[0].crdt = false;
        let rt = Runtime::in_memory(manifest).unwrap();
        let id = rt
            .insert(
                "User",
                &serde_json::json!({
                    "email": "a@b.com",
                    "displayName": "Alice",
                    "avatarColor": "#abc",
                    "createdAt": "2026-01-01T00:00:00Z",
                }),
            )
            .unwrap();

        {
            // Add and populate a column the manifest does not know about. The lock
            // is scoped so it is released before the read below.
            let conn = rt.lock_write_conn().unwrap();
            conn.execute("ALTER TABLE \"User\" ADD COLUMN \"passwordHash\" TEXT", [])
                .unwrap();
            conn.execute(
                "UPDATE \"User\" SET \"passwordHash\" = ?1 WHERE \"id\" = ?2",
                rusqlite::params!["hashed-password", &id],
            )
            .unwrap();
        }
        let row = rt.get_by_id("User", &id).unwrap().unwrap();
        // Every manifest field keeps its own value ...
        assert_eq!(row["email"], "a@b.com");
        assert_eq!(row["displayName"], "Alice");
        assert_eq!(row["avatarColor"], "#abc");
        assert_eq!(row["createdAt"], "2026-01-01T00:00:00Z");
        // ... and the out-of-manifest column is returned under its column name.
        assert_eq!(row["passwordHash"], "hashed-password");
    }
2675
2676 #[test]
2681 fn crdt_default_writes_through_loro_store() {
2682 let rt = Runtime::in_memory(test_manifest()).unwrap();
2683 let id = rt
2684 .insert(
2685 "User",
2686 &serde_json::json!({"email": "x@y.com", "displayName": "Eric"}),
2687 )
2688 .unwrap();
2689
2690 let conn = rt.lock_write_conn().unwrap();
2692 let snap_count: i64 = conn
2693 .query_row(
2694 "SELECT COUNT(*) FROM _pylon_crdt_snapshots
2695 WHERE entity = ?1 AND row_id = ?2",
2696 rusqlite::params!["User", &id],
2697 |r| r.get(0),
2698 )
2699 .unwrap();
2700 assert_eq!(snap_count, 1, "sidecar should have one row after insert");
2701
2702 assert!(rt.crdt_store().cached_rows() >= 1);
2705
2706 drop(conn);
2708 let row = rt.get_by_id("User", &id).unwrap().unwrap();
2709 assert_eq!(row["email"], "x@y.com");
2710 assert_eq!(row["displayName"], "Eric");
2711 }
2712
2713 #[test]
2717 fn crdt_update_persists_new_snapshot() {
2718 let rt = Runtime::in_memory(test_manifest()).unwrap();
2719 let id = rt
2720 .insert(
2721 "User",
2722 &serde_json::json!({"email": "x@y.com", "displayName": "Eric"}),
2723 )
2724 .unwrap();
2725
2726 let snap_after_insert: Vec<u8> = {
2727 let conn = rt.lock_write_conn().unwrap();
2728 conn.query_row(
2729 "SELECT snapshot FROM _pylon_crdt_snapshots
2730 WHERE entity = 'User' AND row_id = ?1",
2731 rusqlite::params![&id],
2732 |r| r.get(0),
2733 )
2734 .unwrap()
2735 };
2736
2737 rt.update("User", &id, &serde_json::json!({"displayName": "Eric C"}))
2738 .unwrap();
2739
2740 let snap_after_update: Vec<u8> = {
2741 let conn = rt.lock_write_conn().unwrap();
2742 conn.query_row(
2743 "SELECT snapshot FROM _pylon_crdt_snapshots
2744 WHERE entity = 'User' AND row_id = ?1",
2745 rusqlite::params![&id],
2746 |r| r.get(0),
2747 )
2748 .unwrap()
2749 };
2750
2751 assert_ne!(
2752 snap_after_insert, snap_after_update,
2753 "snapshot bytes should change after an update"
2754 );
2755
2756 let row = rt.get_by_id("User", &id).unwrap().unwrap();
2757 assert_eq!(row["displayName"], "Eric C");
2758 assert_eq!(row["email"], "x@y.com");
2759 }
2760
2761 #[test]
2768 fn crdt_insert_rolls_back_when_sql_step_fails() {
2769 let rt = Runtime::in_memory(test_manifest()).unwrap();
2770 rt.insert(
2772 "User",
2773 &serde_json::json!({"email": "x@y.com", "displayName": "First"}),
2774 )
2775 .unwrap();
2776
2777 let snap_count_before: i64 = {
2779 let conn = rt.lock_write_conn().unwrap();
2780 conn.query_row(
2781 "SELECT COUNT(*) FROM _pylon_crdt_snapshots WHERE entity = 'User'",
2782 [],
2783 |r| r.get(0),
2784 )
2785 .unwrap()
2786 };
2787
2788 let err = rt
2790 .insert(
2791 "User",
2792 &serde_json::json!({"email": "x@y.com", "displayName": "Second"}),
2793 )
2794 .expect_err("duplicate email must fail");
2795 assert_eq!(err.code, "INSERT_FAILED");
2796
2797 let snap_count_after: i64 = {
2800 let conn = rt.lock_write_conn().unwrap();
2801 conn.query_row(
2802 "SELECT COUNT(*) FROM _pylon_crdt_snapshots WHERE entity = 'User'",
2803 [],
2804 |r| r.get(0),
2805 )
2806 .unwrap()
2807 };
2808 assert_eq!(
2809 snap_count_after, snap_count_before,
2810 "failed insert should not leave a sidecar snapshot behind"
2811 );
2812 }
2813
2814 #[test]
2817 fn crdt_false_skips_loro_store() {
2818 let mut manifest = test_manifest();
2819 manifest.entities[0].crdt = false;
2821 let rt = Runtime::in_memory(manifest).unwrap();
2822
2823 let id = rt
2824 .insert(
2825 "User",
2826 &serde_json::json!({"email": "lww@example.com", "displayName": "Plain"}),
2827 )
2828 .unwrap();
2829
2830 let conn = rt.lock_write_conn().unwrap();
2831 let snap_count: i64 = conn
2832 .query_row(
2833 "SELECT COUNT(*) FROM _pylon_crdt_snapshots
2834 WHERE entity = 'User' AND row_id = ?1",
2835 rusqlite::params![&id],
2836 |r| r.get(0),
2837 )
2838 .unwrap();
2839 assert_eq!(snap_count, 0, "crdt:false should not touch the sidecar");
2840 assert_eq!(
2841 rt.crdt_store().cached_rows(),
2842 0,
2843 "crdt:false should not warm the cache"
2844 );
2845
2846 drop(conn);
2849 let row = rt.get_by_id("User", &id).unwrap().unwrap();
2850 assert_eq!(row["email"], "lww@example.com");
2851 }
2852
2853 #[test]
2854 fn list_entities() {
2855 let rt = Runtime::in_memory(test_manifest()).unwrap();
2856 rt.insert(
2857 "User",
2858 &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
2859 )
2860 .unwrap();
2861 rt.insert(
2862 "User",
2863 &serde_json::json!({"email": "b@c.com", "displayName": "B"}),
2864 )
2865 .unwrap();
2866 let rows = rt.list("User").unwrap();
2867 assert_eq!(rows.len(), 2);
2868 }
2869
2870 #[test]
2871 fn update_entity() {
2872 let rt = Runtime::in_memory(test_manifest()).unwrap();
2873 let id = rt
2874 .insert(
2875 "User",
2876 &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
2877 )
2878 .unwrap();
2879 let updated = rt
2880 .update("User", &id, &serde_json::json!({"displayName": "Updated"}))
2881 .unwrap();
2882 assert!(updated);
2883 let row = rt.get_by_id("User", &id).unwrap().unwrap();
2884 assert_eq!(row["displayName"], "Updated");
2885 }
2886
2887 #[test]
2888 fn delete_entity() {
2889 let rt = Runtime::in_memory(test_manifest()).unwrap();
2890 let id = rt
2891 .insert(
2892 "User",
2893 &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
2894 )
2895 .unwrap();
2896 let deleted = rt.delete("User", &id).unwrap();
2897 assert!(deleted);
2898 assert!(rt.get_by_id("User", &id).unwrap().is_none());
2899 }
2900
2901 #[test]
2902 fn lookup_by_field() {
2903 let rt = Runtime::in_memory(test_manifest()).unwrap();
2904 rt.insert(
2905 "User",
2906 &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
2907 )
2908 .unwrap();
2909 let row = rt.lookup("User", "email", "a@b.com").unwrap().unwrap();
2910 assert_eq!(row["displayName"], "A");
2911 }
2912
2913 #[test]
2914 fn unknown_entity_returns_error() {
2915 let rt = Runtime::in_memory(test_manifest()).unwrap();
2916 let err = rt.list("Nonexistent").unwrap_err();
2917 assert_eq!(err.code, "ENTITY_NOT_FOUND");
2918 }
2919
2920 #[test]
2921 fn insert_rejects_unknown_column() {
2922 let rt = Runtime::in_memory(test_manifest()).unwrap();
2923 let err = rt
2924 .insert(
2925 "User",
2926 &serde_json::json!({"email": "a@b.com", "displayName": "A", "evil_col": "x"}),
2927 )
2928 .unwrap_err();
2929 assert_eq!(err.code, "INVALID_COLUMN");
2930 }
2931
2932 #[test]
2933 fn update_rejects_unknown_column() {
2934 let rt = Runtime::in_memory(test_manifest()).unwrap();
2935 let id = rt
2936 .insert(
2937 "User",
2938 &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
2939 )
2940 .unwrap();
2941 let err = rt
2942 .update("User", &id, &serde_json::json!({"bad_field": "x"}))
2943 .unwrap_err();
2944 assert_eq!(err.code, "INVALID_COLUMN");
2945 }
2946
2947 #[test]
2948 fn lookup_rejects_unknown_column() {
2949 let rt = Runtime::in_memory(test_manifest()).unwrap();
2950 let err = rt.lookup("User", "nonexistent", "val").unwrap_err();
2951 assert_eq!(err.code, "INVALID_COLUMN");
2952 }
2953
2954 #[test]
2955 fn query_filtered_rejects_unknown_column() {
2956 let rt = Runtime::in_memory(test_manifest()).unwrap();
2957 let err = rt
2958 .query_filtered("User", &serde_json::json!({"bad_col": "x"}))
2959 .unwrap_err();
2960 assert_eq!(err.code, "INVALID_COLUMN");
2961 }
2962
2963 #[test]
2964 fn query_filtered_rejects_unknown_order_column() {
2965 let rt = Runtime::in_memory(test_manifest()).unwrap();
2966 let err = rt
2967 .query_filtered("User", &serde_json::json!({"$order": {"bad_col": "asc"}}))
2968 .unwrap_err();
2969 assert_eq!(err.code, "INVALID_COLUMN");
2970 }
2971
2972 #[test]
2973 fn query_filtered_sanitizes_order_direction() {
2974 let rt = Runtime::in_memory(test_manifest()).unwrap();
2975 rt.insert(
2976 "User",
2977 &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
2978 )
2979 .unwrap();
2980 let rows = rt
2982 .query_filtered(
2983 "User",
2984 &serde_json::json!({"$order": {"email": "DROP TABLE User"}}),
2985 )
2986 .unwrap();
2987 assert_eq!(rows.len(), 1);
2988 }
2989
2990 #[test]
2991 fn in_memory_has_no_read_pool() {
2992 let rt = Runtime::in_memory(test_manifest()).unwrap();
2993 assert_eq!(rt.read_pool_size(), 0);
2994 }
2995
2996 #[test]
2997 fn open_creates_read_pool() {
2998 let dir = std::env::temp_dir().join(format!("pylon_test_{}", std::process::id()));
2999 std::fs::create_dir_all(&dir).unwrap();
3000 let db_path = dir.join("test_read_pool.db");
3001
3002 let rt = Runtime::open(db_path.to_str().unwrap(), test_manifest()).unwrap();
3003 assert_eq!(rt.read_pool_size(), READ_POOL_SIZE);
3004
3005 let id = rt
3007 .insert(
3008 "User",
3009 &serde_json::json!({"email": "pool@test.com", "displayName": "Pool"}),
3010 )
3011 .unwrap();
3012 let row = rt.get_by_id("User", &id).unwrap().unwrap();
3013 assert_eq!(row["email"], "pool@test.com");
3014
3015 let _ = std::fs::remove_dir_all(&dir);
3017 }
3018
3019 #[test]
3020 fn concurrent_reads_dont_block_on_write() {
3021 use std::sync::Arc;
3022
3023 let dir = std::env::temp_dir().join(format!("pylon_conc_{}", std::process::id()));
3024 std::fs::create_dir_all(&dir).unwrap();
3025 let db_path = dir.join("test_concurrent.db");
3026
3027 let rt = Arc::new(Runtime::open(db_path.to_str().unwrap(), test_manifest()).unwrap());
3028
3029 rt.insert(
3031 "User",
3032 &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
3033 )
3034 .unwrap();
3035 rt.insert(
3036 "User",
3037 &serde_json::json!({"email": "b@c.com", "displayName": "B"}),
3038 )
3039 .unwrap();
3040
3041 let write_guard = rt.lock_write_conn().unwrap();
3043
3044 let mut handles = Vec::new();
3046 for _ in 0..4 {
3047 let rt_clone = Arc::clone(&rt);
3048 handles.push(std::thread::spawn(move || {
3049 let rows = rt_clone.list("User").unwrap();
3050 assert_eq!(rows.len(), 2);
3051 }));
3052 }
3053
3054 for h in handles {
3055 h.join().expect("reader thread panicked");
3056 }
3057
3058 drop(write_guard);
3060
3061 let _ = std::fs::remove_dir_all(&dir);
3063 }
3064}