1pub mod cache_handlers;
2pub mod cache_server;
3pub mod config;
4pub mod cron;
5pub mod datastore;
6pub mod ip_limit;
7pub mod job_store;
8pub mod jobs;
9pub mod log;
10pub mod loro_store;
11pub mod metrics;
12pub mod oauth_backend;
13pub mod openapi;
14pub mod presence;
15pub mod pubsub;
16pub mod rate_limit;
17pub mod resp;
18pub mod resp_server;
19pub mod rooms;
20pub mod scheduler;
21pub mod server;
22pub mod session_backend;
23pub mod shard_ws;
24pub mod sse;
25pub mod tls;
26pub mod workflow_store;
27pub mod workflows;
28pub mod ws;
29
30use std::collections::HashMap;
31use std::sync::atomic::{AtomicUsize, Ordering};
32use std::sync::Mutex;
33
34use pylon_kernel::{AppManifest, ManifestEntity};
35use rusqlite::Connection;
36
37#[derive(Debug, Clone)]
42pub struct RuntimeError {
43 pub code: String,
44 pub message: String,
45}
46
47impl std::fmt::Display for RuntimeError {
48 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49 write!(f, "[{}] {}", self.code, self.message)
50 }
51}
52
53impl std::error::Error for RuntimeError {}
54
55fn quote_ident(name: &str) -> String {
62 format!("\"{}\"", name.replace('"', "\"\""))
63}
64
65fn validate_column_name(name: &str, entity: &ManifestEntity) -> Result<(), RuntimeError> {
69 if name == "id" {
70 return Ok(());
71 }
72 if entity.fields.iter().any(|f| f.name == name) {
73 return Ok(());
74 }
75 Err(RuntimeError {
76 code: "INVALID_COLUMN".into(),
77 message: format!(
78 "Unknown column \"{}\" -- valid columns: id, {}",
79 name,
80 entity
81 .fields
82 .iter()
83 .map(|f| f.name.as_str())
84 .collect::<Vec<_>>()
85 .join(", ")
86 ),
87 })
88}
89
90fn tune_runtime_connection(conn: &Connection, in_memory: bool) {
102 let pragmas: &[(&str, &str)] = if in_memory {
103 &[
104 ("temp_store", "MEMORY"),
105 ("cache_size", "-65536"),
106 ("foreign_keys", "ON"),
107 ]
108 } else {
109 &[
110 ("journal_mode", "WAL"),
111 ("synchronous", "NORMAL"),
112 ("cache_size", "-65536"),
113 ("mmap_size", "268435456"),
114 ("temp_store", "MEMORY"),
115 ("busy_timeout", "5000"),
116 ("foreign_keys", "ON"),
117 ("wal_autocheckpoint", "1000"),
118 ]
119 };
120 for (key, value) in pragmas {
121 let _ = conn.pragma_update(None, key, value);
122 }
123}
124
125enum ReadConnGuard<'a> {
132 Pooled(std::sync::MutexGuard<'a, Connection>),
133 Write(std::sync::MutexGuard<'a, Connection>),
134}
135
136impl<'a> std::ops::Deref for ReadConnGuard<'a> {
137 type Target = Connection;
138 fn deref(&self) -> &Connection {
139 match self {
140 ReadConnGuard::Pooled(g) => g,
141 ReadConnGuard::Write(g) => g,
142 }
143 }
144}
145
146pub struct Runtime {
158 write_conn: Mutex<Connection>,
160 read_pool: Vec<Mutex<Connection>>,
163 read_counter: AtomicUsize,
165 manifest: AppManifest,
166 entities: HashMap<String, ManifestEntity>,
167 is_in_memory: bool,
170 crdt: crate::loro_store::LoroStore,
175}
176
177const READ_POOL_SIZE: usize = 4;
179
180impl Runtime {
181 pub fn open(db_path: &str, manifest: AppManifest) -> Result<Self, RuntimeError> {
183 let conn = Connection::open(db_path).map_err(|e| RuntimeError {
184 code: "RUNTIME_OPEN_FAILED".into(),
185 message: format!("Failed to open database: {e}"),
186 })?;
187 Self::from_connection(conn, manifest, false)
188 }
189
190 pub fn is_in_memory(&self) -> bool {
201 self.is_in_memory
202 }
203
204 pub fn db_path(&self) -> Option<String> {
209 if self.is_in_memory {
210 return None;
211 }
212 let conn = self.write_conn.lock().ok()?;
213 conn.path().filter(|p| !p.is_empty()).map(String::from)
214 }
215
216 pub fn reset_for_tests(&self) -> Result<(), RuntimeError> {
223 if !self.is_in_memory() {
224 return Err(RuntimeError {
225 code: "RESET_REFUSED".into(),
226 message: "reset_for_tests is only available on in-memory databases".into(),
227 });
228 }
229 let conn = self.lock_write_conn()?;
230 let entity_names: Vec<String> = self.entities.values().map(|e| e.name.clone()).collect();
231 for name in entity_names {
232 let sql = format!("DELETE FROM {}", quote_ident(&name));
233 let _ = conn.execute(&sql, []);
234 let fts_sql = format!("DELETE FROM {}", quote_ident(&format!("{name}_fts")));
236 let _ = conn.execute(&fts_sql, []);
237 }
238 Ok(())
239 }
240
241 pub fn in_memory(manifest: AppManifest) -> Result<Self, RuntimeError> {
243 let conn = Connection::open_in_memory().map_err(|e| RuntimeError {
244 code: "RUNTIME_OPEN_FAILED".into(),
245 message: format!("Failed to open in-memory database: {e}"),
246 })?;
247 Self::from_connection(conn, manifest, true)
248 }
249
250 fn from_connection(
251 conn: Connection,
252 manifest: AppManifest,
253 is_in_memory: bool,
254 ) -> Result<Self, RuntimeError> {
255 tune_runtime_connection(&conn, is_in_memory);
257
258 let entities: HashMap<String, ManifestEntity> = manifest
260 .entities
261 .iter()
262 .map(|e| (e.name.clone(), e.clone()))
263 .collect();
264
265 for entity in &manifest.entities {
267 let fields: Vec<String> = entity
268 .fields
269 .iter()
270 .map(|f| {
271 let col_type = match f.field_type.as_str() {
272 "int" => "INTEGER",
273 "float" => "REAL",
274 "bool" => "INTEGER",
275 _ => "TEXT",
276 };
277 let not_null = if f.optional { "" } else { " NOT NULL" };
278 let unique = if f.unique { " UNIQUE" } else { "" };
279 format!("{} {col_type}{not_null}{unique}", quote_ident(&f.name))
280 })
281 .collect();
282
283 let mut cols = vec!["\"id\" TEXT PRIMARY KEY NOT NULL".to_string()];
284 cols.extend(fields);
285 let sql = format!(
286 "CREATE TABLE IF NOT EXISTS {} ({})",
287 quote_ident(&entity.name),
288 cols.join(", ")
289 );
290 conn.execute(&sql, []).map_err(|e| RuntimeError {
291 code: "SCHEMA_INIT_FAILED".into(),
292 message: format!("Failed to create table {}: {e}", entity.name),
293 })?;
294
295 for idx in &entity.indexes {
297 let unique_kw = if idx.unique { "UNIQUE " } else { "" };
298 let quoted_fields: Vec<String> =
299 idx.fields.iter().map(|f| quote_ident(f)).collect();
300 let idx_sql = format!(
301 "CREATE {unique_kw}INDEX IF NOT EXISTS {} ON {} ({})",
302 quote_ident(&idx.name),
303 quote_ident(&entity.name),
304 quoted_fields.join(", ")
305 );
306 conn.execute(&idx_sql, []).ok();
307 }
308
309 let text_fields: Vec<&str> = entity
317 .fields
318 .iter()
319 .filter(|f| matches!(f.field_type.as_str(), "string" | "richtext" | "text"))
320 .map(|f| f.name.as_str())
321 .collect();
322 if !text_fields.is_empty() {
323 let fts_name = format!("{}_fts", entity.name);
324 let quoted_cols: Vec<String> = text_fields.iter().map(|f| quote_ident(f)).collect();
325 let fts_sql = format!(
326 "CREATE VIRTUAL TABLE IF NOT EXISTS {} USING fts5({}, content={}, content_rowid='rowid')",
327 quote_ident(&fts_name),
328 quoted_cols.join(", "),
329 quote_ident(&entity.name),
330 );
331 let fts_ok = conn.execute(&fts_sql, []).is_ok();
334
335 if fts_ok {
336 let tbl = quote_ident(&entity.name);
347 let ftb = quote_ident(&fts_name);
348 let cols_list = quoted_cols.join(", ");
349 let new_list: Vec<String> = text_fields
350 .iter()
351 .map(|f| format!("new.{}", quote_ident(f)))
352 .collect();
353 let old_list: Vec<String> = text_fields
354 .iter()
355 .map(|f| format!("old.{}", quote_ident(f)))
356 .collect();
357
358 let trigger_ai = quote_ident(&format!("{}_ai", fts_name));
359 let trigger_ad = quote_ident(&format!("{}_ad", fts_name));
360 let trigger_au = quote_ident(&format!("{}_au", fts_name));
361
362 let trigger_ins = format!(
363 "CREATE TRIGGER IF NOT EXISTS {trigger_ai} AFTER INSERT ON {tbl} BEGIN \
364 INSERT INTO {ftb}(rowid, {cols_list}) VALUES (new.rowid, {new_vals}); END",
365 new_vals = new_list.join(", "),
366 );
367 let trigger_del = format!(
368 "CREATE TRIGGER IF NOT EXISTS {trigger_ad} AFTER DELETE ON {tbl} BEGIN \
369 INSERT INTO {ftb}({ftb}, rowid, {cols_list}) VALUES('delete', old.rowid, {old_vals}); END",
370 old_vals = old_list.join(", "),
371 );
372 let trigger_upd = format!(
373 "CREATE TRIGGER IF NOT EXISTS {trigger_au} AFTER UPDATE ON {tbl} BEGIN \
374 INSERT INTO {ftb}({ftb}, rowid, {cols_list}) VALUES('delete', old.rowid, {old_vals}); \
375 INSERT INTO {ftb}(rowid, {cols_list}) VALUES (new.rowid, {new_vals}); END",
376 new_vals = new_list.join(", "),
377 old_vals = old_list.join(", "),
378 );
379 for (label, sql) in [
382 ("ai", &trigger_ins),
383 ("ad", &trigger_del),
384 ("au", &trigger_upd),
385 ] {
386 if let Err(e) = conn.execute(sql, []) {
387 tracing::warn!(
388 "[fts] failed to create {label} trigger for {}: {e}",
389 entity.name
390 );
391 }
392 }
393 }
394 }
395 }
396
397 let db_path = conn.path().filter(|p| !p.is_empty()).map(|p| p.to_string());
401
402 let read_pool = if let Some(ref path) = db_path {
403 let mut pool = Vec::with_capacity(READ_POOL_SIZE);
404 for _ in 0..READ_POOL_SIZE {
405 let read_conn = Connection::open_with_flags(
406 path,
407 rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
408 | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX,
409 )
410 .map_err(|e| RuntimeError {
411 code: "POOL_OPEN_FAILED".into(),
412 message: format!("Failed to open read connection: {e}"),
413 })?;
414 tune_runtime_connection(&read_conn, false);
415 pool.push(Mutex::new(read_conn));
416 }
417 pool
418 } else {
419 Vec::new()
421 };
422
423 crate::loro_store::ensure_sidecar(&conn).map_err(|e| RuntimeError {
426 code: "CRDT_SIDECAR_FAILED".into(),
427 message: format!("create CRDT sidecar table: {e}"),
428 })?;
429
430 Ok(Self {
431 write_conn: Mutex::new(conn),
432 read_pool,
433 read_counter: AtomicUsize::new(0),
434 manifest,
435 entities,
436 is_in_memory,
437 crdt: crate::loro_store::LoroStore::new(),
438 })
439 }
440
441 pub fn ensure_search_indexes(&self) -> Result<(), RuntimeError> {
452 let conn = self.lock_write_conn()?;
453 conn.execute(pylon_storage::search::create_facet_table_sql(), [])
454 .map_err(|e| RuntimeError {
455 code: "FACET_TABLE_FAILED".into(),
456 message: format!("create _facet_bitmap: {e}"),
457 })?;
458 for entity in &self.manifest.entities {
459 if let Some(cfg) = &entity.search {
460 if let Some(sql) = pylon_storage::search::create_fts_table_sql(&entity.name, cfg) {
461 conn.execute(&sql, []).map_err(|e| RuntimeError {
462 code: "FTS_TABLE_FAILED".into(),
463 message: format!("create FTS table for {}: {e}", entity.name),
464 })?;
465 }
466 for field in &cfg.sortable {
467 let idx_sql = format!(
468 "CREATE INDEX IF NOT EXISTS \"{}_sort_{field}\" ON \"{}\" (\"{field}\")",
469 entity.name, entity.name,
470 );
471 conn.execute(&idx_sql, []).map_err(|e| RuntimeError {
472 code: "SORT_INDEX_FAILED".into(),
473 message: format!("create sort index for {}.{field}: {e}", entity.name),
474 })?;
475 }
476 }
477 }
478 Ok(())
479 }
480
481 pub fn manifest(&self) -> &AppManifest {
483 &self.manifest
484 }
485
486 pub fn lock_conn_pub(&self) -> Result<std::sync::MutexGuard<'_, Connection>, RuntimeError> {
488 self.lock_write_conn()
489 }
490
491 pub fn read_pool_size(&self) -> usize {
493 self.read_pool.len()
494 }
495
496 pub(crate) fn crdt_fields_for(
505 &self,
506 ent: &ManifestEntity,
507 ) -> Result<Vec<pylon_crdt::CrdtField>, RuntimeError> {
508 let mut out = Vec::with_capacity(ent.fields.len());
509 for f in &ent.fields {
510 if f.name == "id" {
513 continue;
514 }
515 let kind = pylon_crdt::field_kind(&f.field_type, f.crdt).map_err(|e| RuntimeError {
516 code: "INVALID_CRDT_FIELD".into(),
517 message: format!(
518 "{}.{}: {e} (declared type={}, crdt={:?})",
519 ent.name, f.name, f.field_type, f.crdt
520 ),
521 })?;
522 out.push(pylon_crdt::CrdtField {
523 name: f.name.clone(),
524 kind,
525 });
526 }
527 Ok(out)
528 }
529
530 pub fn crdt_store(&self) -> &crate::loro_store::LoroStore {
533 &self.crdt
534 }
535
536 pub fn insert(&self, entity: &str, data: &serde_json::Value) -> Result<String, RuntimeError> {
549 let ent = self.require_entity(entity)?;
550 let conn = self.lock_write_conn()?;
551
552 let id = generate_id();
553
554 let obj = data.as_object().ok_or_else(|| RuntimeError {
555 code: "INVALID_DATA".into(),
556 message: "Insert data must be a JSON object".into(),
557 })?;
558
559 for key in obj.keys() {
562 if key != "id" {
563 validate_column_name(key, ent)?;
564 }
565 }
566
567 with_write_tx(&conn, || {
571 if ent.crdt {
572 let crdt_fields = self.crdt_fields_for(ent)?;
573 self.crdt
574 .apply_patch(&conn, entity, &id, &crdt_fields, data)
575 .map_err(|e| RuntimeError {
576 code: "CRDT_APPLY_FAILED".into(),
577 message: format!("crdt write {entity}/{id}: {e}"),
578 })?;
579 }
580
581 let mut col_names = vec![quote_ident("id")];
582 let mut placeholders = vec!["?1".to_string()];
583 let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = vec![Box::new(id.clone())];
584
585 let mut idx = 2;
586 for (key, val) in obj {
587 if key == "id" {
588 continue;
589 }
590 col_names.push(quote_ident(key));
591 placeholders.push(format!("?{idx}"));
592 values.push(json_to_sql(val));
593 idx += 1;
594 }
595
596 let sql = format!(
597 "INSERT INTO {} ({}) VALUES ({})",
598 quote_ident(entity),
599 col_names.join(", "),
600 placeholders.join(", ")
601 );
602
603 let params: Vec<&dyn rusqlite::types::ToSql> =
604 values.iter().map(|v| v.as_ref()).collect();
605 conn.execute(&sql, params.as_slice())
606 .map_err(|e| RuntimeError {
607 code: "INSERT_FAILED".into(),
608 message: format!("Insert into {entity} failed: {e}"),
609 })?;
610
611 if let Some(cfg) = ent.search.as_ref() {
615 if !cfg.is_empty() {
616 pylon_storage::search_maintenance::apply_insert(&conn, entity, &id, data, cfg)
617 .map_err(|e| RuntimeError {
618 code: "SEARCH_MAINTENANCE_FAILED".into(),
619 message: format!("search index update on insert {entity}: {e}"),
620 })?;
621 }
622 }
623 Ok(())
624 })?;
625
626 Ok(id)
627 }
628
629 pub fn get_by_id(
631 &self,
632 entity: &str,
633 id: &str,
634 ) -> Result<Option<serde_json::Value>, RuntimeError> {
635 let ent = self.require_entity(entity)?;
636 let conn = self.lock_read_conn()?;
637
638 let sql = format!("SELECT * FROM {} WHERE \"id\" = ?1", quote_ident(entity));
639 let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
640 code: "QUERY_FAILED".into(),
641 message: format!("Failed to prepare query: {e}"),
642 })?;
643
644 let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
645
646 let result = stmt
647 .query_row(rusqlite::params![id], |row| Ok(row_to_json(row, &columns)))
648 .ok();
649
650 Ok(result)
651 }
652
653 pub fn list(&self, entity: &str) -> Result<Vec<serde_json::Value>, RuntimeError> {
655 let ent = self.require_entity(entity)?;
656 let conn = self.lock_read_conn()?;
657
658 let sql = format!("SELECT * FROM {} ORDER BY \"id\"", quote_ident(entity));
659 let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
660 code: "QUERY_FAILED".into(),
661 message: format!("Failed to prepare query: {e}"),
662 })?;
663
664 let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
665
666 let rows = stmt
667 .query_map([], |row| Ok(row_to_json(row, &columns)))
668 .map_err(|e| RuntimeError {
669 code: "QUERY_FAILED".into(),
670 message: format!("Query failed: {e}"),
671 })?;
672
673 let mut result = Vec::new();
674 for row in rows {
675 if let Ok(val) = row {
676 result.push(val);
677 }
678 }
679 Ok(result)
680 }
681
682 pub fn list_after(
684 &self,
685 entity: &str,
686 after: Option<&str>,
687 limit: usize,
688 ) -> Result<Vec<serde_json::Value>, RuntimeError> {
689 let ent = self.require_entity(entity)?;
690 let conn = self.lock_read_conn()?;
691
692 let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
693 let table = quote_ident(entity);
694
695 let (sql, params): (String, Vec<Box<dyn rusqlite::types::ToSql>>) = match after {
696 Some(cursor) => (
697 format!(
698 "SELECT * FROM {} WHERE \"id\" > ?1 ORDER BY \"id\" LIMIT ?2",
699 table
700 ),
701 vec![Box::new(cursor.to_string()), Box::new(limit as i64)],
702 ),
703 None => (
704 format!("SELECT * FROM {} ORDER BY \"id\" LIMIT ?1", table),
705 vec![Box::new(limit as i64)],
706 ),
707 };
708
709 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
710 params.iter().map(|v| v.as_ref()).collect();
711
712 let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
713 code: "QUERY_FAILED".into(),
714 message: format!("Failed to prepare query: {e}"),
715 })?;
716
717 let rows = stmt
718 .query_map(param_refs.as_slice(), |row| Ok(row_to_json(row, &columns)))
719 .map_err(|e| RuntimeError {
720 code: "QUERY_FAILED".into(),
721 message: format!("Query failed: {e}"),
722 })?;
723
724 let mut result = Vec::new();
725 for row in rows {
726 if let Ok(val) = row {
727 result.push(val);
728 }
729 }
730 Ok(result)
731 }
732
733 pub fn update(
739 &self,
740 entity: &str,
741 id: &str,
742 data: &serde_json::Value,
743 ) -> Result<bool, RuntimeError> {
744 let ent = self.require_entity(entity)?;
745 let conn = self.lock_write_conn()?;
746
747 let obj = data.as_object().ok_or_else(|| RuntimeError {
748 code: "INVALID_DATA".into(),
749 message: "Update data must be a JSON object".into(),
750 })?;
751
752 for key in obj.keys() {
754 if key != "id" {
755 validate_column_name(key, ent)?;
756 }
757 }
758 let writable_keys: Vec<&String> = obj.keys().filter(|k| *k != "id").collect();
759 if writable_keys.is_empty() {
760 return Ok(false);
761 }
762
763 let affected = with_write_tx(&conn, || -> Result<i64, RuntimeError> {
766 if ent.crdt {
767 let crdt_fields = self.crdt_fields_for(ent)?;
768 self.crdt
769 .apply_patch(&conn, entity, id, &crdt_fields, data)
770 .map_err(|e| RuntimeError {
771 code: "CRDT_APPLY_FAILED".into(),
772 message: format!("crdt write {entity}/{id}: {e}"),
773 })?;
774 }
775
776 let mut set_clauses = Vec::new();
777 let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
778 let mut idx = 1;
779 for key in &writable_keys {
780 set_clauses.push(format!("{} = ?{idx}", quote_ident(key)));
781 values.push(json_to_sql(&obj[key.as_str()]));
782 idx += 1;
783 }
784
785 let searchable = ent.search.as_ref().map(|c| !c.is_empty()).unwrap_or(false);
789 let old_row = if searchable {
790 self.get_by_id_with_conn(&conn, entity, id)?
791 } else {
792 None
793 };
794
795 values.push(Box::new(id.to_string()));
796 let sql = format!(
797 "UPDATE {} SET {} WHERE \"id\" = ?{idx}",
798 quote_ident(entity),
799 set_clauses.join(", ")
800 );
801
802 let params: Vec<&dyn rusqlite::types::ToSql> =
803 values.iter().map(|v| v.as_ref()).collect();
804 let affected = conn
805 .execute(&sql, params.as_slice())
806 .map_err(|e| RuntimeError {
807 code: "UPDATE_FAILED".into(),
808 message: format!("Update {entity}/{id} failed: {e}"),
809 })? as i64;
810
811 if affected > 0 && searchable {
812 if let (Some(cfg), Some(old)) = (ent.search.as_ref(), old_row) {
813 pylon_storage::search_maintenance::apply_update(
814 &conn, entity, id, &old, data, cfg,
815 )
816 .map_err(|e| RuntimeError {
817 code: "SEARCH_MAINTENANCE_FAILED".into(),
818 message: format!("search index update on update {entity}: {e}"),
819 })?;
820 }
821 }
822 Ok(affected)
823 })?;
824
825 Ok(affected > 0)
826 }
827
828 pub fn delete(&self, entity: &str, id: &str) -> Result<bool, RuntimeError> {
830 let ent = self.require_entity(entity)?;
831 let conn = self.lock_write_conn()?;
832
833 let searchable = ent.search.as_ref().map(|c| !c.is_empty()).unwrap_or(false);
836 if searchable {
837 if let (Some(cfg), Ok(Some(row))) = (
838 ent.search.as_ref(),
839 self.get_by_id_with_conn(&conn, entity, id),
840 ) {
841 pylon_storage::search_maintenance::apply_delete(&conn, entity, id, &row, cfg)
842 .map_err(|e| RuntimeError {
843 code: "SEARCH_MAINTENANCE_FAILED".into(),
844 message: format!("search index update on delete {entity}: {e}"),
845 })?;
846 }
847 }
848
849 let sql = format!("DELETE FROM {} WHERE \"id\" = ?1", quote_ident(entity));
850 let affected = conn
851 .execute(&sql, rusqlite::params![id])
852 .map_err(|e| RuntimeError {
853 code: "DELETE_FAILED".into(),
854 message: format!("Delete {entity}/{id} failed: {e}"),
855 })?;
856
857 Ok(affected > 0)
858 }
859
860 pub fn lookup(
862 &self,
863 entity: &str,
864 field: &str,
865 value: &str,
866 ) -> Result<Option<serde_json::Value>, RuntimeError> {
867 let ent = self.require_entity(entity)?;
868 validate_column_name(field, ent)?;
869 let conn = self.lock_read_conn()?;
870
871 let sql = format!(
872 "SELECT * FROM {} WHERE {} = ?1 LIMIT 1",
873 quote_ident(entity),
874 quote_ident(field)
875 );
876 let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
877
878 let result = conn.prepare_cached(&sql).ok().and_then(|mut stmt| {
879 stmt.query_row(rusqlite::params![value], |row| {
880 Ok(row_to_json(row, &columns))
881 })
882 .ok()
883 });
884
885 Ok(result)
886 }
887
888 pub fn link(
890 &self,
891 entity: &str,
892 id: &str,
893 relation: &str,
894 target_id: &str,
895 ) -> Result<bool, RuntimeError> {
896 let ent = self.require_entity(entity)?;
897
898 let rel = ent
900 .relations
901 .iter()
902 .find(|r| r.name == relation)
903 .ok_or_else(|| RuntimeError {
904 code: "RELATION_NOT_FOUND".into(),
905 message: format!("Relation \"{relation}\" not found on entity \"{entity}\""),
906 })?;
907
908 let data = serde_json::json!({ rel.field.clone(): target_id });
909 self.update(entity, id, &data)
910 }
911
912 pub fn unlink(&self, entity: &str, id: &str, relation: &str) -> Result<bool, RuntimeError> {
914 let ent = self.require_entity(entity)?;
915
916 let rel = ent
917 .relations
918 .iter()
919 .find(|r| r.name == relation)
920 .ok_or_else(|| RuntimeError {
921 code: "RELATION_NOT_FOUND".into(),
922 message: format!("Relation \"{relation}\" not found on entity \"{entity}\""),
923 })?;
924
925 let data = serde_json::json!({ rel.field.clone(): null });
926 self.update(entity, id, &data)
927 }
928
929 pub fn query_filtered(
931 &self,
932 entity: &str,
933 filter: &serde_json::Value,
934 ) -> Result<Vec<serde_json::Value>, RuntimeError> {
935 let ent = self.require_entity(entity)?;
936 let conn = self.lock_read_conn()?;
937
938 let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
939 let obj = filter
940 .as_object()
941 .unwrap_or(&serde_json::Map::new())
942 .clone();
943
944 let mut where_clauses = Vec::new();
945 let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
946 let mut order_clause = String::new();
947 let mut limit_clause = String::new();
948 let mut join_clause = String::new();
949 let mut fts_order = false;
950 let mut idx = 1;
951
952 for (key, val) in &obj {
953 match key.as_str() {
954 "$order" => {
955 if let Some(order_obj) = val.as_object() {
956 let mut parts: Vec<String> = Vec::new();
957 for (col, dir) in order_obj {
958 validate_column_name(col, ent)?;
959 let d = match dir.as_str().unwrap_or("asc") {
960 "desc" | "DESC" => "DESC",
961 _ => "ASC",
962 };
963 parts.push(format!("{} {d}", quote_ident(col)));
964 }
965 if !parts.is_empty() {
966 order_clause = format!(" ORDER BY {}", parts.join(", "));
967 }
968 }
969 }
970 "$limit" => {
971 if let Some(n) = val.as_u64() {
972 limit_clause = format!(" LIMIT {n}");
973 }
974 }
975 "$offset" => {
976 if let Some(n) = val.as_u64() {
977 if limit_clause.is_empty() {
979 limit_clause = " LIMIT -1".into();
980 }
981 limit_clause = format!("{limit_clause} OFFSET {n}");
982 }
983 }
984 "$search" => {
985 if let Some(q) = val.as_str() {
986 let fts = format!("{}_fts", entity);
988 join_clause = format!(
989 " JOIN {fts} ON {ent}.rowid = {fts}.rowid",
990 fts = quote_ident(&fts),
991 ent = quote_ident(entity),
992 );
993 where_clauses.push(format!("{} MATCH ?{idx}", quote_ident(&fts)));
994 values.push(Box::new(q.to_string()));
995 fts_order = true;
996 idx += 1;
997 }
998 }
999 _ => {
1000 validate_column_name(key, ent)?;
1001 let quoted_key = quote_ident(key);
1002
1003 if let Some(op_obj) = val.as_object() {
1004 for (op, op_val) in op_obj {
1005 match op.as_str() {
1006 "$not" => {
1007 where_clauses.push(format!("{quoted_key} != ?{idx}"));
1008 values.push(json_to_sql(op_val));
1009 idx += 1;
1010 }
1011 "$gt" => {
1012 where_clauses.push(format!("{quoted_key} > ?{idx}"));
1013 values.push(json_to_sql(op_val));
1014 idx += 1;
1015 }
1016 "$gte" => {
1017 where_clauses.push(format!("{quoted_key} >= ?{idx}"));
1018 values.push(json_to_sql(op_val));
1019 idx += 1;
1020 }
1021 "$lt" => {
1022 where_clauses.push(format!("{quoted_key} < ?{idx}"));
1023 values.push(json_to_sql(op_val));
1024 idx += 1;
1025 }
1026 "$lte" => {
1027 where_clauses.push(format!("{quoted_key} <= ?{idx}"));
1028 values.push(json_to_sql(op_val));
1029 idx += 1;
1030 }
1031 "$like" => {
1032 where_clauses.push(format!("{quoted_key} LIKE ?{idx}"));
1033 let pattern = format!("%{}%", op_val.as_str().unwrap_or(""));
1034 values.push(Box::new(pattern));
1035 idx += 1;
1036 }
1037 "$in" => {
1038 if let Some(arr) = op_val.as_array() {
1039 let placeholders: Vec<String> = arr
1040 .iter()
1041 .map(|v| {
1042 let p = format!("?{idx}");
1043 values.push(json_to_sql(v));
1044 idx += 1;
1045 p
1046 })
1047 .collect();
1048 if !placeholders.is_empty() {
1049 where_clauses.push(format!(
1050 "{quoted_key} IN ({})",
1051 placeholders.join(", ")
1052 ));
1053 }
1054 }
1055 }
1056 _ => {}
1057 }
1058 }
1059 } else {
1060 where_clauses.push(format!("{quoted_key} = ?{idx}"));
1062 values.push(json_to_sql(val));
1063 idx += 1;
1064 }
1065 }
1066 }
1067 }
1068
1069 let where_sql = if where_clauses.is_empty() {
1070 String::new()
1071 } else {
1072 format!(" WHERE {}", where_clauses.join(" AND "))
1073 };
1074
1075 if order_clause.is_empty() {
1076 order_clause = if fts_order {
1077 " ORDER BY bm25(".to_string() + "e_ident(&format!("{}_fts", entity)) + ")"
1079 } else {
1080 format!(" ORDER BY {}.\"id\"", quote_ident(entity))
1081 };
1082 }
1083
1084 let select_prefix = format!("{}.*", quote_ident(entity));
1085 let sql = format!(
1086 "SELECT {} FROM {}{}{}{}{}",
1087 select_prefix,
1088 quote_ident(entity),
1089 join_clause,
1090 where_sql,
1091 order_clause,
1092 limit_clause
1093 );
1094 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1095 values.iter().map(|v| v.as_ref()).collect();
1096
1097 let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
1098 code: "QUERY_FAILED".into(),
1099 message: format!("Failed to prepare filtered query: {e}"),
1100 })?;
1101
1102 let rows = stmt
1103 .query_map(param_refs.as_slice(), |row| Ok(row_to_json(row, &columns)))
1104 .map_err(|e| RuntimeError {
1105 code: "QUERY_FAILED".into(),
1106 message: format!("Filtered query failed: {e}"),
1107 })?;
1108
1109 let mut result = Vec::new();
1110 for row in rows {
1111 if let Ok(val) = row {
1112 result.push(val);
1113 }
1114 }
1115 Ok(result)
1116 }
1117
1118 pub fn query_graph(
1123 &self,
1124 query: &serde_json::Value,
1125 ) -> Result<serde_json::Value, RuntimeError> {
1126 let obj = query.as_object().ok_or_else(|| RuntimeError {
1127 code: "INVALID_QUERY".into(),
1128 message: "Graph query must be a JSON object".into(),
1129 })?;
1130
1131 let mut results = serde_json::Map::new();
1132
1133 for (entity_name, query_opts) in obj {
1134 let _ent = self.require_entity(entity_name)?;
1135
1136 let filter = query_opts
1138 .get("where")
1139 .cloned()
1140 .unwrap_or(serde_json::json!({}));
1141 let rows = self.query_filtered(entity_name, &filter)?;
1142
1143 let rows = if let Some(include) = query_opts.get("include").and_then(|v| v.as_object())
1145 {
1146 let ent = self.entities.get(entity_name).ok_or_else(|| RuntimeError {
1152 code: "INVARIANT_BROKEN".into(),
1153 message: format!(
1154 "entity \"{entity_name}\" missing from registry during include expansion"
1155 ),
1156 })?;
1157 rows.into_iter()
1158 .map(|mut row| {
1159 for (rel_name, _sub_query) in include {
1160 if let Some(rel) = ent.relations.iter().find(|r| r.name == *rel_name) {
1161 let fk_value = row
1162 .get(&rel.field)
1163 .and_then(|v| v.as_str())
1164 .map(|s| s.to_string());
1165 if let Some(fk) = fk_value {
1166 if rel.many {
1167 let sub_filter = serde_json::json!({ &rel.field: &fk });
1169 if let Ok(related) =
1170 self.query_filtered(&rel.target, &sub_filter)
1171 {
1172 row[rel_name] = serde_json::json!(related);
1173 }
1174 } else {
1175 if let Ok(Some(related)) = self.get_by_id(&rel.target, &fk)
1177 {
1178 row[rel_name] = related;
1179 }
1180 }
1181 }
1182 }
1183 }
1184 row
1185 })
1186 .collect()
1187 } else {
1188 rows
1189 };
1190
1191 let rows = if let Some(limit) = query_opts.get("limit").and_then(|v| v.as_u64()) {
1193 rows.into_iter().take(limit as usize).collect()
1194 } else {
1195 rows
1196 };
1197
1198 results.insert(entity_name.clone(), serde_json::json!(rows));
1199 }
1200
1201 Ok(serde_json::Value::Object(results))
1202 }
1203
1204 pub fn insert_with_conn(
1210 &self,
1211 conn: &Connection,
1212 entity: &str,
1213 data: &serde_json::Value,
1214 ) -> Result<String, RuntimeError> {
1215 let ent = self.require_entity(entity)?;
1216 let id = generate_id();
1217 let obj = data.as_object().ok_or_else(|| RuntimeError {
1218 code: "INVALID_DATA".into(),
1219 message: "Insert data must be a JSON object".into(),
1220 })?;
1221
1222 let mut col_names = vec![quote_ident("id")];
1223 let mut placeholders = vec!["?1".to_string()];
1224 let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = vec![Box::new(id.clone())];
1225 let mut idx = 2;
1226 for (key, val) in obj {
1227 if key == "id" {
1228 continue;
1229 }
1230 validate_column_name(key, ent)?;
1231 col_names.push(quote_ident(key));
1232 placeholders.push(format!("?{idx}"));
1233 values.push(json_to_sql(val));
1234 idx += 1;
1235 }
1236
1237 let sql = format!(
1238 "INSERT INTO {} ({}) VALUES ({})",
1239 quote_ident(entity),
1240 col_names.join(", "),
1241 placeholders.join(", ")
1242 );
1243 let params: Vec<&dyn rusqlite::types::ToSql> = values.iter().map(|v| v.as_ref()).collect();
1244 conn.execute(&sql, params.as_slice())
1245 .map_err(|e| RuntimeError {
1246 code: "INSERT_FAILED".into(),
1247 message: format!("Insert into {entity} failed: {e}"),
1248 })?;
1249
1250 if let Some(cfg) = ent.search.as_ref() {
1253 if !cfg.is_empty() {
1254 pylon_storage::search_maintenance::apply_insert(conn, entity, &id, data, cfg)
1255 .map_err(|e| RuntimeError {
1256 code: "SEARCH_MAINTENANCE_FAILED".into(),
1257 message: format!("search index update on insert {entity}: {e}"),
1258 })?;
1259 }
1260 }
1261
1262 Ok(id)
1263 }
1264
1265 pub fn update_with_conn(
1267 &self,
1268 conn: &Connection,
1269 entity: &str,
1270 id: &str,
1271 data: &serde_json::Value,
1272 ) -> Result<bool, RuntimeError> {
1273 let ent = self.require_entity(entity)?;
1274 let obj = data.as_object().ok_or_else(|| RuntimeError {
1275 code: "INVALID_DATA".into(),
1276 message: "Update data must be a JSON object".into(),
1277 })?;
1278
1279 let mut set_clauses = Vec::new();
1280 let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
1281 let mut idx = 1;
1282 for (key, val) in obj {
1283 if key == "id" {
1284 continue;
1285 }
1286 validate_column_name(key, ent)?;
1287 set_clauses.push(format!("{} = ?{idx}", quote_ident(key)));
1288 values.push(json_to_sql(val));
1289 idx += 1;
1290 }
1291 if set_clauses.is_empty() {
1292 return Ok(false);
1293 }
1294
1295 let searchable = ent.search.as_ref().map(|c| !c.is_empty()).unwrap_or(false);
1300 let old_row = if searchable {
1301 self.get_by_id_with_conn(conn, entity, id)?
1302 } else {
1303 None
1304 };
1305
1306 values.push(Box::new(id.to_string()));
1307 let sql = format!(
1308 "UPDATE {} SET {} WHERE \"id\" = ?{idx}",
1309 quote_ident(entity),
1310 set_clauses.join(", ")
1311 );
1312 let params: Vec<&dyn rusqlite::types::ToSql> = values.iter().map(|v| v.as_ref()).collect();
1313 let affected = conn
1314 .execute(&sql, params.as_slice())
1315 .map_err(|e| RuntimeError {
1316 code: "UPDATE_FAILED".into(),
1317 message: format!("Update {entity}/{id} failed: {e}"),
1318 })?;
1319
1320 if affected > 0 && searchable {
1321 if let (Some(cfg), Some(old)) = (ent.search.as_ref(), old_row) {
1322 pylon_storage::search_maintenance::apply_update(conn, entity, id, &old, data, cfg)
1323 .map_err(|e| RuntimeError {
1324 code: "SEARCH_MAINTENANCE_FAILED".into(),
1325 message: format!("search index update on update {entity}: {e}"),
1326 })?;
1327 }
1328 }
1329
1330 Ok(affected > 0)
1331 }
1332
1333 pub fn delete_with_conn(
1335 &self,
1336 conn: &Connection,
1337 entity: &str,
1338 id: &str,
1339 ) -> Result<bool, RuntimeError> {
1340 let ent = self.require_entity(entity)?;
1341
1342 let searchable = ent.search.as_ref().map(|c| !c.is_empty()).unwrap_or(false);
1345 if searchable {
1346 if let (Some(cfg), Ok(Some(row))) = (
1347 ent.search.as_ref(),
1348 self.get_by_id_with_conn(conn, entity, id),
1349 ) {
1350 pylon_storage::search_maintenance::apply_delete(conn, entity, id, &row, cfg)
1351 .map_err(|e| RuntimeError {
1352 code: "SEARCH_MAINTENANCE_FAILED".into(),
1353 message: format!("search index update on delete {entity}: {e}"),
1354 })?;
1355 }
1356 }
1357
1358 let sql = format!("DELETE FROM {} WHERE \"id\" = ?1", quote_ident(entity));
1359 let affected = conn
1360 .execute(&sql, rusqlite::params![id])
1361 .map_err(|e| RuntimeError {
1362 code: "DELETE_FAILED".into(),
1363 message: format!("Delete {entity}/{id} failed: {e}"),
1364 })?;
1365 Ok(affected > 0)
1366 }
1367
1368 pub fn get_by_id_with_conn(
1370 &self,
1371 conn: &Connection,
1372 entity: &str,
1373 id: &str,
1374 ) -> Result<Option<serde_json::Value>, RuntimeError> {
1375 let ent = self.require_entity(entity)?;
1376 let sql = format!("SELECT * FROM {} WHERE \"id\" = ?1", quote_ident(entity));
1377 let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
1378 code: "QUERY_FAILED".into(),
1379 message: format!("Failed to prepare query: {e}"),
1380 })?;
1381 let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1382 Ok(stmt
1383 .query_row(rusqlite::params![id], |row| Ok(row_to_json(row, &columns)))
1384 .ok())
1385 }
1386
1387 pub fn list_with_conn(
1389 &self,
1390 conn: &Connection,
1391 entity: &str,
1392 ) -> Result<Vec<serde_json::Value>, RuntimeError> {
1393 let ent = self.require_entity(entity)?;
1394 let sql = format!("SELECT * FROM {} ORDER BY \"id\"", quote_ident(entity));
1395 let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
1396 code: "QUERY_FAILED".into(),
1397 message: format!("Failed to prepare query: {e}"),
1398 })?;
1399 let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1400 let rows = stmt
1401 .query_map([], |row| Ok(row_to_json(row, &columns)))
1402 .map_err(|e| RuntimeError {
1403 code: "QUERY_FAILED".into(),
1404 message: format!("Query failed: {e}"),
1405 })?;
1406 Ok(rows.flatten().collect())
1407 }
1408
1409 pub fn list_after_with_conn(
1411 &self,
1412 conn: &Connection,
1413 entity: &str,
1414 after: Option<&str>,
1415 limit: usize,
1416 ) -> Result<Vec<serde_json::Value>, RuntimeError> {
1417 let ent = self.require_entity(entity)?;
1418 let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1419 let table = quote_ident(entity);
1420 let (sql, params): (String, Vec<Box<dyn rusqlite::types::ToSql>>) = match after {
1421 Some(cursor) => (
1422 format!("SELECT * FROM {table} WHERE \"id\" > ?1 ORDER BY \"id\" LIMIT ?2"),
1423 vec![Box::new(cursor.to_string()), Box::new(limit as i64)],
1424 ),
1425 None => (
1426 format!("SELECT * FROM {table} ORDER BY \"id\" LIMIT ?1"),
1427 vec![Box::new(limit as i64)],
1428 ),
1429 };
1430 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1431 params.iter().map(|v| v.as_ref()).collect();
1432 let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
1433 code: "QUERY_FAILED".into(),
1434 message: format!("Failed to prepare: {e}"),
1435 })?;
1436 let rows = stmt
1437 .query_map(param_refs.as_slice(), |row| Ok(row_to_json(row, &columns)))
1438 .map_err(|e| RuntimeError {
1439 code: "QUERY_FAILED".into(),
1440 message: format!("Query failed: {e}"),
1441 })?;
1442 Ok(rows.flatten().collect())
1443 }
1444
1445 pub fn lookup_with_conn(
1447 &self,
1448 conn: &Connection,
1449 entity: &str,
1450 field: &str,
1451 value: &str,
1452 ) -> Result<Option<serde_json::Value>, RuntimeError> {
1453 let ent = self.require_entity(entity)?;
1454 validate_column_name(field, ent)?;
1455 let sql = format!(
1456 "SELECT * FROM {} WHERE {} = ?1 LIMIT 1",
1457 quote_ident(entity),
1458 quote_ident(field)
1459 );
1460 let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1461 Ok(conn.prepare_cached(&sql).ok().and_then(|mut stmt| {
1462 stmt.query_row(rusqlite::params![value], |row| {
1463 Ok(row_to_json(row, &columns))
1464 })
1465 .ok()
1466 }))
1467 }
1468
1469 pub fn link_with_conn(
1471 &self,
1472 conn: &Connection,
1473 entity: &str,
1474 id: &str,
1475 relation: &str,
1476 target_id: &str,
1477 ) -> Result<bool, RuntimeError> {
1478 let ent = self.require_entity(entity)?;
1479 let rel = ent
1480 .relations
1481 .iter()
1482 .find(|r| r.name == relation)
1483 .ok_or_else(|| RuntimeError {
1484 code: "RELATION_NOT_FOUND".into(),
1485 message: format!("Relation \"{relation}\" not found on \"{entity}\""),
1486 })?;
1487 let data = serde_json::json!({ rel.field.clone(): target_id });
1488 self.update_with_conn(conn, entity, id, &data)
1489 }
1490
1491 pub fn unlink_with_conn(
1493 &self,
1494 conn: &Connection,
1495 entity: &str,
1496 id: &str,
1497 relation: &str,
1498 ) -> Result<bool, RuntimeError> {
1499 let ent = self.require_entity(entity)?;
1500 let rel = ent
1501 .relations
1502 .iter()
1503 .find(|r| r.name == relation)
1504 .ok_or_else(|| RuntimeError {
1505 code: "RELATION_NOT_FOUND".into(),
1506 message: format!("Relation \"{relation}\" not found on \"{entity}\""),
1507 })?;
1508 let data = serde_json::json!({ rel.field.clone(): serde_json::Value::Null });
1509 self.update_with_conn(conn, entity, id, &data)
1510 }
1511
1512 pub fn query_filtered_with_conn(
1517 &self,
1518 conn: &Connection,
1519 entity: &str,
1520 filter: &serde_json::Value,
1521 ) -> Result<Vec<serde_json::Value>, RuntimeError> {
1522 let ent = self.require_entity(entity)?;
1523 let columns: Vec<String> = ent.fields.iter().map(|f| f.name.clone()).collect();
1524 let empty = serde_json::Map::new();
1525 let obj = filter.as_object().unwrap_or(&empty);
1526
1527 let mut where_clauses = Vec::new();
1528 let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
1529 let mut order_clause = String::new();
1530 let mut limit_clause = String::new();
1531 let mut idx = 1;
1532
1533 for (key, val) in obj {
1534 match key.as_str() {
1535 "$order" => {
1536 if let Some(o) = val.as_object() {
1537 let mut parts: Vec<String> = Vec::new();
1538 for (col, dir) in o {
1539 validate_column_name(col, ent)?;
1540 let d = match dir.as_str().unwrap_or("asc") {
1541 "desc" | "DESC" => "DESC",
1542 _ => "ASC",
1543 };
1544 parts.push(format!("{} {d}", quote_ident(col)));
1545 }
1546 if !parts.is_empty() {
1547 order_clause = format!(" ORDER BY {}", parts.join(", "));
1548 }
1549 }
1550 }
1551 "$limit" => {
1552 if let Some(n) = val.as_u64() {
1553 limit_clause = format!(" LIMIT {n}");
1554 }
1555 }
1556 "$offset" => {
1557 if let Some(n) = val.as_u64() {
1558 if limit_clause.is_empty() {
1559 limit_clause = " LIMIT -1".into();
1560 }
1561 limit_clause = format!("{limit_clause} OFFSET {n}");
1562 }
1563 }
1564 _ => {
1565 validate_column_name(key, ent)?;
1566 let qk = quote_ident(key);
1567 if let Some(op_obj) = val.as_object() {
1568 for (op, op_val) in op_obj {
1569 match op.as_str() {
1570 "$not" => {
1571 where_clauses.push(format!("{qk} != ?{idx}"));
1572 values.push(json_to_sql(op_val));
1573 idx += 1;
1574 }
1575 "$gt" => {
1576 where_clauses.push(format!("{qk} > ?{idx}"));
1577 values.push(json_to_sql(op_val));
1578 idx += 1;
1579 }
1580 "$gte" => {
1581 where_clauses.push(format!("{qk} >= ?{idx}"));
1582 values.push(json_to_sql(op_val));
1583 idx += 1;
1584 }
1585 "$lt" => {
1586 where_clauses.push(format!("{qk} < ?{idx}"));
1587 values.push(json_to_sql(op_val));
1588 idx += 1;
1589 }
1590 "$lte" => {
1591 where_clauses.push(format!("{qk} <= ?{idx}"));
1592 values.push(json_to_sql(op_val));
1593 idx += 1;
1594 }
1595 "$like" => {
1596 where_clauses.push(format!("{qk} LIKE ?{idx}"));
1597 let p = format!("%{}%", op_val.as_str().unwrap_or(""));
1598 values.push(Box::new(p));
1599 idx += 1;
1600 }
1601 "$in" => {
1602 if let Some(arr) = op_val.as_array() {
1603 let ph: Vec<String> = arr
1604 .iter()
1605 .map(|v| {
1606 let p = format!("?{idx}");
1607 values.push(json_to_sql(v));
1608 idx += 1;
1609 p
1610 })
1611 .collect();
1612 if !ph.is_empty() {
1613 where_clauses
1614 .push(format!("{qk} IN ({})", ph.join(", ")));
1615 }
1616 }
1617 }
1618 _ => {}
1619 }
1620 }
1621 } else {
1622 where_clauses.push(format!("{qk} = ?{idx}"));
1623 values.push(json_to_sql(val));
1624 idx += 1;
1625 }
1626 }
1627 }
1628 }
1629
1630 let where_sql = if where_clauses.is_empty() {
1631 String::new()
1632 } else {
1633 format!(" WHERE {}", where_clauses.join(" AND "))
1634 };
1635 if order_clause.is_empty() {
1636 order_clause = " ORDER BY \"id\"".into();
1637 }
1638
1639 let sql = format!(
1640 "SELECT * FROM {}{}{}{}",
1641 quote_ident(entity),
1642 where_sql,
1643 order_clause,
1644 limit_clause
1645 );
1646 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1647 values.iter().map(|v| v.as_ref()).collect();
1648 let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
1649 code: "QUERY_FAILED".into(),
1650 message: format!("Failed to prepare: {e}"),
1651 })?;
1652 let rows = stmt
1653 .query_map(param_refs.as_slice(), |row| Ok(row_to_json(row, &columns)))
1654 .map_err(|e| RuntimeError {
1655 code: "QUERY_FAILED".into(),
1656 message: format!("Query failed: {e}"),
1657 })?;
1658 Ok(rows.flatten().collect())
1659 }
1660
1661 pub fn query_graph_with_conn(
1663 &self,
1664 conn: &Connection,
1665 query: &serde_json::Value,
1666 ) -> Result<serde_json::Value, RuntimeError> {
1667 let obj = query.as_object().ok_or_else(|| RuntimeError {
1668 code: "INVALID_QUERY".into(),
1669 message: "Graph query must be a JSON object".into(),
1670 })?;
1671 let mut results = serde_json::Map::new();
1672 for (entity_name, query_opts) in obj {
1673 let _ent = self.require_entity(entity_name)?;
1674 let filter = query_opts
1675 .get("where")
1676 .cloned()
1677 .unwrap_or(serde_json::json!({}));
1678 let rows = self.query_filtered_with_conn(conn, entity_name, &filter)?;
1679 results.insert(entity_name.clone(), serde_json::json!(rows));
1680 }
1681 Ok(serde_json::Value::Object(results))
1682 }
1683
1684 pub fn aggregate(
1691 &self,
1692 entity: &str,
1693 spec: &serde_json::Value,
1694 ) -> Result<serde_json::Value, RuntimeError> {
1695 let ent = self.require_entity(entity)?;
1696 let conn = self.lock_read_conn()?;
1697 let obj = spec.as_object().ok_or_else(|| RuntimeError {
1698 code: "INVALID_QUERY".into(),
1699 message: "aggregate spec must be an object".into(),
1700 })?;
1701
1702 let mut select_parts: Vec<String> = Vec::new();
1704 let mut result_fields: Vec<String> = Vec::new();
1705
1706 if let Some(count) = obj.get("count") {
1707 match count {
1708 serde_json::Value::String(s) if s == "*" => {
1709 select_parts.push("COUNT(*) AS count".into());
1710 result_fields.push("count".into());
1711 }
1712 serde_json::Value::String(field) => {
1713 validate_column_name(field, ent)?;
1714 let alias = format!("count_{field}");
1715 select_parts.push(format!(
1716 "COUNT({}) AS {}",
1717 quote_ident(field),
1718 quote_ident(&alias)
1719 ));
1720 result_fields.push(alias);
1721 }
1722 _ => {}
1723 }
1724 }
1725
1726 for (fn_name, alias_prefix) in [
1727 ("sum", "sum_"),
1728 ("avg", "avg_"),
1729 ("min", "min_"),
1730 ("max", "max_"),
1731 ] {
1732 if let Some(fields) = obj.get(fn_name).and_then(|v| v.as_array()) {
1733 for field in fields {
1734 if let Some(f) = field.as_str() {
1735 validate_column_name(f, ent)?;
1736 let alias = format!("{alias_prefix}{f}");
1737 let sql_fn = fn_name.to_uppercase();
1738 select_parts.push(format!(
1739 "{}({}) AS {}",
1740 sql_fn,
1741 quote_ident(f),
1742 quote_ident(&alias)
1743 ));
1744 result_fields.push(alias);
1745 }
1746 }
1747 }
1748 }
1749
1750 if let Some(fields) = obj.get("countDistinct").and_then(|v| v.as_array()) {
1755 for field in fields {
1756 if let Some(f) = field.as_str() {
1757 validate_column_name(f, ent)?;
1758 let alias = format!("count_distinct_{f}");
1759 select_parts.push(format!(
1760 "COUNT(DISTINCT {}) AS {}",
1761 quote_ident(f),
1762 quote_ident(&alias)
1763 ));
1764 result_fields.push(alias);
1765 }
1766 }
1767 }
1768
1769 let mut group_by: Vec<String> = Vec::new();
1775 let mut group_select: Vec<String> = Vec::new();
1776 let mut group_field_names: Vec<String> = Vec::new();
1777 if let Some(groups) = obj.get("groupBy").and_then(|v| v.as_array()) {
1778 for g in groups {
1779 if let Some(f) = g.as_str() {
1780 validate_column_name(f, ent)?;
1781 let quoted = quote_ident(f);
1782 group_by.push(quoted.clone());
1783 group_select.push(quoted);
1784 group_field_names.push(f.to_string());
1785 } else if let Some(spec) = g.as_object() {
1786 let field =
1787 spec.get("field")
1788 .and_then(|v| v.as_str())
1789 .ok_or_else(|| RuntimeError {
1790 code: "INVALID_QUERY".into(),
1791 message: "groupBy object spec requires `field`".into(),
1792 })?;
1793 validate_column_name(field, ent)?;
1794 let bucket = spec.get("bucket").and_then(|v| v.as_str()).unwrap_or("day");
1795 let fmt = match bucket {
1796 "hour" => "%Y-%m-%d %H:00:00",
1797 "day" => "%Y-%m-%d",
1798 "month" => "%Y-%m",
1799 "year" => "%Y",
1800 "week" => "%Y-W%W",
1801 _ => {
1802 return Err(RuntimeError {
1803 code: "INVALID_QUERY".into(),
1804 message: format!(
1805 "bucket must be one of hour/day/week/month/year, got {bucket}"
1806 ),
1807 });
1808 }
1809 };
1810 let alias = format!("{field}_{bucket}");
1811 let expr = format!("strftime('{}', {})", fmt, quote_ident(field));
1812 group_by.push(expr.clone());
1813 group_select.push(format!("{} AS {}", expr, quote_ident(&alias)));
1814 group_field_names.push(alias);
1815 }
1816 }
1817 }
1818 let mut full_select = group_select.clone();
1819 full_select.extend(select_parts.iter().cloned());
1820 if full_select.is_empty() {
1821 return Err(RuntimeError {
1822 code: "INVALID_QUERY".into(),
1823 message: "aggregate spec must include count/sum/avg/min/max/groupBy".into(),
1824 });
1825 }
1826
1827 let mut where_clauses = Vec::new();
1829 let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
1830 let mut idx = 1;
1831 if let Some(where_obj) = obj.get("where").and_then(|v| v.as_object()) {
1832 for (k, v) in where_obj {
1833 validate_column_name(k, ent)?;
1834 where_clauses.push(format!("{} = ?{idx}", quote_ident(k)));
1835 values.push(json_to_sql(v));
1836 idx += 1;
1837 }
1838 }
1839 let where_sql = if where_clauses.is_empty() {
1840 String::new()
1841 } else {
1842 format!(" WHERE {}", where_clauses.join(" AND "))
1843 };
1844
1845 let group_sql = if group_by.is_empty() {
1846 String::new()
1847 } else {
1848 format!(" GROUP BY {}", group_by.join(", "))
1849 };
1850
1851 let sql = format!(
1852 "SELECT {} FROM {}{}{}",
1853 full_select.join(", "),
1854 quote_ident(entity),
1855 where_sql,
1856 group_sql
1857 );
1858
1859 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1860 values.iter().map(|v| v.as_ref()).collect();
1861 let mut stmt = conn.prepare_cached(&sql).map_err(|e| RuntimeError {
1862 code: "QUERY_FAILED".into(),
1863 message: format!("Failed to prepare aggregate: {e}"),
1864 })?;
1865
1866 let column_names: Vec<String> = {
1867 let mut v = group_field_names.clone();
1868 v.extend(result_fields.iter().cloned());
1869 v
1870 };
1871
1872 let rows = stmt
1873 .query_map(param_refs.as_slice(), |row| {
1874 let mut obj = serde_json::Map::new();
1875 for (i, name) in column_names.iter().enumerate() {
1876 if let Ok(n) = row.get::<_, i64>(i) {
1878 obj.insert(name.clone(), serde_json::Value::Number(n.into()));
1879 } else if let Ok(f) = row.get::<_, f64>(i) {
1880 if let Some(num) = serde_json::Number::from_f64(f) {
1881 obj.insert(name.clone(), serde_json::Value::Number(num));
1882 } else {
1883 obj.insert(name.clone(), serde_json::Value::Null);
1884 }
1885 } else if let Ok(s) = row.get::<_, String>(i) {
1886 obj.insert(name.clone(), serde_json::Value::String(s));
1887 } else {
1888 obj.insert(name.clone(), serde_json::Value::Null);
1889 }
1890 }
1891 Ok(serde_json::Value::Object(obj))
1892 })
1893 .map_err(|e| RuntimeError {
1894 code: "QUERY_FAILED".into(),
1895 message: format!("Aggregate failed: {e}"),
1896 })?;
1897
1898 let mut result = Vec::new();
1899 for row in rows {
1900 if let Ok(val) = row {
1901 result.push(val);
1902 }
1903 }
1904 Ok(serde_json::json!({ "rows": result }))
1905 }
1906
1907 fn require_entity(&self, name: &str) -> Result<&ManifestEntity, RuntimeError> {
1912 self.entities.get(name).ok_or_else(|| RuntimeError {
1913 code: "ENTITY_NOT_FOUND".into(),
1914 message: format!("Unknown entity: \"{name}\""),
1915 })
1916 }
1917
1918 fn lock_write_conn(&self) -> Result<std::sync::MutexGuard<'_, Connection>, RuntimeError> {
1920 self.write_conn.lock().map_err(|e| RuntimeError {
1921 code: "LOCK_FAILED".into(),
1922 message: format!("Failed to acquire write connection lock: {e}"),
1923 })
1924 }
1925
1926 fn lock_read_conn(&self) -> Result<ReadConnGuard<'_>, RuntimeError> {
1930 if !self.read_pool.is_empty() {
1931 let idx = self.read_counter.fetch_add(1, Ordering::Relaxed) % self.read_pool.len();
1932 let guard = self.read_pool[idx].lock().map_err(|e| RuntimeError {
1933 code: "LOCK_FAILED".into(),
1934 message: format!("Failed to acquire read connection: {e}"),
1935 })?;
1936 Ok(ReadConnGuard::Pooled(guard))
1937 } else {
1938 let guard = self.write_conn.lock().map_err(|e| RuntimeError {
1940 code: "LOCK_FAILED".into(),
1941 message: format!("Failed to acquire connection: {e}"),
1942 })?;
1943 Ok(ReadConnGuard::Write(guard))
1944 }
1945 }
1946}
1947
1948fn with_write_tx<T, F>(conn: &rusqlite::Connection, body: F) -> Result<T, RuntimeError>
1976where
1977 F: FnOnce() -> Result<T, RuntimeError>,
1978{
1979 conn.execute("BEGIN IMMEDIATE", [])
1980 .map_err(|e| RuntimeError {
1981 code: "TX_BEGIN_FAILED".into(),
1982 message: format!("BEGIN: {e}"),
1983 })?;
1984 match body() {
1985 Ok(v) => {
1986 conn.execute("COMMIT", []).map_err(|e| RuntimeError {
1987 code: "TX_COMMIT_FAILED".into(),
1988 message: format!("COMMIT: {e}"),
1989 })?;
1990 Ok(v)
1991 }
1992 Err(e) => {
1993 let _ = conn.execute("ROLLBACK", []);
1996 Err(e)
1997 }
1998 }
1999}
2000
2001fn generate_id() -> String {
2002 use std::sync::atomic::{AtomicU32, Ordering};
2003 use std::time::{SystemTime, UNIX_EPOCH};
2004 static COUNTER: AtomicU32 = AtomicU32::new(0);
2005 let nanos = SystemTime::now()
2006 .duration_since(UNIX_EPOCH)
2007 .unwrap_or_default()
2008 .as_nanos();
2009 let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
2010 format!("{nanos:032x}{seq:08x}")
2011}
2012
2013fn json_to_sql(val: &serde_json::Value) -> Box<dyn rusqlite::types::ToSql> {
2015 match val {
2016 serde_json::Value::Null => Box::new(rusqlite::types::Null),
2017 serde_json::Value::Bool(b) => Box::new(*b as i32),
2018 serde_json::Value::Number(n) => {
2019 if let Some(i) = n.as_i64() {
2020 Box::new(i)
2021 } else if let Some(f) = n.as_f64() {
2022 Box::new(f)
2023 } else {
2024 Box::new(n.to_string())
2025 }
2026 }
2027 serde_json::Value::String(s) => Box::new(s.clone()),
2028 other => Box::new(other.to_string()),
2029 }
2030}
2031
2032fn row_to_json(row: &rusqlite::Row<'_>, _field_names: &[String]) -> serde_json::Value {
2047 let mut obj = serde_json::Map::new();
2048
2049 let stmt = row.as_ref();
2050 let count = stmt.column_count();
2051 for i in 0..count {
2052 let name = match stmt.column_name(i) {
2056 Ok(n) => n.to_string(),
2057 Err(_) => continue,
2058 };
2059 let value = if let Ok(s) = row.get::<_, String>(i) {
2060 serde_json::Value::String(s)
2061 } else if let Ok(n) = row.get::<_, i64>(i) {
2062 serde_json::Value::Number(serde_json::Number::from(n))
2063 } else if let Ok(f) = row.get::<_, f64>(i) {
2064 serde_json::Number::from_f64(f)
2065 .map(serde_json::Value::Number)
2066 .unwrap_or(serde_json::Value::Null)
2067 } else {
2068 serde_json::Value::Null
2069 };
2070 obj.insert(name, value);
2071 }
2072
2073 serde_json::Value::Object(obj)
2074}
2075
2076#[cfg(test)]
2077mod tests {
2078 use super::*;
2079 use pylon_kernel::{ManifestField, ManifestIndex};
2080
2081 fn test_manifest() -> AppManifest {
2082 AppManifest {
2083 manifest_version: 1,
2084 name: "Test".into(),
2085 version: "0.1.0".into(),
2086 entities: vec![pylon_kernel::ManifestEntity {
2087 name: "User".into(),
2088 fields: vec![
2089 ManifestField {
2090 name: "email".into(),
2091 field_type: "string".into(),
2092 optional: false,
2093 unique: true,
2094 crdt: None,
2095 },
2096 ManifestField {
2097 name: "displayName".into(),
2098 field_type: "string".into(),
2099 optional: false,
2100 unique: false,
2101 crdt: None,
2102 },
2103 ],
2104 indexes: vec![ManifestIndex {
2105 name: "user_email".into(),
2106 fields: vec!["email".into()],
2107 unique: true,
2108 }],
2109 relations: vec![],
2110 search: None,
2111 crdt: true,
2112 }],
2113 routes: vec![],
2114 queries: vec![],
2115 actions: vec![],
2116 policies: vec![],
2117 }
2118 }
2119
2120 #[test]
2121 fn reset_for_tests_wipes_in_memory() {
2122 let rt = Runtime::in_memory(test_manifest()).unwrap();
2123 rt.insert(
2124 "User",
2125 &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
2126 )
2127 .unwrap();
2128 assert_eq!(rt.list("User").unwrap().len(), 1);
2129 rt.reset_for_tests().unwrap();
2130 assert_eq!(rt.list("User").unwrap().len(), 0);
2131 }
2132
2133 #[test]
2134 fn reset_for_tests_refuses_file_db() {
2135 let dir = std::env::temp_dir().join("pylon-reset-refuse");
2136 let _ = std::fs::create_dir_all(&dir);
2137 let db_path = dir.join("db.sqlite");
2138 let _ = std::fs::remove_file(&db_path);
2139 let rt = Runtime::open(db_path.to_str().unwrap(), test_manifest()).unwrap();
2140 let err = rt.reset_for_tests().unwrap_err();
2141 assert_eq!(err.code, "RESET_REFUSED");
2142 let _ = std::fs::remove_file(&db_path);
2143 }
2144
2145 #[test]
2146 fn insert_and_get() {
2147 let rt = Runtime::in_memory(test_manifest()).unwrap();
2148 let id = rt
2149 .insert(
2150 "User",
2151 &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
2152 )
2153 .unwrap();
2154 let row = rt.get_by_id("User", &id).unwrap().unwrap();
2155 assert_eq!(row["email"], "a@b.com");
2156 }
2157
2158 #[test]
2167 fn row_to_json_handles_columns_added_out_of_manifest_order() {
2168 let mut manifest = test_manifest();
2170 manifest.entities[0].fields = vec![
2171 ManifestField {
2172 name: "email".into(),
2173 field_type: "string".into(),
2174 optional: false,
2175 unique: true,
2176 crdt: None,
2177 },
2178 ManifestField {
2179 name: "displayName".into(),
2180 field_type: "string".into(),
2181 optional: false,
2182 unique: false,
2183 crdt: None,
2184 },
2185 ManifestField {
2186 name: "avatarColor".into(),
2187 field_type: "string".into(),
2188 optional: true,
2189 unique: false,
2190 crdt: None,
2191 },
2192 ManifestField {
2193 name: "createdAt".into(),
2194 field_type: "datetime".into(),
2195 optional: true,
2196 unique: false,
2197 crdt: None,
2198 },
2199 ];
2200 manifest.entities[0].crdt = false;
2207 let rt = Runtime::in_memory(manifest).unwrap();
2208 let id = rt
2209 .insert(
2210 "User",
2211 &serde_json::json!({
2212 "email": "a@b.com",
2213 "displayName": "Alice",
2214 "avatarColor": "#abc",
2215 "createdAt": "2026-01-01T00:00:00Z",
2216 }),
2217 )
2218 .unwrap();
2219
2220 {
2226 let conn = rt.lock_write_conn().unwrap();
2227 conn.execute("ALTER TABLE \"User\" ADD COLUMN \"passwordHash\" TEXT", [])
2228 .unwrap();
2229 conn.execute(
2230 "UPDATE \"User\" SET \"passwordHash\" = ?1 WHERE \"id\" = ?2",
2231 rusqlite::params!["hashed-password", &id],
2232 )
2233 .unwrap();
2234 }
2235 let row = rt.get_by_id("User", &id).unwrap().unwrap();
2241 assert_eq!(row["email"], "a@b.com");
2245 assert_eq!(row["displayName"], "Alice");
2246 assert_eq!(row["avatarColor"], "#abc");
2247 assert_eq!(row["createdAt"], "2026-01-01T00:00:00Z");
2248 assert_eq!(row["passwordHash"], "hashed-password");
2249 }
2250
2251 #[test]
2256 fn crdt_default_writes_through_loro_store() {
2257 let rt = Runtime::in_memory(test_manifest()).unwrap();
2258 let id = rt
2259 .insert(
2260 "User",
2261 &serde_json::json!({"email": "x@y.com", "displayName": "Eric"}),
2262 )
2263 .unwrap();
2264
2265 let conn = rt.lock_write_conn().unwrap();
2267 let snap_count: i64 = conn
2268 .query_row(
2269 "SELECT COUNT(*) FROM _pylon_crdt_snapshots
2270 WHERE entity = ?1 AND row_id = ?2",
2271 rusqlite::params!["User", &id],
2272 |r| r.get(0),
2273 )
2274 .unwrap();
2275 assert_eq!(snap_count, 1, "sidecar should have one row after insert");
2276
2277 assert!(rt.crdt_store().cached_rows() >= 1);
2280
2281 drop(conn);
2283 let row = rt.get_by_id("User", &id).unwrap().unwrap();
2284 assert_eq!(row["email"], "x@y.com");
2285 assert_eq!(row["displayName"], "Eric");
2286 }
2287
2288 #[test]
2292 fn crdt_update_persists_new_snapshot() {
2293 let rt = Runtime::in_memory(test_manifest()).unwrap();
2294 let id = rt
2295 .insert(
2296 "User",
2297 &serde_json::json!({"email": "x@y.com", "displayName": "Eric"}),
2298 )
2299 .unwrap();
2300
2301 let snap_after_insert: Vec<u8> = {
2302 let conn = rt.lock_write_conn().unwrap();
2303 conn.query_row(
2304 "SELECT snapshot FROM _pylon_crdt_snapshots
2305 WHERE entity = 'User' AND row_id = ?1",
2306 rusqlite::params![&id],
2307 |r| r.get(0),
2308 )
2309 .unwrap()
2310 };
2311
2312 rt.update("User", &id, &serde_json::json!({"displayName": "Eric C"}))
2313 .unwrap();
2314
2315 let snap_after_update: Vec<u8> = {
2316 let conn = rt.lock_write_conn().unwrap();
2317 conn.query_row(
2318 "SELECT snapshot FROM _pylon_crdt_snapshots
2319 WHERE entity = 'User' AND row_id = ?1",
2320 rusqlite::params![&id],
2321 |r| r.get(0),
2322 )
2323 .unwrap()
2324 };
2325
2326 assert_ne!(
2327 snap_after_insert, snap_after_update,
2328 "snapshot bytes should change after an update"
2329 );
2330
2331 let row = rt.get_by_id("User", &id).unwrap().unwrap();
2332 assert_eq!(row["displayName"], "Eric C");
2333 assert_eq!(row["email"], "x@y.com");
2334 }
2335
2336 #[test]
2343 fn crdt_insert_rolls_back_when_sql_step_fails() {
2344 let rt = Runtime::in_memory(test_manifest()).unwrap();
2345 rt.insert(
2347 "User",
2348 &serde_json::json!({"email": "x@y.com", "displayName": "First"}),
2349 )
2350 .unwrap();
2351
2352 let snap_count_before: i64 = {
2354 let conn = rt.lock_write_conn().unwrap();
2355 conn.query_row(
2356 "SELECT COUNT(*) FROM _pylon_crdt_snapshots WHERE entity = 'User'",
2357 [],
2358 |r| r.get(0),
2359 )
2360 .unwrap()
2361 };
2362
2363 let err = rt
2365 .insert(
2366 "User",
2367 &serde_json::json!({"email": "x@y.com", "displayName": "Second"}),
2368 )
2369 .expect_err("duplicate email must fail");
2370 assert_eq!(err.code, "INSERT_FAILED");
2371
2372 let snap_count_after: i64 = {
2375 let conn = rt.lock_write_conn().unwrap();
2376 conn.query_row(
2377 "SELECT COUNT(*) FROM _pylon_crdt_snapshots WHERE entity = 'User'",
2378 [],
2379 |r| r.get(0),
2380 )
2381 .unwrap()
2382 };
2383 assert_eq!(
2384 snap_count_after, snap_count_before,
2385 "failed insert should not leave a sidecar snapshot behind"
2386 );
2387 }
2388
2389 #[test]
2392 fn crdt_false_skips_loro_store() {
2393 let mut manifest = test_manifest();
2394 manifest.entities[0].crdt = false;
2396 let rt = Runtime::in_memory(manifest).unwrap();
2397
2398 let id = rt
2399 .insert(
2400 "User",
2401 &serde_json::json!({"email": "lww@example.com", "displayName": "Plain"}),
2402 )
2403 .unwrap();
2404
2405 let conn = rt.lock_write_conn().unwrap();
2406 let snap_count: i64 = conn
2407 .query_row(
2408 "SELECT COUNT(*) FROM _pylon_crdt_snapshots
2409 WHERE entity = 'User' AND row_id = ?1",
2410 rusqlite::params![&id],
2411 |r| r.get(0),
2412 )
2413 .unwrap();
2414 assert_eq!(snap_count, 0, "crdt:false should not touch the sidecar");
2415 assert_eq!(
2416 rt.crdt_store().cached_rows(),
2417 0,
2418 "crdt:false should not warm the cache"
2419 );
2420
2421 drop(conn);
2424 let row = rt.get_by_id("User", &id).unwrap().unwrap();
2425 assert_eq!(row["email"], "lww@example.com");
2426 }
2427
2428 #[test]
2429 fn list_entities() {
2430 let rt = Runtime::in_memory(test_manifest()).unwrap();
2431 rt.insert(
2432 "User",
2433 &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
2434 )
2435 .unwrap();
2436 rt.insert(
2437 "User",
2438 &serde_json::json!({"email": "b@c.com", "displayName": "B"}),
2439 )
2440 .unwrap();
2441 let rows = rt.list("User").unwrap();
2442 assert_eq!(rows.len(), 2);
2443 }
2444
2445 #[test]
2446 fn update_entity() {
2447 let rt = Runtime::in_memory(test_manifest()).unwrap();
2448 let id = rt
2449 .insert(
2450 "User",
2451 &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
2452 )
2453 .unwrap();
2454 let updated = rt
2455 .update("User", &id, &serde_json::json!({"displayName": "Updated"}))
2456 .unwrap();
2457 assert!(updated);
2458 let row = rt.get_by_id("User", &id).unwrap().unwrap();
2459 assert_eq!(row["displayName"], "Updated");
2460 }
2461
2462 #[test]
2463 fn delete_entity() {
2464 let rt = Runtime::in_memory(test_manifest()).unwrap();
2465 let id = rt
2466 .insert(
2467 "User",
2468 &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
2469 )
2470 .unwrap();
2471 let deleted = rt.delete("User", &id).unwrap();
2472 assert!(deleted);
2473 assert!(rt.get_by_id("User", &id).unwrap().is_none());
2474 }
2475
2476 #[test]
2477 fn lookup_by_field() {
2478 let rt = Runtime::in_memory(test_manifest()).unwrap();
2479 rt.insert(
2480 "User",
2481 &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
2482 )
2483 .unwrap();
2484 let row = rt.lookup("User", "email", "a@b.com").unwrap().unwrap();
2485 assert_eq!(row["displayName"], "A");
2486 }
2487
2488 #[test]
2489 fn unknown_entity_returns_error() {
2490 let rt = Runtime::in_memory(test_manifest()).unwrap();
2491 let err = rt.list("Nonexistent").unwrap_err();
2492 assert_eq!(err.code, "ENTITY_NOT_FOUND");
2493 }
2494
2495 #[test]
2496 fn insert_rejects_unknown_column() {
2497 let rt = Runtime::in_memory(test_manifest()).unwrap();
2498 let err = rt
2499 .insert(
2500 "User",
2501 &serde_json::json!({"email": "a@b.com", "displayName": "A", "evil_col": "x"}),
2502 )
2503 .unwrap_err();
2504 assert_eq!(err.code, "INVALID_COLUMN");
2505 }
2506
2507 #[test]
2508 fn update_rejects_unknown_column() {
2509 let rt = Runtime::in_memory(test_manifest()).unwrap();
2510 let id = rt
2511 .insert(
2512 "User",
2513 &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
2514 )
2515 .unwrap();
2516 let err = rt
2517 .update("User", &id, &serde_json::json!({"bad_field": "x"}))
2518 .unwrap_err();
2519 assert_eq!(err.code, "INVALID_COLUMN");
2520 }
2521
2522 #[test]
2523 fn lookup_rejects_unknown_column() {
2524 let rt = Runtime::in_memory(test_manifest()).unwrap();
2525 let err = rt.lookup("User", "nonexistent", "val").unwrap_err();
2526 assert_eq!(err.code, "INVALID_COLUMN");
2527 }
2528
2529 #[test]
2530 fn query_filtered_rejects_unknown_column() {
2531 let rt = Runtime::in_memory(test_manifest()).unwrap();
2532 let err = rt
2533 .query_filtered("User", &serde_json::json!({"bad_col": "x"}))
2534 .unwrap_err();
2535 assert_eq!(err.code, "INVALID_COLUMN");
2536 }
2537
2538 #[test]
2539 fn query_filtered_rejects_unknown_order_column() {
2540 let rt = Runtime::in_memory(test_manifest()).unwrap();
2541 let err = rt
2542 .query_filtered("User", &serde_json::json!({"$order": {"bad_col": "asc"}}))
2543 .unwrap_err();
2544 assert_eq!(err.code, "INVALID_COLUMN");
2545 }
2546
2547 #[test]
2548 fn query_filtered_sanitizes_order_direction() {
2549 let rt = Runtime::in_memory(test_manifest()).unwrap();
2550 rt.insert(
2551 "User",
2552 &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
2553 )
2554 .unwrap();
2555 let rows = rt
2557 .query_filtered(
2558 "User",
2559 &serde_json::json!({"$order": {"email": "DROP TABLE User"}}),
2560 )
2561 .unwrap();
2562 assert_eq!(rows.len(), 1);
2563 }
2564
2565 #[test]
2566 fn in_memory_has_no_read_pool() {
2567 let rt = Runtime::in_memory(test_manifest()).unwrap();
2568 assert_eq!(rt.read_pool_size(), 0);
2569 }
2570
2571 #[test]
2572 fn open_creates_read_pool() {
2573 let dir = std::env::temp_dir().join(format!("pylon_test_{}", std::process::id()));
2574 std::fs::create_dir_all(&dir).unwrap();
2575 let db_path = dir.join("test_read_pool.db");
2576
2577 let rt = Runtime::open(db_path.to_str().unwrap(), test_manifest()).unwrap();
2578 assert_eq!(rt.read_pool_size(), READ_POOL_SIZE);
2579
2580 let id = rt
2582 .insert(
2583 "User",
2584 &serde_json::json!({"email": "pool@test.com", "displayName": "Pool"}),
2585 )
2586 .unwrap();
2587 let row = rt.get_by_id("User", &id).unwrap().unwrap();
2588 assert_eq!(row["email"], "pool@test.com");
2589
2590 let _ = std::fs::remove_dir_all(&dir);
2592 }
2593
2594 #[test]
2595 fn concurrent_reads_dont_block_on_write() {
2596 use std::sync::Arc;
2597
2598 let dir = std::env::temp_dir().join(format!("pylon_conc_{}", std::process::id()));
2599 std::fs::create_dir_all(&dir).unwrap();
2600 let db_path = dir.join("test_concurrent.db");
2601
2602 let rt = Arc::new(Runtime::open(db_path.to_str().unwrap(), test_manifest()).unwrap());
2603
2604 rt.insert(
2606 "User",
2607 &serde_json::json!({"email": "a@b.com", "displayName": "A"}),
2608 )
2609 .unwrap();
2610 rt.insert(
2611 "User",
2612 &serde_json::json!({"email": "b@c.com", "displayName": "B"}),
2613 )
2614 .unwrap();
2615
2616 let write_guard = rt.lock_write_conn().unwrap();
2618
2619 let mut handles = Vec::new();
2621 for _ in 0..4 {
2622 let rt_clone = Arc::clone(&rt);
2623 handles.push(std::thread::spawn(move || {
2624 let rows = rt_clone.list("User").unwrap();
2625 assert_eq!(rows.len(), 2);
2626 }));
2627 }
2628
2629 for h in handles {
2630 h.join().expect("reader thread panicked");
2631 }
2632
2633 drop(write_guard);
2635
2636 let _ = std::fs::remove_dir_all(&dir);
2638 }
2639}