1use crate::{FieldSpec, SchemaOperation, SchemaPlan, StorageAdapter, StorageError};
2use pylon_kernel::AppManifest;
3
/// Maps a manifest field type name to the Postgres column type used in DDL.
///
/// Only `int`, `float`, `bool` and `datetime` get a dedicated column type.
/// Everything else — `string`, `richtext`, `id(...)` references and any
/// unrecognised name — is stored as `TEXT`.
fn pg_column_type(field_type: &str) -> &'static str {
    // The only type names that do not end up as TEXT.
    const NON_TEXT: [(&str, &str); 4] = [
        ("int", "INTEGER"),
        ("float", "DOUBLE PRECISION"),
        ("bool", "BOOLEAN"),
        ("datetime", "TIMESTAMPTZ"),
    ];

    NON_TEXT
        .iter()
        .find(|(name, _)| *name == field_type)
        .map_or("TEXT", |(_, sql_type)| *sql_type)
}
28
/// Wraps `name` in double quotes for use as a SQL identifier.
///
/// Every embedded `"` is doubled so the name cannot terminate the quoted
/// identifier early.
pub(crate) fn quote_ident(name: &str) -> String {
    let mut quoted = String::with_capacity(name.len() + 2);
    quoted.push('"');
    for ch in name.chars() {
        if ch == '"' {
            // Escape by doubling.
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
39
/// Public wrapper around the crate-private [`quote_ident`].
///
/// Only compiled when the `postgres-live` feature is enabled.
#[cfg(feature = "postgres-live")]
pub fn quote_ident_pub(name: &str) -> String {
    quote_ident(name)
}
48
/// Public wrapper that forwards to `live::row_to_json`, which is itself only
/// visible to this module (`pub(super)`).
///
/// Only compiled when the `postgres-live` feature is enabled.
#[cfg(feature = "postgres-live")]
pub fn row_to_json_pub(row: &postgres::Row) -> serde_json::Value {
    live::row_to_json(row)
}
56
/// Public wrapper that forwards to the crate-private
/// `live::LivePostgresAdapter::build_query_filtered_sql`.
///
/// Returns the generated SQL together with its positional parameters, or the
/// error produced by the inner builder. Only compiled when the
/// `postgres-live` feature is enabled.
#[cfg(feature = "postgres-live")]
pub fn build_query_filtered_sql_pub(
    entity: &str,
    filter: &serde_json::Value,
    valid_columns: &[String],
) -> Result<(String, Vec<JsonParam>), StorageError> {
    live::LivePostgresAdapter::build_query_filtered_sql(entity, filter, valid_columns)
}
70
/// Public wrapper that forwards to the crate-private
/// `live::LivePostgresAdapter::build_aggregate_sql`.
///
/// Returns the generated SQL, its positional parameters and the result column
/// names, or the error produced by the inner builder. Only compiled when the
/// `postgres-live` feature is enabled.
#[cfg(feature = "postgres-live")]
pub fn build_aggregate_sql_pub(
    entity: &str,
    spec: &serde_json::Value,
    valid_columns: &[String],
) -> Result<(String, Vec<JsonParam>, Vec<String>), StorageError> {
    live::LivePostgresAdapter::build_aggregate_sql(entity, spec, valid_columns)
}
82
/// Public wrapper that forwards to `live::aggregate_rows_to_json`.
///
/// Only compiled when the `postgres-live` feature is enabled.
#[cfg(feature = "postgres-live")]
pub fn aggregate_rows_to_json_pub(
    rows: &[postgres::Row],
    column_names: &[String],
) -> serde_json::Value {
    live::aggregate_rows_to_json(rows, column_names)
}
92
93pub fn create_table_sql(entity_name: &str, fields: &[FieldSpec]) -> String {
99 let mut columns = vec!["id TEXT PRIMARY KEY NOT NULL".to_string()];
100
101 for field in fields {
102 let col_type = pg_column_type(&field.field_type);
103 let not_null = if field.optional { "" } else { " NOT NULL" };
104 let unique = if field.unique { " UNIQUE" } else { "" };
105 columns.push(format!(
106 "{} {}{}{}",
107 quote_ident(&field.name),
108 col_type,
109 not_null,
110 unique
111 ));
112 }
113
114 format!(
115 "CREATE TABLE IF NOT EXISTS {} ({})",
116 quote_ident(entity_name),
117 columns.join(", ")
118 )
119}
120
121pub fn add_column_sql(entity_name: &str, field: &FieldSpec) -> String {
125 let col_type = pg_column_type(&field.field_type);
126 let unique = if field.unique { " UNIQUE" } else { "" };
127 format!(
128 "ALTER TABLE {} ADD COLUMN {} {}{}",
129 quote_ident(entity_name),
130 quote_ident(&field.name),
131 col_type,
132 unique
133 )
134}
135
136pub fn create_index_sql(
138 entity_name: &str,
139 index_name: &str,
140 fields: &[String],
141 unique: bool,
142) -> String {
143 let unique_str = if unique { "UNIQUE " } else { "" };
144 let full_index_name = format!("{}_{}", entity_name, index_name);
145 let quoted_fields: Vec<String> = fields.iter().map(|f| quote_ident(f)).collect();
146 format!(
147 "CREATE {}INDEX IF NOT EXISTS {} ON {} ({})",
148 unique_str,
149 quote_ident(&full_index_name),
150 quote_ident(entity_name),
151 quoted_fields.join(", ")
152 )
153}
154
/// Field-less Postgres adapter.
///
/// Its `StorageAdapter::plan_schema` implementation derives a plan from the
/// target manifest alone; it holds no connection.
pub struct PostgresAdapter;
162
163impl StorageAdapter for PostgresAdapter {
164 fn plan_schema(&self, target: &AppManifest) -> Result<SchemaPlan, StorageError> {
165 let mut operations = Vec::new();
167
168 for entity in &target.entities {
169 let fields: Vec<FieldSpec> = entity
170 .fields
171 .iter()
172 .map(|f| FieldSpec {
173 name: f.name.clone(),
174 field_type: f.field_type.clone(),
175 optional: f.optional,
176 unique: f.unique,
177 })
178 .collect();
179
180 operations.push(SchemaOperation::CreateEntity {
181 name: entity.name.clone(),
182 fields,
183 });
184
185 for index in &entity.indexes {
186 operations.push(SchemaOperation::AddIndex {
187 entity: entity.name.clone(),
188 name: index.name.clone(),
189 fields: index.fields.clone(),
190 unique: index.unique,
191 });
192 }
193 }
194
195 if operations.is_empty() {
196 operations.push(SchemaOperation::Noop);
197 }
198
199 Ok(SchemaPlan { operations })
200 }
201
202 }
204
/// Renders every operation of `plan` into SQL statements, in plan order.
///
/// # Errors
///
/// Returns `PG_SEARCH_FEATURE_OFF` for `CreateSearchIndex` when compiled
/// without the `postgres-live` feature, and `PG_OP_UNSUPPORTED` for any
/// operation that has no arm below. In both cases no statements are returned.
pub fn plan_to_sql(plan: &SchemaPlan) -> Result<Vec<String>, StorageError> {
    let mut statements = Vec::new();

    for op in &plan.operations {
        match op {
            SchemaOperation::CreateEntity { name, fields } => {
                statements.push(create_table_sql(name, fields));
            }
            SchemaOperation::AddField { entity, field } => {
                statements.push(add_column_sql(entity, field));
            }
            SchemaOperation::AlterField {
                entity,
                previous,
                target,
            } => {
                // Only a change in `optional` produces SQL; any other
                // difference between `previous` and `target` emits nothing.
                if previous.optional && !target.optional {
                    statements.push(format!(
                        "ALTER TABLE {} ALTER COLUMN {} SET NOT NULL",
                        quote_ident(entity),
                        quote_ident(&target.name)
                    ));
                } else if !previous.optional && target.optional {
                    statements.push(format!(
                        "ALTER TABLE {} ALTER COLUMN {} DROP NOT NULL",
                        quote_ident(entity),
                        quote_ident(&target.name)
                    ));
                }
            }
            SchemaOperation::AddIndex {
                entity,
                name,
                fields,
                unique,
            } => {
                statements.push(create_index_sql(entity, name, fields, *unique));
            }
            SchemaOperation::CreateSearchIndex { entity, config } => {
                // The search-index SQL generator is only available with the
                // `postgres-live` feature.
                #[cfg(feature = "postgres-live")]
                {
                    statements.extend(crate::pg_search::create_search_index_sql(entity, config));
                }
                #[cfg(not(feature = "postgres-live"))]
                {
                    // Mark the bindings as used so this build has no unused-variable warnings.
                    let _ = (entity, config);
                    return Err(StorageError {
                        code: "PG_SEARCH_FEATURE_OFF".into(),
                        message: "CreateSearchIndex requires the `postgres-live` feature".into(),
                    });
                }
            }
            SchemaOperation::RemoveSearchIndex { entity } => {
                // Drop the `_fts_<entity>` table first, then the `<entity>_fts_gin` index.
                statements.push(format!(
                    "DROP TABLE IF EXISTS {} CASCADE",
                    quote_ident(&format!("_fts_{entity}"))
                ));
                statements.push(format!(
                    "DROP INDEX IF EXISTS {}",
                    quote_ident(&format!("{entity}_fts_gin"))
                ));
            }
            SchemaOperation::Noop => {}
            // Every remaining operation is rejected rather than silently skipped.
            other => {
                return Err(StorageError {
                    code: "PG_OP_UNSUPPORTED".into(),
                    message: format!("Operation not supported by Postgres adapter: {other:?}"),
                });
            }
        }
    }

    Ok(statements)
}
306
/// Lists the base tables of the `public` schema, ordered by name, skipping
/// tables whose name starts with the literal prefix `_pylon_`.
///
/// `_` is a single-character wildcard in `LIKE`, so the prefix is written with
/// an explicit `ESCAPE` character. Without it the pattern would also exclude
/// names such as `xpylonyfoo`.
pub const INTROSPECT_TABLES_SQL: &str = "\
    SELECT table_name \
    FROM information_schema.tables \
    WHERE table_schema = 'public' \
    AND table_type = 'BASE TABLE' \
    AND table_name NOT LIKE '!_pylon!_%' ESCAPE '!' \
    ORDER BY table_name";
322
/// Lists the columns of one `public` table (`$1`) in ordinal order.
///
/// Each row is `(column_name, data_type, is_nullable, is_pk)`, where `is_pk`
/// is the number of primary-key constraints covering the column.
///
/// The primary-key subquery is correlated on schema and table as well as on
/// constraint name. Constraint names are not unique across schemas, so joining
/// on the name alone can count a key that belongs to a same-named table
/// elsewhere.
pub const INTROSPECT_COLUMNS_SQL: &str = "\
    SELECT column_name, data_type, is_nullable, \
    (SELECT COUNT(*) FROM information_schema.table_constraints tc \
    JOIN information_schema.key_column_usage kcu \
    ON tc.constraint_name = kcu.constraint_name \
    AND tc.constraint_schema = kcu.constraint_schema \
    AND tc.table_schema = kcu.table_schema \
    AND tc.table_name = kcu.table_name \
    WHERE tc.table_schema = c.table_schema \
    AND tc.table_name = c.table_name \
    AND kcu.column_name = c.column_name \
    AND tc.constraint_type = 'PRIMARY KEY') as is_pk \
    FROM information_schema.columns c \
    WHERE table_schema = 'public' AND table_name = $1 \
    ORDER BY ordinal_position";
336
/// Lists the non-primary-key indexes of one `public` table (`$1`), ordered by
/// index name.
///
/// Each row is `(index_name, is_unique, columns)`, where `columns` aggregates
/// the indexed attribute names ordered by their position in `indkey`.
pub const INTROSPECT_INDEXES_SQL: &str = "\
    SELECT i.relname as index_name, \
    ix.indisunique as is_unique, \
    array_agg(a.attname ORDER BY array_position(ix.indkey, a.attnum)) as columns \
    FROM pg_index ix \
    JOIN pg_class t ON t.oid = ix.indrelid \
    JOIN pg_class i ON i.oid = ix.indexrelid \
    JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(ix.indkey) \
    JOIN pg_namespace n ON n.oid = t.relnamespace \
    WHERE n.nspname = 'public' \
    AND t.relname = $1 \
    AND NOT ix.indisprimary \
    GROUP BY i.relname, ix.indisunique \
    ORDER BY i.relname";
353
/// Forwards to the crate-level `plan_from_snapshot`, making it reachable
/// through this module.
pub fn plan_from_snapshot(snapshot: &crate::SchemaSnapshot, target: &AppManifest) -> SchemaPlan {
    crate::plan_from_snapshot(snapshot, target)
}
359
/// Generates a 40-character lowercase hex row id.
///
/// The first 32 digits are the nanoseconds since the Unix epoch (zero if the
/// system clock reads earlier than the epoch); the last 8 digits are a
/// process-wide counter that increments on every call.
pub fn generate_id() -> String {
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static COUNTER: AtomicU32 = AtomicU32::new(0);

    let nanos = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        // Clock before the epoch: fall back to a zero timestamp.
        Err(_) => 0,
    };
    let sequence = COUNTER.fetch_add(1, Ordering::Relaxed);

    format!("{nanos:032x}{sequence:08x}")
}
383
384pub fn json_value_to_string(val: &serde_json::Value) -> String {
393 match val {
394 serde_json::Value::String(s) => s.clone(),
395 serde_json::Value::Number(n) => n.to_string(),
396 serde_json::Value::Bool(b) => b.to_string(),
397 serde_json::Value::Null => String::new(),
398 other => other.to_string(),
399 }
400}
401
/// A JSON scalar prepared for use as a positional SQL parameter.
///
/// Built from `serde_json::Value` by [`JsonParam::from_json`].
#[derive(Debug, Clone, PartialEq)]
pub enum JsonParam {
    /// JSON `null`.
    Null,
    /// A JSON string, or the textual form of a value with no other variant.
    Text(String),
    /// A JSON number that fits in `i64`.
    Int(i64),
    /// A JSON number that is only representable as `f64`.
    Float(f64),
    /// A JSON boolean.
    Bool(bool),
}
425
426impl JsonParam {
427 pub fn from_json(val: &serde_json::Value) -> Self {
431 match val {
432 serde_json::Value::Null => JsonParam::Null,
433 serde_json::Value::String(s) => JsonParam::Text(s.clone()),
434 serde_json::Value::Bool(b) => JsonParam::Bool(*b),
435 serde_json::Value::Number(n) => {
436 if let Some(i) = n.as_i64() {
437 JsonParam::Int(i)
438 } else if let Some(f) = n.as_f64() {
439 JsonParam::Float(f)
440 } else {
441 JsonParam::Text(n.to_string())
442 }
443 }
444 other => JsonParam::Text(other.to_string()),
445 }
446 }
447}
448
/// Encodes a [`JsonParam`] for whatever column type the placeholder has.
///
/// `accepts` returns `true` for every type, so the dispatch on the target
/// type happens inside `to_sql`.
#[cfg(feature = "postgres-live")]
impl postgres::types::ToSql for JsonParam {
    /// Writes the value into `out` in the encoding of `ty`.
    ///
    /// # Errors
    ///
    /// Returns an error when an integer does not fit the target `INT2`/`INT4`
    /// column, when a string bound to `TIMESTAMPTZ`/`TIMESTAMP`/`DATE` is not
    /// valid RFC 3339, or when the delegated encoder fails.
    fn to_sql(
        &self,
        ty: &postgres::types::Type,
        out: &mut bytes::BytesMut,
    ) -> Result<postgres::types::IsNull, Box<dyn std::error::Error + Sync + Send>> {
        use postgres::types::Type;

        // NULL is independent of the column type: nothing is written.
        if matches!(self, JsonParam::Null) {
            return Ok(postgres::types::IsNull::Yes);
        }

        match (self, ty) {
            (JsonParam::Bool(b), &Type::BOOL) => b.to_sql(ty, out),

            // Narrowing integer conversions are checked. A plain `as` cast
            // would wrap an out-of-range value and store a different number.
            (JsonParam::Int(n), &Type::INT2) => i16::try_from(*n)
                .map_err(|_| format!("integer {n} is out of range for INT2"))?
                .to_sql(ty, out),
            (JsonParam::Int(n), &Type::INT4) => i32::try_from(*n)
                .map_err(|_| format!("integer {n} is out of range for INT4"))?
                .to_sql(ty, out),
            (JsonParam::Int(n), &Type::INT8) => n.to_sql(ty, out),
            (JsonParam::Int(n), &Type::FLOAT4) => (*n as f32).to_sql(ty, out),
            (JsonParam::Int(n), &Type::FLOAT8) => (*n as f64).to_sql(ty, out),

            (JsonParam::Float(f), &Type::FLOAT4) => (*f as f32).to_sql(ty, out),
            (JsonParam::Float(f), &Type::FLOAT8) => f.to_sql(ty, out),
            // Float-to-integer keeps the original `as` semantics: the
            // fraction is dropped and out-of-range values saturate.
            (JsonParam::Float(f), &Type::INT4) => (*f as i32).to_sql(ty, out),
            (JsonParam::Float(f), &Type::INT8) => (*f as i64).to_sql(ty, out),

            (JsonParam::Text(s), &Type::TEXT)
            | (JsonParam::Text(s), &Type::VARCHAR)
            | (JsonParam::Text(s), &Type::BPCHAR)
            | (JsonParam::Text(s), &Type::NAME) => s.to_sql(ty, out),
            // Date/time columns accept RFC 3339 strings, normalised to UTC.
            (JsonParam::Text(s), &Type::TIMESTAMPTZ) => {
                let dt = chrono::DateTime::parse_from_rfc3339(s)
                    .map_err(|e| format!("invalid TIMESTAMPTZ string {s:?}: {e}"))?
                    .with_timezone(&chrono::Utc);
                dt.to_sql(ty, out)
            }
            (JsonParam::Text(s), &Type::TIMESTAMP) => {
                let dt = chrono::DateTime::parse_from_rfc3339(s)
                    .map_err(|e| format!("invalid TIMESTAMP string {s:?}: {e}"))?
                    .with_timezone(&chrono::Utc)
                    .naive_utc();
                dt.to_sql(ty, out)
            }
            (JsonParam::Text(s), &Type::DATE) => {
                let dt = chrono::DateTime::parse_from_rfc3339(s)
                    .map_err(|e| format!("invalid DATE string {s:?}: {e}"))?
                    .with_timezone(&chrono::Utc)
                    .date_naive();
                dt.to_sql(ty, out)
            }

            // Any other (value, type) pairing is sent as the value's text form.
            (other, _) => {
                let s = match other {
                    JsonParam::Bool(b) => b.to_string(),
                    JsonParam::Int(n) => n.to_string(),
                    JsonParam::Float(f) => f.to_string(),
                    JsonParam::Text(s) => s.clone(),
                    // Null returned early above.
                    JsonParam::Null => unreachable!(),
                };
                s.to_sql(ty, out)
            }
        }
    }

    /// Accepts every column type; `to_sql` decides how to encode.
    fn accepts(_ty: &postgres::types::Type) -> bool {
        true
    }

    postgres::types::to_sql_checked!();
}
552
553pub fn build_insert_sql(
561 entity: &str,
562 data: &serde_json::Value,
563) -> Result<(String, Vec<JsonParam>), StorageError> {
564 let obj = data.as_object().ok_or_else(|| StorageError {
565 code: "PG_INVALID_DATA".into(),
566 message: "Insert data must be a JSON object".into(),
567 })?;
568
569 let id = match obj.get("id") {
576 None | Some(serde_json::Value::Null) => generate_id(),
577 Some(serde_json::Value::String(s)) => s.clone(),
578 Some(other) => {
579 return Err(StorageError {
580 code: "PG_INVALID_ID".into(),
581 message: format!(
582 "Insert data carried a non-string `id` value: {other}. Pylon row ids \
583 are always strings (40-char hex). Drop the `id` field to let the \
584 server generate one, or supply a string."
585 ),
586 });
587 }
588 };
589
590 let mut col_names = vec!["id".to_string()];
591 let mut placeholders = vec!["$1".to_string()];
592 let mut values: Vec<JsonParam> = vec![JsonParam::Text(id)];
593
594 let mut i = 0usize;
595 for (key, val) in obj {
596 if key == "id" {
597 continue;
601 }
602 col_names.push(quote_ident(key));
603 placeholders.push(format!("${}", i + 2));
604 values.push(JsonParam::from_json(val));
605 i += 1;
606 }
607
608 let sql = format!(
609 "INSERT INTO {} ({}) VALUES ({})",
610 quote_ident(entity),
611 col_names.join(", "),
612 placeholders.join(", ")
613 );
614
615 Ok((sql, values))
616}
617
618pub fn build_update_sql(
621 entity: &str,
622 id: &str,
623 data: &serde_json::Value,
624) -> Result<(String, Vec<JsonParam>), StorageError> {
625 let obj = data.as_object().ok_or_else(|| StorageError {
626 code: "PG_INVALID_DATA".into(),
627 message: "Update data must be a JSON object".into(),
628 })?;
629
630 if obj.is_empty() {
631 return Err(StorageError {
632 code: "PG_INVALID_DATA".into(),
633 message: "Update data must contain at least one field".into(),
634 });
635 }
636
637 let mut set_clauses = Vec::new();
638 let mut values: Vec<JsonParam> = vec![JsonParam::Text(id.to_string())];
639
640 let mut i = 0usize;
641 for (key, val) in obj {
642 if key == "id" {
643 return Err(StorageError {
650 code: "PG_INVALID_UPDATE".into(),
651 message:
652 "Updating the `id` column is not allowed — Pylon row ids are immutable; \
653 drop the field from the patch."
654 .into(),
655 });
656 }
657 set_clauses.push(format!("{} = ${}", quote_ident(key), i + 2));
658 values.push(JsonParam::from_json(val));
659 i += 1;
660 }
661
662 if set_clauses.is_empty() {
663 return Err(StorageError {
664 code: "PG_INVALID_DATA".into(),
665 message: "Update data must contain at least one non-id field".into(),
666 });
667 }
668
669 let sql = format!(
670 "UPDATE {} SET {} WHERE id = $1",
671 quote_ident(entity),
672 set_clauses.join(", ")
673 );
674
675 Ok((sql, values))
676}
677
/// Borrows each [`JsonParam`] as the trait object the `postgres` client
/// expects for its parameter slice.
#[cfg(feature = "postgres-live")]
fn as_pg_params(values: &[JsonParam]) -> Vec<&(dyn postgres::types::ToSql + Sync)> {
    let mut params: Vec<&(dyn postgres::types::ToSql + Sync)> = Vec::with_capacity(values.len());
    for value in values {
        params.push(value);
    }
    params
}
688
689#[cfg(feature = "postgres-live")]
694pub mod live {
695 use super::*;
696 use crate::{
697 ColumnSnapshot, IndexSnapshot, SchemaSnapshot, StorageAdapter, StorageError, TableSnapshot,
698 };
699
    /// Adapter that owns a `postgres::Client` and runs queries through it.
    pub struct LivePostgresAdapter {
        /// The underlying client; private to this module.
        client: postgres::Client,
    }
704
705 impl LivePostgresAdapter {
        /// Gives crate-internal code mutable access to the underlying client.
        pub(crate) fn client_mut(&mut self) -> &mut postgres::Client {
            &mut self.client
        }
716
717 pub fn connect(url: &str) -> Result<Self, StorageError> {
719 let client =
720 postgres::Client::connect(url, postgres::NoTls).map_err(|e| StorageError {
721 code: "PG_CONNECT_FAILED".into(),
722 message: format!("Failed to connect to Postgres: {e}"),
723 })?;
724 Ok(Self { client })
725 }
726
727 pub fn read_schema(&mut self) -> Result<SchemaSnapshot, StorageError> {
729 let table_rows = self
730 .client
731 .query(INTROSPECT_TABLES_SQL, &[])
732 .map_err(pg_err)?;
733
734 let mut tables = Vec::new();
735 for row in &table_rows {
736 let table_name: String = row.get(0);
737 let columns = self.read_columns(&table_name)?;
738 let indexes = self.read_indexes(&table_name)?;
739 tables.push(TableSnapshot {
740 name: table_name,
741 columns,
742 indexes,
743 });
744 }
745
746 Ok(SchemaSnapshot { tables })
747 }
748
749 fn read_columns(&mut self, table: &str) -> Result<Vec<ColumnSnapshot>, StorageError> {
750 let rows = self
751 .client
752 .query(INTROSPECT_COLUMNS_SQL, &[&table])
753 .map_err(pg_err)?;
754
755 let mut columns = Vec::new();
756 for row in &rows {
757 let name: String = row.get(0);
758 let data_type: String = row.get(1);
759 let is_nullable: String = row.get(2);
760 let is_pk: i64 = row.get(3);
761 columns.push(ColumnSnapshot {
762 name,
763 column_type: data_type,
764 notnull: is_nullable == "NO",
765 primary_key: is_pk > 0,
766 });
767 }
768 Ok(columns)
769 }
770
771 fn read_indexes(&mut self, table: &str) -> Result<Vec<IndexSnapshot>, StorageError> {
772 let rows = self
773 .client
774 .query(INTROSPECT_INDEXES_SQL, &[&table])
775 .map_err(pg_err)?;
776
777 let mut indexes = Vec::new();
778 for row in &rows {
779 let name: String = row.get(0);
780 let unique: bool = row.get(1);
781 let columns: Vec<String> = row.get(2);
782 indexes.push(IndexSnapshot {
783 name,
784 columns,
785 unique,
786 });
787 }
788 Ok(indexes)
789 }
790
791 pub fn plan_from_live(&mut self, target: &AppManifest) -> Result<SchemaPlan, StorageError> {
793 let snapshot = self.read_schema()?;
794 Ok(crate::plan_from_snapshot(&snapshot, target))
795 }
796 }
797
798 impl StorageAdapter for LivePostgresAdapter {
799 fn plan_schema(&self, _target: &AppManifest) -> Result<SchemaPlan, StorageError> {
800 Err(StorageError {
801 code: "PG_PLAN_NEEDS_MUTABLE".into(),
802 message: "Use plan_from_live() instead for live Postgres planning".into(),
803 })
804 }
805
806 fn apply_schema(&self, _plan: &SchemaPlan) -> Result<(), StorageError> {
807 Err(StorageError {
808 code: "PG_APPLY_USE_METHOD".into(),
809 message: "Use apply_plan() instead of the trait method for live Postgres".into(),
810 })
811 }
812 }
813
814 impl LivePostgresAdapter {
815 pub fn apply_plan(&mut self, plan: &SchemaPlan) -> Result<(), StorageError> {
817 let statements = plan_to_sql(plan)?;
818 for sql in &statements {
819 self.client.execute(sql.as_str(), &[]).map_err(pg_err)?;
820 }
821 Ok(())
822 }
823
        /// Executes `sql` as given, with no parameters, and returns the
        /// affected row count reported by the client.
        pub fn exec_raw(&mut self, sql: &str) -> Result<u64, StorageError> {
            self.client.execute(sql, &[]).map_err(pg_err)
        }
832
833 pub fn insert(
835 &mut self,
836 entity: &str,
837 data: &serde_json::Value,
838 ) -> Result<String, StorageError> {
839 let (sql, values) = build_insert_sql(entity, data)?;
840 let id = match &values[0] {
843 JsonParam::Text(s) => s.clone(),
844 _ => {
845 return Err(StorageError {
846 code: "PG_INTERNAL".into(),
847 message: "build_insert_sql produced non-text id param".into(),
848 });
849 }
850 };
851 let params = as_pg_params(&values);
852 self.client.execute(sql.as_str(), ¶ms).map_err(pg_err)?;
853 Ok(id)
854 }
855
856 pub fn get_by_id(
858 &mut self,
859 entity: &str,
860 id: &str,
861 ) -> Result<Option<serde_json::Value>, StorageError> {
862 let sql = format!("SELECT * FROM {} WHERE id = $1", quote_ident(entity));
863 let rows = self.client.query(sql.as_str(), &[&id]).map_err(pg_err)?;
864
865 match rows.first() {
866 Some(row) => Ok(Some(row_to_json(row))),
867 None => Ok(None),
868 }
869 }
870
871 pub fn list(&mut self, entity: &str) -> Result<Vec<serde_json::Value>, StorageError> {
873 let sql = format!("SELECT * FROM {}", quote_ident(entity));
874 let rows = self.client.query(sql.as_str(), &[]).map_err(pg_err)?;
875
876 Ok(rows.iter().map(row_to_json).collect())
877 }
878
879 pub fn list_after(
883 &mut self,
884 entity: &str,
885 after: Option<&str>,
886 limit: usize,
887 ) -> Result<Vec<serde_json::Value>, StorageError> {
888 let capped: i64 = limit.min(10_000) as i64;
891 let sql = match after {
892 Some(_) => format!(
893 "SELECT * FROM {} WHERE id > $1 ORDER BY id ASC LIMIT $2",
894 quote_ident(entity)
895 ),
896 None => format!(
897 "SELECT * FROM {} ORDER BY id ASC LIMIT $1",
898 quote_ident(entity)
899 ),
900 };
901 let rows = match after {
902 Some(cursor) => self
903 .client
904 .query(sql.as_str(), &[&cursor, &capped])
905 .map_err(pg_err)?,
906 None => self
907 .client
908 .query(sql.as_str(), &[&capped])
909 .map_err(pg_err)?,
910 };
911 Ok(rows.iter().map(row_to_json).collect())
912 }
913
914 pub fn update(
916 &mut self,
917 entity: &str,
918 id: &str,
919 data: &serde_json::Value,
920 ) -> Result<bool, StorageError> {
921 let (sql, values) = build_update_sql(entity, id, data)?;
922 let params = as_pg_params(&values);
923 let rows_affected = self.client.execute(sql.as_str(), ¶ms).map_err(pg_err)?;
924 Ok(rows_affected > 0)
925 }
926
927 pub fn delete(&mut self, entity: &str, id: &str) -> Result<bool, StorageError> {
929 let sql = format!("DELETE FROM {} WHERE id = $1", quote_ident(entity));
930 let rows_affected = self.client.execute(sql.as_str(), &[&id]).map_err(pg_err)?;
931 Ok(rows_affected > 0)
932 }
933
934 pub fn lookup_field(
938 &mut self,
939 entity: &str,
940 field: &str,
941 value: &str,
942 ) -> Result<Option<serde_json::Value>, StorageError> {
943 let sql = format!(
944 "SELECT * FROM {} WHERE {} = $1 LIMIT 1",
945 quote_ident(entity),
946 quote_ident(field),
947 );
948 let rows = self.client.query(sql.as_str(), &[&value]).map_err(pg_err)?;
949 Ok(rows.first().map(row_to_json))
950 }
951
952 pub fn query_filtered(
975 &mut self,
976 entity: &str,
977 filter: &serde_json::Value,
978 valid_columns: &[String],
979 ) -> Result<Vec<serde_json::Value>, StorageError> {
980 let (sql, params) = Self::build_query_filtered_sql(entity, filter, valid_columns)?;
981 let pg_params = as_pg_params(¶ms);
982 let rows = self
983 .client
984 .query(sql.as_str(), &pg_params)
985 .map_err(pg_err)?;
986 Ok(rows.iter().map(row_to_json).collect())
987 }
988
989 pub(crate) fn build_query_filtered_sql(
996 entity: &str,
997 filter: &serde_json::Value,
998 valid_columns: &[String],
999 ) -> Result<(String, Vec<JsonParam>), StorageError> {
1000 let empty = serde_json::Map::new();
1001 let obj = filter.as_object().unwrap_or(&empty);
1002
1003 let validate = |col: &str| -> Result<(), StorageError> {
1004 if col == "id" || valid_columns.iter().any(|c| c == col) {
1005 Ok(())
1006 } else {
1007 Err(StorageError {
1008 code: "UNKNOWN_COLUMN".into(),
1009 message: format!("Unknown column \"{col}\" on entity \"{entity}\""),
1010 })
1011 }
1012 };
1013
1014 let mut where_clauses: Vec<String> = Vec::new();
1015 let mut order_clause = String::new();
1016 let mut limit_clause = String::new();
1017 let mut offset_clause = String::new();
1018 let mut planned: Vec<(String, String, JsonParam)> = Vec::new();
1022
1023 for (key, val) in obj {
1024 match key.as_str() {
1025 "$search" => {
1026 let raw = match val {
1034 serde_json::Value::String(s) => s.clone(),
1035 other => other.to_string(),
1036 };
1037 let placeholder_n = planned.len() + 1;
1046 where_clauses.push(format!(
1047 "{}.id IN (SELECT entity_id FROM \"_fts_{entity}\" \
1048 WHERE tsv @@ plainto_tsquery('english', ${placeholder_n}))",
1049 quote_ident(entity),
1050 ));
1051 planned.push((
1055 format!("__search_{}", planned.len()),
1056 "__INLINE__".into(),
1057 JsonParam::Text(raw),
1058 ));
1059 }
1060 "$order" => {
1061 if let Some(ord) = val.as_object() {
1062 let mut parts = Vec::new();
1063 for (col, dir) in ord {
1064 validate(col)?;
1065 let d = match dir.as_str().unwrap_or("asc") {
1066 "desc" | "DESC" => "DESC",
1067 _ => "ASC",
1068 };
1069 parts.push(format!("{} {d}", quote_ident(col)));
1070 }
1071 if !parts.is_empty() {
1072 order_clause = format!(" ORDER BY {}", parts.join(", "));
1073 }
1074 }
1075 }
1076 "$limit" => {
1077 if let Some(n) = val.as_u64() {
1078 limit_clause = format!(" LIMIT {}", n);
1079 }
1080 }
1081 "$offset" => {
1082 if let Some(n) = val.as_u64() {
1083 offset_clause = format!(" OFFSET {}", n);
1084 }
1085 }
1086 field => {
1087 validate(field)?;
1088 match val {
1089 serde_json::Value::Object(ops) => {
1090 for (op, v) in ops {
1091 match op.as_str() {
1092 "$not" => planned.push((
1093 field.into(),
1094 "!=".into(),
1095 value_to_pg(v),
1096 )),
1097 "$gt" => {
1098 planned.push((field.into(), ">".into(), value_to_pg(v)))
1099 }
1100 "$gte" => planned.push((
1101 field.into(),
1102 ">=".into(),
1103 value_to_pg(v),
1104 )),
1105 "$lt" => {
1106 planned.push((field.into(), "<".into(), value_to_pg(v)))
1107 }
1108 "$lte" => planned.push((
1109 field.into(),
1110 "<=".into(),
1111 value_to_pg(v),
1112 )),
1113 "$like" => {
1114 let raw = match v {
1125 serde_json::Value::String(s) => s.clone(),
1126 other => other.to_string(),
1127 };
1128 planned.push((
1129 field.into(),
1130 "LIKE".into(),
1131 JsonParam::Text(format!("%{raw}%")),
1132 ));
1133 }
1134 "$in" => {
1135 if let Some(arr) = v.as_array() {
1136 if arr.is_empty() {
1137 where_clauses.push("FALSE".into());
1148 } else {
1149 let placeholders: Vec<String> = (0..arr.len())
1150 .map(|i| {
1151 format!("${}", planned.len() + 1 + i)
1152 })
1153 .collect();
1154 where_clauses.push(format!(
1155 "{} IN ({})",
1156 quote_ident(field),
1157 placeholders.join(", "),
1158 ));
1159 for x in arr {
1160 planned.push((
1161 format!("__inline_{}", planned.len()),
1162 "__INLINE__".into(),
1163 value_to_pg(x),
1164 ));
1165 }
1166 }
1167 }
1168 }
1169 _ => {}
1170 }
1171 }
1172 }
1173 _ => planned.push((field.into(), "=".into(), value_to_pg(val))),
1174 }
1175 }
1176 }
1177 }
1178
1179 let mut params: Vec<JsonParam> = Vec::with_capacity(planned.len());
1181 for (field, op, v) in &planned {
1182 if op == "__INLINE__" {
1183 params.push(v.clone());
1185 } else {
1186 let placeholder = format!("${}", params.len() + 1);
1187 where_clauses.push(format!("{} {} {}", quote_ident(field), op, placeholder));
1188 params.push(v.clone());
1189 }
1190 }
1191
1192 let where_sql = if where_clauses.is_empty() {
1193 String::new()
1194 } else {
1195 format!(" WHERE {}", where_clauses.join(" AND "))
1196 };
1197 let final_order = if order_clause.is_empty() {
1202 format!(" ORDER BY {}", quote_ident("id"))
1203 } else {
1204 order_clause
1205 };
1206 let sql = format!(
1207 "SELECT * FROM {}{}{}{}{}",
1208 quote_ident(entity),
1209 where_sql,
1210 final_order,
1211 limit_clause,
1212 offset_clause,
1213 );
1214
1215 Ok((sql, params))
1216 }
1217
1218 pub fn aggregate(
1237 &mut self,
1238 entity: &str,
1239 spec: &serde_json::Value,
1240 valid_columns: &[String],
1241 ) -> Result<serde_json::Value, StorageError> {
1242 let (sql, params, column_names) =
1243 Self::build_aggregate_sql(entity, spec, valid_columns)?;
1244 let pg_params = as_pg_params(¶ms);
1245 let rows = self
1246 .client
1247 .query(sql.as_str(), &pg_params)
1248 .map_err(pg_err)?;
1249 Ok(aggregate_rows_to_json(&rows, &column_names))
1250 }
1251
        /// Translates a JSON aggregate spec into a parameterised `SELECT`.
        ///
        /// Recognised keys of `spec`:
        /// - `count`: `"*"` (alias `count`) or a column name (alias `count_<col>`).
        /// - `sum` / `avg` / `min` / `max`: arrays of column names
        ///   (aliases `sum_<col>`, `avg_<col>`, …).
        /// - `countDistinct`: array of column names (alias `count_distinct_<col>`).
        /// - `groupBy`: array of column names, or objects `{field, bucket}`
        ///   that group by `date_trunc(bucket, field)` (alias `<field>_<bucket>`;
        ///   `bucket` defaults to `day`).
        /// - `where`: object of `column -> value`, equality only, combined
        ///   with `AND`.
        ///
        /// Returns `(sql, params, column_names)`, where `column_names` lists
        /// the group columns first and then the aggregate aliases.
        ///
        /// # Errors
        ///
        /// `INVALID_QUERY` when `spec` is not an object, selects nothing, or
        /// has a malformed `groupBy` entry; `UNKNOWN_COLUMN` when a referenced
        /// column is neither `id` nor listed in `valid_columns`.
        pub(crate) fn build_aggregate_sql(
            entity: &str,
            spec: &serde_json::Value,
            valid_columns: &[String],
        ) -> Result<(String, Vec<JsonParam>, Vec<String>), StorageError> {
            let obj = spec.as_object().ok_or_else(|| StorageError {
                code: "INVALID_QUERY".into(),
                message: "aggregate spec must be a JSON object".into(),
            })?;

            // Column names are interpolated into the SQL, so only `id` and
            // the caller-supplied allow-list are accepted.
            let validate = |col: &str| -> Result<(), StorageError> {
                if col == "id" || valid_columns.iter().any(|c| c == col) {
                    Ok(())
                } else {
                    Err(StorageError {
                        code: "UNKNOWN_COLUMN".into(),
                        message: format!("Unknown column \"{col}\" on entity \"{entity}\""),
                    })
                }
            };

            // Aggregate expressions and, in the same order, their aliases.
            let mut select_parts: Vec<String> = Vec::new();
            let mut result_fields: Vec<String> = Vec::new();

            if let Some(count) = obj.get("count") {
                match count {
                    serde_json::Value::String(s) if s == "*" => {
                        select_parts.push("COUNT(*) AS count".into());
                        result_fields.push("count".into());
                    }
                    serde_json::Value::String(field) => {
                        validate(field)?;
                        let alias = format!("count_{field}");
                        select_parts.push(format!(
                            "COUNT({}) AS {}",
                            quote_ident(field),
                            quote_ident(&alias),
                        ));
                        result_fields.push(alias);
                    }
                    // A non-string `count` is ignored.
                    _ => {}
                }
            }

            for (fn_name, prefix) in [
                ("sum", "sum_"),
                ("avg", "avg_"),
                ("min", "min_"),
                ("max", "max_"),
            ] {
                if let Some(fields) = obj.get(fn_name).and_then(|v| v.as_array()) {
                    for field in fields {
                        // Non-string array entries are skipped.
                        if let Some(f) = field.as_str() {
                            validate(f)?;
                            let alias = format!("{prefix}{f}");
                            let sql_fn = fn_name.to_uppercase();
                            select_parts.push(format!(
                                "{}({}) AS {}",
                                sql_fn,
                                quote_ident(f),
                                quote_ident(&alias),
                            ));
                            result_fields.push(alias);
                        }
                    }
                }
            }

            if let Some(fields) = obj.get("countDistinct").and_then(|v| v.as_array()) {
                for field in fields {
                    if let Some(f) = field.as_str() {
                        validate(f)?;
                        let alias = format!("count_distinct_{f}");
                        select_parts.push(format!(
                            "COUNT(DISTINCT {}) AS {}",
                            quote_ident(f),
                            quote_ident(&alias),
                        ));
                        result_fields.push(alias);
                    }
                }
            }

            // `group_by` holds the GROUP BY expressions, `group_select` the
            // matching SELECT items, `group_field_names` their result names.
            let mut group_by: Vec<String> = Vec::new();
            let mut group_select: Vec<String> = Vec::new();
            let mut group_field_names: Vec<String> = Vec::new();
            if let Some(groups) = obj.get("groupBy").and_then(|v| v.as_array()) {
                for g in groups {
                    if let Some(f) = g.as_str() {
                        validate(f)?;
                        let q = quote_ident(f);
                        group_by.push(q.clone());
                        group_select.push(q);
                        group_field_names.push(f.to_string());
                    } else if let Some(spec) = g.as_object() {
                        let field =
                            spec.get("field").and_then(|v| v.as_str()).ok_or_else(|| {
                                StorageError {
                                    code: "INVALID_QUERY".into(),
                                    message: "groupBy object spec requires `field`".into(),
                                }
                            })?;
                        validate(field)?;
                        let bucket = spec.get("bucket").and_then(|v| v.as_str()).unwrap_or("day");
                        // The unit is inlined into the SQL as a literal, so it
                        // is restricted to this fixed set.
                        let trunc_unit = match bucket {
                            "hour" | "day" | "week" | "month" | "year" => bucket,
                            _ => {
                                return Err(StorageError {
                                    code: "INVALID_QUERY".into(),
                                    message: format!(
                                        "bucket must be one of hour/day/week/month/year, got {bucket}"
                                    ),
                                });
                            }
                        };
                        let alias = format!("{field}_{bucket}");
                        let expr = format!("date_trunc('{}', {})", trunc_unit, quote_ident(field),);
                        group_by.push(expr.clone());
                        group_select.push(format!("{} AS {}", expr, quote_ident(&alias)));
                        group_field_names.push(alias);
                    }
                }
            }

            // Group columns come first in the SELECT list, then the aggregates.
            let mut full_select = group_select.clone();
            full_select.extend(select_parts.iter().cloned());
            if full_select.is_empty() {
                return Err(StorageError {
                    code: "INVALID_QUERY".into(),
                    message: "aggregate spec must include count/sum/avg/min/max/groupBy".into(),
                });
            }

            let mut where_clauses: Vec<String> = Vec::new();
            let mut params: Vec<JsonParam> = Vec::new();
            if let Some(w) = obj.get("where").and_then(|v| v.as_object()) {
                for (k, v) in w {
                    validate(k)?;
                    // Placeholders are numbered by the parameter's 1-based position.
                    let placeholder = format!("${}", params.len() + 1);
                    where_clauses.push(format!("{} = {}", quote_ident(k), placeholder));
                    params.push(value_to_pg(v));
                }
            }
            let where_sql = if where_clauses.is_empty() {
                String::new()
            } else {
                format!(" WHERE {}", where_clauses.join(" AND "))
            };
            let group_sql = if group_by.is_empty() {
                String::new()
            } else {
                format!(" GROUP BY {}", group_by.join(", "))
            };

            let sql = format!(
                "SELECT {} FROM {}{}{}",
                full_select.join(", "),
                quote_ident(entity),
                where_sql,
                group_sql,
            );

            // Same order as the SELECT list: group names, then aggregate aliases.
            let column_names: Vec<String> = group_field_names
                .iter()
                .chain(result_fields.iter())
                .cloned()
                .collect();

            Ok((sql, params, column_names))
        }
1432 }
1433
1434 pub fn aggregate_rows_to_json(
1439 rows: &[postgres::Row],
1440 column_names: &[String],
1441 ) -> serde_json::Value {
1442 let mut out: Vec<serde_json::Value> = Vec::with_capacity(rows.len());
1443 for row in rows {
1444 let row_json = row_to_json(row);
1445 if let serde_json::Value::Object(map) = &row_json {
1446 let mut filtered = serde_json::Map::new();
1447 for name in column_names {
1448 if let Some(v) = map.get(name) {
1449 filtered.insert(name.clone(), v.clone());
1450 }
1451 }
1452 out.push(serde_json::Value::Object(filtered));
1453 } else {
1454 out.push(row_json);
1455 }
1456 }
1457 serde_json::json!({ "rows": out })
1458 }
1459
    /// One write operation to run inside `LivePostgresAdapter::transact`.
    ///
    /// All payloads are borrowed for the lifetime `'a`.
    pub enum TxOp<'a> {
        /// Insert `data` into `entity`.
        Insert {
            entity: &'a str,
            data: &'a serde_json::Value,
        },
        /// Apply the patch `data` to the row of `entity` with the given `id`.
        Update {
            entity: &'a str,
            id: &'a str,
            data: &'a serde_json::Value,
        },
        /// Delete the row of `entity` with the given `id`.
        Delete {
            entity: &'a str,
            id: &'a str,
        },
    }
1476
/// Outcome of a single `TxOp` executed by `transact`.
///
/// Derives `PartialEq`/`Eq` so callers and tests can compare outcomes directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TxResult {
    /// Id of the inserted row.
    Inserted(String),
    /// `true` when the update statement affected at least one row.
    Updated(bool),
    /// `true` when the delete statement affected at least one row.
    Deleted(bool),
}
1484
1485 impl LivePostgresAdapter {
1486 pub fn transact(&mut self, ops: &[TxOp<'_>]) -> Result<Vec<TxResult>, StorageError> {
1491 let mut tx = self.client.transaction().map_err(pg_err)?;
1492 let mut results: Vec<TxResult> = Vec::with_capacity(ops.len());
1493
1494 for op in ops {
1495 match op {
1496 TxOp::Insert { entity, data } => {
1497 let (sql, values) = build_insert_sql(entity, data)?;
1498 let id = match &values[0] {
1499 JsonParam::Text(s) => s.clone(),
1500 _ => {
1501 return Err(StorageError {
1502 code: "PG_INTERNAL".into(),
1503 message: "build_insert_sql produced non-text id param".into(),
1504 });
1505 }
1506 };
1507 let params = as_pg_params(&values);
1508 tx.execute(sql.as_str(), ¶ms).map_err(pg_err)?;
1509 results.push(TxResult::Inserted(id));
1510 }
1511 TxOp::Update { entity, id, data } => {
1512 let (sql, values) = build_update_sql(entity, id, data)?;
1513 let params = as_pg_params(&values);
1514 let n = tx.execute(sql.as_str(), ¶ms).map_err(pg_err)?;
1515 results.push(TxResult::Updated(n > 0));
1516 }
1517 TxOp::Delete { entity, id } => {
1518 let sql = format!("DELETE FROM {} WHERE id = $1", quote_ident(entity));
1519 let n = tx.execute(sql.as_str(), &[id]).map_err(pg_err)?;
1520 results.push(TxResult::Deleted(n > 0));
1521 }
1522 }
1523 }
1524
1525 tx.commit().map_err(pg_err)?;
1526 Ok(results)
1527 }
1528 }
1529
1530 fn value_to_pg(v: &serde_json::Value) -> JsonParam {
1536 JsonParam::from_json(v)
1537 }
1538
1539 pub(super) fn row_to_json(row: &postgres::Row) -> serde_json::Value {
1540 use postgres::types::Type;
1541 let mut obj = serde_json::Map::new();
1542 for (i, col) in row.columns().iter().enumerate() {
1543 let name = col.name().to_string();
1544
1545 let value: serde_json::Value = match *col.type_() {
1556 Type::BOOL => try_get_or_null::<Option<bool>>(row, i)
1557 .flatten()
1558 .map(serde_json::Value::Bool)
1559 .unwrap_or(serde_json::Value::Null),
1560 Type::INT2 => try_get_or_null::<Option<i16>>(row, i)
1561 .flatten()
1562 .map(|v| serde_json::Value::Number(v.into()))
1563 .unwrap_or(serde_json::Value::Null),
1564 Type::INT4 => try_get_or_null::<Option<i32>>(row, i)
1565 .flatten()
1566 .map(|v| serde_json::Value::Number(v.into()))
1567 .unwrap_or(serde_json::Value::Null),
1568 Type::INT8 => try_get_or_null::<Option<i64>>(row, i)
1569 .flatten()
1570 .map(|v| serde_json::Value::Number(v.into()))
1571 .unwrap_or(serde_json::Value::Null),
1572 Type::FLOAT4 => try_get_or_null::<Option<f32>>(row, i)
1573 .flatten()
1574 .and_then(|v| serde_json::Number::from_f64(v as f64))
1575 .map(serde_json::Value::Number)
1576 .unwrap_or(serde_json::Value::Null),
1577 Type::FLOAT8 => try_get_or_null::<Option<f64>>(row, i)
1578 .flatten()
1579 .and_then(serde_json::Number::from_f64)
1580 .map(serde_json::Value::Number)
1581 .unwrap_or(serde_json::Value::Null),
1582 Type::JSON | Type::JSONB => try_get_or_null::<Option<serde_json::Value>>(row, i)
1583 .flatten()
1584 .unwrap_or(serde_json::Value::Null),
1585 Type::BYTEA => try_get_or_null::<Option<Vec<u8>>>(row, i)
1586 .flatten()
1587 .map(|b| serde_json::Value::String(b64(&b)))
1588 .unwrap_or(serde_json::Value::Null),
1589 Type::TEXT | Type::VARCHAR | Type::BPCHAR | Type::NAME | Type::UNKNOWN => {
1590 try_get_or_null::<Option<String>>(row, i)
1591 .flatten()
1592 .map(serde_json::Value::String)
1593 .unwrap_or(serde_json::Value::Null)
1594 }
1595 Type::TIMESTAMPTZ => {
1596 try_get_or_null::<Option<chrono::DateTime<chrono::Utc>>>(row, i)
1603 .flatten()
1604 .map(|dt| {
1605 serde_json::Value::String(dt.format("%Y-%m-%dT%H:%M:%SZ").to_string())
1606 })
1607 .unwrap_or(serde_json::Value::Null)
1608 }
1609 Type::TIMESTAMP => try_get_or_null::<Option<chrono::NaiveDateTime>>(row, i)
1610 .flatten()
1611 .map(|dt| {
1612 serde_json::Value::String(dt.format("%Y-%m-%dT%H:%M:%SZ").to_string())
1613 })
1614 .unwrap_or(serde_json::Value::Null),
1615 Type::DATE => try_get_or_null::<Option<chrono::NaiveDate>>(row, i)
1616 .flatten()
1617 .map(|d| serde_json::Value::String(d.format("%Y-%m-%d").to_string()))
1618 .unwrap_or(serde_json::Value::Null),
1619 _ => {
1620 match row.try_get::<_, Option<String>>(i) {
1625 Ok(Some(s)) => serde_json::Value::String(s),
1626 Ok(None) => serde_json::Value::Null,
1627 Err(_) => match row.try_get::<_, Option<Vec<u8>>>(i) {
1628 Ok(Some(bytes)) => serde_json::Value::String(
1629 String::from_utf8_lossy(&bytes).into_owned(),
1630 ),
1631 _ => serde_json::Value::Null,
1632 },
1633 }
1634 }
1635 };
1636 obj.insert(name, value);
1637 }
1638 serde_json::Value::Object(obj)
1639 }
1640
1641 fn try_get_or_null<'a, T>(row: &'a postgres::Row, i: usize) -> Option<T>
1642 where
1643 T: postgres::types::FromSql<'a>,
1644 {
1645 match row.try_get::<_, T>(i) {
1646 Ok(v) => Some(v),
1647 Err(e) => {
1648 tracing::warn!(
1649 "[postgres] decode failed for column {} ({}): {e}",
1650 i,
1651 row.columns()[i].name()
1652 );
1653 None
1654 }
1655 }
1656 }
1657
/// Encodes `bytes` as base64 using the `A-Z a-z 0-9 + /` alphabet with `=` padding.
///
/// Input is consumed three bytes at a time; each group becomes four output characters,
/// and a short final group is padded with `=`.
fn b64(bytes: &[u8]) -> String {
    const ALPHABET: &[u8; 64] =
        b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    let symbol = |index: u8| char::from(ALPHABET[usize::from(index)]);

    let mut encoded = String::with_capacity((bytes.len() + 2) / 3 * 4);
    for group in bytes.chunks(3) {
        // `chunks` never yields an empty slice, so the first byte always exists.
        let first = group[0];
        let second = group.get(1).copied();
        let third = group.get(2).copied();
        // Missing bytes contribute zero bits to the neighbouring symbol.
        let mid = second.unwrap_or(0);
        let last = third.unwrap_or(0);

        encoded.push(symbol(first >> 2));
        encoded.push(symbol((first & 0x03) << 4 | mid >> 4));
        encoded.push(second.map_or('=', |_| symbol((mid & 0x0F) << 2 | last >> 6)));
        encoded.push(third.map_or('=', |_| symbol(last & 0x3F)));
    }
    encoded
}
1686
1687 fn pg_err(e: postgres::Error) -> StorageError {
1688 use std::error::Error;
1694 let mut detail = format!("{e}");
1695 let mut src: Option<&dyn Error> = e.source();
1696 while let Some(s) = src {
1697 detail.push_str(": ");
1698 detail.push_str(&format!("{s}"));
1699 src = s.source();
1700 }
1701 StorageError {
1702 code: "PG_QUERY_FAILED".into(),
1703 message: format!("Postgres query failed: {detail}"),
1704 }
1705 }
1706}
1707
1708#[cfg(test)]
1713mod tests {
1714 use super::*;
1715
1716 fn test_manifest() -> AppManifest {
1720 use pylon_kernel::{ManifestEntity, ManifestField, ManifestIndex};
1721 let f = |name: &str, ty: &str, opt: bool, uniq: bool| ManifestField {
1722 name: name.into(),
1723 field_type: ty.into(),
1724 optional: opt,
1725 unique: uniq,
1726 crdt: None,
1727 };
1728 AppManifest {
1729 manifest_version: 1,
1730 name: "test".into(),
1731 version: "0.0.0".into(),
1732 entities: vec![
1733 ManifestEntity {
1734 name: "User".into(),
1735 fields: vec![
1736 f("email", "string", false, true),
1737 f("displayName", "string", false, false),
1738 f("createdAt", "datetime", false, false),
1739 ],
1740 indexes: vec![],
1741 relations: vec![],
1742 search: None,
1743 crdt: true,
1744 },
1745 ManifestEntity {
1746 name: "Todo".into(),
1747 fields: vec![
1748 f("title", "string", false, false),
1749 f("done", "bool", false, false),
1750 f("userId", "id(User)", false, false),
1751 f("createdAt", "datetime", false, false),
1752 ],
1753 indexes: vec![ManifestIndex {
1754 name: "by_user".into(),
1755 fields: vec!["userId".into()],
1756 unique: false,
1757 }],
1758 relations: vec![],
1759 search: None,
1760 crdt: true,
1761 },
1762 ],
1763 queries: vec![],
1764 actions: vec![],
1765 policies: vec![],
1766 routes: vec![],
1767 auth: Default::default(),
1768 }
1769 }
1770
// Each manifest field type maps to the expected Postgres column type.
#[test]
fn pg_type_mapping() {
    let expectations = [
        ("string", "TEXT"),
        ("int", "INTEGER"),
        ("float", "DOUBLE PRECISION"),
        ("bool", "BOOLEAN"),
        ("datetime", "TIMESTAMPTZ"),
        ("richtext", "TEXT"),
        ("id(User)", "TEXT"),
    ];
    for (field_type, column_type) in expectations {
        assert_eq!(pg_column_type(field_type), column_type);
    }
}
1781
// Plain identifiers are wrapped in double quotes and otherwise untouched.
#[test]
fn quote_ident_simple() {
    for (raw, quoted) in [("User", "\"User\""), ("email", "\"email\"")] {
        assert_eq!(quote_ident(raw), quoted);
    }
}
1787
// Embedded double quotes are doubled inside the quoted identifier.
#[test]
fn quote_ident_escapes_embedded_double_quotes() {
    let cases = [
        ("col\"name", "\"col\"\"name\""),
        ("a\"b\"c", "\"a\"\"b\"\"c\""),
    ];
    for (raw, quoted) in cases {
        assert_eq!(quote_ident(raw), quoted);
    }
}
1793
// A required unique field and an optional plain field render the expected DDL.
#[test]
fn create_table_sql_basic() {
    let email = FieldSpec {
        name: "email".into(),
        field_type: "string".into(),
        optional: false,
        unique: true,
    };
    let age = FieldSpec {
        name: "age".into(),
        field_type: "int".into(),
        optional: true,
        unique: false,
    };
    let expected = "CREATE TABLE IF NOT EXISTS \"User\" (id TEXT PRIMARY KEY NOT NULL, \"email\" TEXT NOT NULL UNIQUE, \"age\" INTEGER)";

    let ddl = create_table_sql("User", &[email, age]);

    assert_eq!(ddl, expected);
}
1816
// Double quotes in table and column names are escaped in the generated DDL.
#[test]
fn create_table_sql_escapes_identifiers() {
    let tricky_column = FieldSpec {
        name: "col\"x".into(),
        field_type: "string".into(),
        optional: false,
        unique: false,
    };

    let ddl = create_table_sql("my\"table", &[tricky_column]);

    for escaped in ["\"my\"\"table\"", "\"col\"\"x\""] {
        assert!(ddl.contains(escaped));
    }
}
1829
// A unique index renders `CREATE UNIQUE INDEX` with an entity-prefixed name.
#[test]
fn create_index_sql_unique() {
    let columns = ["email".into()];
    let expected = "CREATE UNIQUE INDEX IF NOT EXISTS \"User_by_email\" ON \"User\" (\"email\")";

    let ddl = create_index_sql("User", "by_email", &columns, true);

    assert_eq!(ddl, expected);
}
1838
// A non-unique index renders plain `CREATE INDEX` with an entity-prefixed name.
#[test]
fn create_index_sql_non_unique() {
    let columns = ["userId".into()];
    let expected = "CREATE INDEX IF NOT EXISTS \"Todo_by_user\" ON \"Todo\" (\"userId\")";

    let ddl = create_index_sql("Todo", "by_user", &columns, false);

    assert_eq!(ddl, expected);
}
1847
// An optional, non-unique field is added without NOT NULL or UNIQUE.
#[test]
fn add_column_sql_basic() {
    let bio = FieldSpec {
        name: "bio".into(),
        field_type: "string".into(),
        optional: true,
        unique: false,
    };

    let ddl = add_column_sql("User", &bio);

    assert_eq!(ddl, "ALTER TABLE \"User\" ADD COLUMN \"bio\" TEXT");
}
1859
// Planning from the manifest creates both entities and the Todo index.
#[test]
fn plan_from_manifest() {
    let plan = PostgresAdapter.plan_schema(&test_manifest()).unwrap();

    let creates_entity = |wanted: &str| {
        plan.operations.iter().any(|op| {
            matches!(op, SchemaOperation::CreateEntity { name, .. } if name == wanted)
        })
    };
    let adds_index = |on_entity: &str, index_name: &str| {
        plan.operations.iter().any(|op| {
            matches!(
                op,
                SchemaOperation::AddIndex { entity, name, .. }
                    if entity == on_entity && name == index_name
            )
        })
    };

    assert!(creates_entity("User"));
    assert!(creates_entity("Todo"));
    assert!(adds_index("Todo", "by_user"));
}
1880
// The rendered plan has two CREATE TABLE statements first, plus at least one index.
#[test]
fn plan_to_sql_produces_statements() {
    let plan = PostgresAdapter.plan_schema(&test_manifest()).unwrap();
    let stmts = plan_to_sql(&plan).unwrap();

    let mut table_count = 0;
    let mut index_count = 0;
    for stmt in stmts.iter() {
        if stmt.starts_with("CREATE TABLE") {
            table_count += 1;
        }
        if stmt.starts_with("CREATE INDEX") || stmt.starts_with("CREATE UNIQUE INDEX") {
            index_count += 1;
        }
    }

    assert_eq!(table_count, 2);
    assert!(index_count >= 1);
    // Table creation must precede everything else.
    assert!(stmts[0].starts_with("CREATE TABLE"));
    assert!(stmts[1].starts_with("CREATE TABLE"));
}
1905
// Operations the SQL renderer does not support surface as PG_OP_UNSUPPORTED.
#[test]
fn plan_to_sql_rejects_unsupported() {
    let drop_user = SchemaOperation::RemoveEntity {
        name: "User".into(),
    };
    let plan = SchemaPlan {
        operations: vec![drop_user],
    };

    let outcome = plan_to_sql(&plan);

    assert!(outcome.is_err());
    assert_eq!(outcome.unwrap_err().code, "PG_OP_UNSUPPORTED");
}
1917
// The offline adapter refuses to apply a plan, even a no-op one.
#[test]
fn apply_not_implemented() {
    let noop_plan = SchemaPlan {
        operations: vec![SchemaOperation::Noop],
    };

    let outcome = PostgresAdapter.apply_schema(&noop_plan);

    assert!(outcome.is_err());
    assert_eq!(outcome.unwrap_err().code, "APPLY_NOT_IMPLEMENTED");
}
1928
// Mixed-case names stay quoted in the DDL and datetime maps to TIMESTAMPTZ.
#[test]
fn sql_uses_quoted_identifiers() {
    let created_at = FieldSpec {
        name: "createdAt".into(),
        field_type: "datetime".into(),
        optional: false,
        unique: false,
    };

    let ddl = create_table_sql("User", &[created_at]);

    for fragment in ["\"User\"", "\"createdAt\"", "TIMESTAMPTZ"] {
        assert!(ddl.contains(fragment));
    }
}
1943
// Sanity-checks the text of the introspection queries.
#[test]
fn introspect_sql_constants_are_valid() {
    // Table listing reads information_schema and mentions the `_pylon_` prefix.
    for needle in ["information_schema.tables", "_pylon_"] {
        assert!(INTROSPECT_TABLES_SQL.contains(needle));
    }
    // Per-table queries take a positional parameter.
    for per_table_sql in [INTROSPECT_COLUMNS_SQL, INTROSPECT_INDEXES_SQL] {
        assert!(per_table_sql.contains("$1"));
    }
}
1954
// With no existing tables, the plan creates every entity and index.
#[test]
fn plan_from_empty_snapshot_creates_all() {
    let empty = crate::SchemaSnapshot { tables: vec![] };
    let plan = plan_from_snapshot(&empty, &test_manifest());

    let creates_entity = |wanted: &str| {
        plan.operations.iter().any(|op| {
            matches!(op, SchemaOperation::CreateEntity { name, .. } if name == wanted)
        })
    };
    let adds_index = |on_entity: &str, index_name: &str| {
        plan.operations.iter().any(|op| {
            matches!(
                op,
                SchemaOperation::AddIndex { entity, name, .. }
                    if entity == on_entity && name == index_name
            )
        })
    };

    assert!(creates_entity("User"));
    assert!(creates_entity("Todo"));
    assert!(adds_index("Todo", "by_user"));
}
1976
// A snapshot that already matches the manifest yields an empty plan.
#[test]
fn plan_from_full_snapshot_is_noop() {
    // Every column in this fixture is NOT NULL; only the primary-key flag varies.
    let column = |name: &str, column_type: &str, primary_key: bool| crate::ColumnSnapshot {
        name: name.into(),
        column_type: column_type.into(),
        notnull: true,
        primary_key,
    };

    let user_table = crate::TableSnapshot {
        name: "User".into(),
        columns: vec![
            column("id", "TEXT", true),
            column("email", "TEXT", false),
            column("displayName", "TEXT", false),
            column("createdAt", "TIMESTAMPTZ", false),
        ],
        indexes: vec![],
    };
    let todo_table = crate::TableSnapshot {
        name: "Todo".into(),
        columns: vec![
            column("id", "TEXT", true),
            column("title", "TEXT", false),
            column("done", "BOOLEAN", false),
            column("userId", "TEXT", false),
            column("createdAt", "TIMESTAMPTZ", false),
        ],
        indexes: vec![crate::IndexSnapshot {
            name: "Todo_by_user".into(),
            columns: vec!["userId".into()],
            unique: false,
        }],
    };
    let snapshot = crate::SchemaSnapshot {
        tables: vec![user_table, todo_table],
    };

    let plan = plan_from_snapshot(&snapshot, &test_manifest());

    assert!(plan.is_empty());
}
2057
// A `User` table lacking `displayName` and `createdAt` produces two AddField ops.
#[test]
fn plan_detects_missing_column_in_snapshot() {
    // Every column in this fixture is NOT NULL; only the primary-key flag varies.
    let column = |name: &str, column_type: &str, primary_key: bool| crate::ColumnSnapshot {
        name: name.into(),
        column_type: column_type.into(),
        notnull: true,
        primary_key,
    };

    // Only `id` and `email` exist so far on `User`.
    let partial_user_table = crate::TableSnapshot {
        name: "User".into(),
        columns: vec![column("id", "TEXT", true), column("email", "TEXT", false)],
        indexes: vec![],
    };
    let todo_table = crate::TableSnapshot {
        name: "Todo".into(),
        columns: vec![
            column("id", "TEXT", true),
            column("title", "TEXT", false),
            column("done", "BOOLEAN", false),
            column("userId", "TEXT", false),
            column("createdAt", "TIMESTAMPTZ", false),
        ],
        indexes: vec![crate::IndexSnapshot {
            name: "Todo_by_user".into(),
            columns: vec!["userId".into()],
            unique: false,
        }],
    };
    let snapshot = crate::SchemaSnapshot {
        tables: vec![partial_user_table, todo_table],
    };

    let plan = plan_from_snapshot(&snapshot, &test_manifest());

    let added_field_count = plan
        .operations
        .iter()
        .filter(|op| matches!(op, SchemaOperation::AddField { .. }))
        .count();
    assert_eq!(added_field_count, 2);
}
2133
// Strings pass through, scalars are stringified, null is empty, and containers
// are rendered as compact JSON.
#[test]
fn json_value_to_string_handles_all_types() {
    let cases = [
        (serde_json::Value::String("hello".into()), "hello"),
        (serde_json::json!(42), "42"),
        (serde_json::json!(1.5), "1.5"),
        (serde_json::Value::Bool(true), "true"),
        (serde_json::Value::Bool(false), "false"),
        (serde_json::Value::Null, ""),
        (serde_json::json!([1, 2, 3]), "[1,2,3]"),
        (serde_json::json!({"a": 1}), "{\"a\":1}"),
    ];
    for (input, expected) in cases {
        assert_eq!(json_value_to_string(&input), expected);
    }
}
2160
// Generated ids are non-empty and consist solely of hex digits.
#[test]
fn generate_id_returns_hex_string() {
    let fresh = generate_id();
    assert!(!fresh.is_empty());
    assert!(fresh.chars().all(|ch| ch.is_ascii_hexdigit()));
}
2168
// Two back-to-back ids differ.
#[test]
fn generate_id_is_unique_across_calls() {
    let first = generate_id();
    let second = generate_id();
    assert_ne!(first, second);
}
2175
// Ids generated in a tight loop are already sorted, fixed-width, and collision-free.
#[test]
fn generate_id_is_lex_sortable() {
    let generated: Vec<String> = (0..1000).map(|_| generate_id()).collect();

    let mut sorted = generated.clone();
    sorted.sort();
    assert_eq!(generated, sorted, "generate_id must be lex-monotonic");

    let width = generated[0].len();
    assert!(generated.iter().all(|id| id.len() == width));

    // The list is sorted, so `dedup` removes every duplicate.
    let mut distinct = generated;
    distinct.dedup();
    assert_eq!(distinct.len(), 1000, "no collisions in a tight loop");
}
2194
// Insert SQL targets the quoted table, binds three parameters, and leads with a text id.
#[test]
fn build_insert_sql_simple() {
    let payload = serde_json::json!({
        "email": "alice@example.com",
        "displayName": "Alice"
    });

    let (sql, values) = build_insert_sql("User", &payload).unwrap();

    assert!(sql.starts_with("INSERT INTO \"User\""));
    for fragment in ["id", "$1", "$2", "$3"] {
        assert!(sql.contains(fragment));
    }
    match &values[0] {
        JsonParam::Text(id) => assert!(!id.is_empty()),
        other => panic!("expected Text id param, got {other:?}"),
    }
    // The id plus the two payload fields.
    assert_eq!(values.len(), 3);
}
2215
// Each JSON scalar keeps its own `JsonParam` kind instead of being stringified.
#[test]
fn build_insert_sql_preserves_json_types() {
    let payload = serde_json::json!({
        "n": 42,
        "f": 1.5,
        "b": true,
        "s": "hi",
        "z": null,
    });

    let (_sql, values) = build_insert_sql("T", &payload).unwrap();

    // Index 0 is the generated id; the assertions expect the payload in the order
    // b, f, n, s, z.
    let fields = &values[1..];
    assert!(matches!(fields[0], JsonParam::Bool(true)));
    assert!(matches!(fields[1], JsonParam::Float(_)));
    assert!(matches!(fields[2], JsonParam::Int(42)));
    assert!(matches!(fields[3], JsonParam::Text(_)));
    assert!(matches!(fields[4], JsonParam::Null));
}
2234
// Both the column and the table name appear quoted in the insert SQL.
#[test]
fn build_insert_sql_quotes_column_names() {
    let payload = serde_json::json!({"createdAt": "2026-01-01"});

    let (sql, _) = build_insert_sql("Todo", &payload).unwrap();

    for quoted in ["\"createdAt\"", "\"Todo\""] {
        assert!(sql.contains(quoted));
    }
}
2242
// A non-object payload is rejected with PG_INVALID_DATA.
#[test]
fn build_insert_sql_rejects_non_object() {
    let not_an_object = serde_json::json!("not an object");

    let outcome = build_insert_sql("User", &not_an_object);

    assert!(outcome.is_err());
    assert_eq!(outcome.unwrap_err().code, "PG_INVALID_DATA");
}
2250
// Update SQL binds the row id as $1 and the changed fields after it.
#[test]
fn build_update_sql_simple() {
    let changes = serde_json::json!({
        "displayName": "Bob",
        "email": "bob@example.com"
    });

    let (sql, values) = build_update_sql("User", "abc123", &changes).unwrap();

    assert!(sql.starts_with("UPDATE \"User\" SET"));
    for fragment in ["WHERE id = $1", "$2", "$3"] {
        assert!(sql.contains(fragment));
    }
    match &values[0] {
        JsonParam::Text(id) => assert_eq!(id, "abc123"),
        other => panic!("expected Text id param, got {other:?}"),
    }
    // The id plus the two changed fields.
    assert_eq!(values.len(), 3);
}
2269
// The single updated column is quoted and bound to the second placeholder.
#[test]
fn build_update_sql_quotes_column_names() {
    let changes = serde_json::json!({"displayName": "Carol"});

    let (sql, _) = build_update_sql("User", "id1", &changes).unwrap();

    assert!(sql.contains("\"displayName\" = $2"));
}
2276
// A non-object payload is rejected with PG_INVALID_DATA.
#[test]
fn build_update_sql_rejects_non_object() {
    let not_an_object = serde_json::json!(42);

    let outcome = build_update_sql("User", "id1", &not_an_object);

    assert!(outcome.is_err());
    assert_eq!(outcome.unwrap_err().code, "PG_INVALID_DATA");
}
2284
// An update with no fields is rejected with a message asking for at least one.
#[test]
fn build_update_sql_rejects_empty_object() {
    let no_changes = serde_json::json!({});

    let failure = build_update_sql("User", "id1", &no_changes).unwrap_err();

    assert_eq!(failure.code, "PG_INVALID_DATA");
    assert!(failure.message.contains("at least one field"));
}
2292}