1use crate::{FieldSpec, SchemaOperation, SchemaPlan, StorageAdapter, StorageError};
2use pylon_kernel::AppManifest;
3
/// Maps a manifest field type name to the Postgres column type used in DDL.
///
/// Only `int`, `float`, `bool` and `datetime` get a dedicated column type.
/// Everything else — `string`, `richtext`, `id(...)` references and any
/// unrecognized type name — is stored as `TEXT`.
fn pg_column_type(field_type: &str) -> &'static str {
    match field_type {
        "int" => "INTEGER",
        "float" => "DOUBLE PRECISION",
        "bool" => "BOOLEAN",
        "datetime" => "TIMESTAMPTZ",
        // "string", "richtext", "id(...)" and unknown types all share TEXT.
        _ => "TEXT",
    }
}
28
/// Wraps `name` in double quotes for use as a SQL identifier, doubling any
/// embedded double quote so the identifier cannot terminate early.
fn quote_ident(name: &str) -> String {
    let mut quoted = String::with_capacity(name.len() + 2);
    quoted.push('"');
    for ch in name.chars() {
        if ch == '"' {
            // An embedded quote is escaped by writing it twice.
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
39
40pub fn create_table_sql(entity_name: &str, fields: &[FieldSpec]) -> String {
46 let mut columns = vec!["id TEXT PRIMARY KEY NOT NULL".to_string()];
47
48 for field in fields {
49 let col_type = pg_column_type(&field.field_type);
50 let not_null = if field.optional { "" } else { " NOT NULL" };
51 let unique = if field.unique { " UNIQUE" } else { "" };
52 columns.push(format!(
53 "{} {}{}{}",
54 quote_ident(&field.name),
55 col_type,
56 not_null,
57 unique
58 ));
59 }
60
61 format!(
62 "CREATE TABLE IF NOT EXISTS {} ({})",
63 quote_ident(entity_name),
64 columns.join(", ")
65 )
66}
67
68pub fn add_column_sql(entity_name: &str, field: &FieldSpec) -> String {
72 let col_type = pg_column_type(&field.field_type);
73 let unique = if field.unique { " UNIQUE" } else { "" };
74 format!(
75 "ALTER TABLE {} ADD COLUMN {} {}{}",
76 quote_ident(entity_name),
77 quote_ident(&field.name),
78 col_type,
79 unique
80 )
81}
82
83pub fn create_index_sql(
85 entity_name: &str,
86 index_name: &str,
87 fields: &[String],
88 unique: bool,
89) -> String {
90 let unique_str = if unique { "UNIQUE " } else { "" };
91 let full_index_name = format!("{}_{}", entity_name, index_name);
92 let quoted_fields: Vec<String> = fields.iter().map(|f| quote_ident(f)).collect();
93 format!(
94 "CREATE {}INDEX IF NOT EXISTS {} ON {} ({})",
95 unique_str,
96 quote_ident(&full_index_name),
97 quote_ident(entity_name),
98 quoted_fields.join(", ")
99 )
100}
101
/// Stateless Postgres adapter: a unit struct with no connection of its own.
/// Its `StorageAdapter::plan_schema` builds a plan from the manifest alone.
pub struct PostgresAdapter;
109
110impl StorageAdapter for PostgresAdapter {
111 fn plan_schema(&self, target: &AppManifest) -> Result<SchemaPlan, StorageError> {
112 let mut operations = Vec::new();
114
115 for entity in &target.entities {
116 let fields: Vec<FieldSpec> = entity
117 .fields
118 .iter()
119 .map(|f| FieldSpec {
120 name: f.name.clone(),
121 field_type: f.field_type.clone(),
122 optional: f.optional,
123 unique: f.unique,
124 })
125 .collect();
126
127 operations.push(SchemaOperation::CreateEntity {
128 name: entity.name.clone(),
129 fields,
130 });
131
132 for index in &entity.indexes {
133 operations.push(SchemaOperation::AddIndex {
134 entity: entity.name.clone(),
135 name: index.name.clone(),
136 fields: index.fields.clone(),
137 unique: index.unique,
138 });
139 }
140 }
141
142 if operations.is_empty() {
143 operations.push(SchemaOperation::Noop);
144 }
145
146 Ok(SchemaPlan { operations })
147 }
148
149 }
151
152pub fn plan_to_sql(plan: &SchemaPlan) -> Result<Vec<String>, StorageError> {
155 let mut statements = Vec::new();
156
157 for op in &plan.operations {
158 match op {
159 SchemaOperation::CreateEntity { name, fields } => {
160 statements.push(create_table_sql(name, fields));
161 }
162 SchemaOperation::AddField { entity, field } => {
163 statements.push(add_column_sql(entity, field));
164 }
165 SchemaOperation::AlterField {
166 entity,
167 previous,
168 target,
169 } => {
170 if previous.optional && !target.optional {
179 statements.push(format!(
180 "ALTER TABLE {} ALTER COLUMN {} SET NOT NULL",
181 quote_ident(entity),
182 quote_ident(&target.name)
183 ));
184 } else if !previous.optional && target.optional {
185 statements.push(format!(
186 "ALTER TABLE {} ALTER COLUMN {} DROP NOT NULL",
187 quote_ident(entity),
188 quote_ident(&target.name)
189 ));
190 }
191 }
197 SchemaOperation::AddIndex {
198 entity,
199 name,
200 fields,
201 unique,
202 } => {
203 statements.push(create_index_sql(entity, name, fields, *unique));
204 }
205 SchemaOperation::Noop => {}
206 other => {
207 return Err(StorageError {
208 code: "PG_OP_UNSUPPORTED".into(),
209 message: format!("Operation not supported by Postgres adapter: {other:?}"),
210 });
211 }
212 }
213 }
214
215 Ok(statements)
216}
217
/// Lists the base tables of the `public` schema, sorted by name, excluding
/// tables whose name starts with the literal prefix `_pylon_`.
///
/// `_` is a single-character wildcard in `LIKE`, so the underscores of the
/// prefix are escaped. Unescaped, the pattern would also hide unrelated
/// tables such as `xpylonyfoo`. An explicit `ESCAPE` character is used so the
/// pattern does not depend on backslash handling in string literals.
pub const INTROSPECT_TABLES_SQL: &str = "\
    SELECT table_name \
    FROM information_schema.tables \
    WHERE table_schema = 'public' \
    AND table_type = 'BASE TABLE' \
    AND table_name NOT LIKE '!_pylon!_%' ESCAPE '!' \
    ORDER BY table_name";
233
/// Lists the columns of the `public` table named by `$1`, in ordinal order.
///
/// Result columns: `column_name`, `data_type`, `is_nullable` (`YES`/`NO`) and
/// `is_pk`, the number of primary-key constraints covering the column.
///
/// The primary-key subquery is scoped to the column's own schema and table.
/// Constraint names are not unique across schemas, so joining on the
/// constraint name alone could pick up a same-named table from another
/// schema and wrongly report a column as part of the primary key.
pub const INTROSPECT_COLUMNS_SQL: &str = "\
    SELECT column_name, data_type, is_nullable, \
    (SELECT COUNT(*) FROM information_schema.table_constraints tc \
    JOIN information_schema.key_column_usage kcu \
    ON tc.constraint_schema = kcu.constraint_schema \
    AND tc.constraint_name = kcu.constraint_name \
    AND tc.table_name = kcu.table_name \
    WHERE tc.table_schema = c.table_schema \
    AND tc.table_name = c.table_name \
    AND kcu.column_name = c.column_name \
    AND tc.constraint_type = 'PRIMARY KEY') as is_pk \
    FROM information_schema.columns c \
    WHERE table_schema = 'public' AND table_name = $1 \
    ORDER BY ordinal_position";
247
/// Lists the non-primary-key indexes of the `public` table named by `$1`,
/// sorted by index name.
///
/// Result columns: `index_name`, `is_unique`, and `columns` — the indexed
/// column names aggregated in the order they appear in the index key.
pub const INTROSPECT_INDEXES_SQL: &str = "\
    SELECT i.relname as index_name, \
    ix.indisunique as is_unique, \
    array_agg(a.attname ORDER BY array_position(ix.indkey, a.attnum)) as columns \
    FROM pg_index ix \
    JOIN pg_class t ON t.oid = ix.indrelid \
    JOIN pg_class i ON i.oid = ix.indexrelid \
    JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(ix.indkey) \
    JOIN pg_namespace n ON n.oid = t.relnamespace \
    WHERE n.nspname = 'public' \
    AND t.relname = $1 \
    AND NOT ix.indisprimary \
    GROUP BY i.relname, ix.indisunique \
    ORDER BY i.relname";
264
/// Plans the operations needed to move `snapshot` towards `target`.
///
/// Pure delegation to the crate-level `plan_from_snapshot`; nothing
/// Postgres-specific happens here.
pub fn plan_from_snapshot(snapshot: &crate::SchemaSnapshot, target: &AppManifest) -> SchemaPlan {
    crate::plan_from_snapshot(snapshot, target)
}
270
/// Generates a 40-character lowercase hexadecimal row id.
///
/// The first 32 characters are the current time in nanoseconds since the
/// Unix epoch (zero if the clock reads earlier than the epoch). The last 8
/// are a process-wide counter that is incremented on every call.
pub fn generate_id() -> String {
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static COUNTER: AtomicU32 = AtomicU32::new(0);

    let nanos_since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        // Clock set before the epoch: fall back to a zero timestamp.
        Err(_) => 0,
    };
    let sequence = COUNTER.fetch_add(1, Ordering::Relaxed);

    format!("{:032x}{:08x}", nanos_since_epoch, sequence)
}
294
295pub fn json_value_to_string(val: &serde_json::Value) -> String {
304 match val {
305 serde_json::Value::String(s) => s.clone(),
306 serde_json::Value::Number(n) => n.to_string(),
307 serde_json::Value::Bool(b) => b.to_string(),
308 serde_json::Value::Null => String::new(),
309 other => other.to_string(),
310 }
311}
312
/// A query parameter derived from a JSON value.
///
/// Values are built with `JsonParam::from_json`; with the `postgres-live`
/// feature the type also implements `postgres::types::ToSql` so it can be
/// bound directly to a statement.
#[derive(Debug, Clone, PartialEq)]
pub enum JsonParam {
    /// JSON `null`.
    Null,
    /// A JSON string, or the textual form of a value with no closer variant.
    Text(String),
    /// A JSON number representable as `i64`.
    Int(i64),
    /// A JSON number representable only as `f64`.
    Float(f64),
    /// A JSON boolean.
    Bool(bool),
}
336
337impl JsonParam {
338 pub fn from_json(val: &serde_json::Value) -> Self {
342 match val {
343 serde_json::Value::Null => JsonParam::Null,
344 serde_json::Value::String(s) => JsonParam::Text(s.clone()),
345 serde_json::Value::Bool(b) => JsonParam::Bool(*b),
346 serde_json::Value::Number(n) => {
347 if let Some(i) = n.as_i64() {
348 JsonParam::Int(i)
349 } else if let Some(f) = n.as_f64() {
350 JsonParam::Float(f)
351 } else {
352 JsonParam::Text(n.to_string())
353 }
354 }
355 other => JsonParam::Text(other.to_string()),
356 }
357 }
358}
359
#[cfg(feature = "postgres-live")]
impl postgres::types::ToSql for JsonParam {
    /// Encodes the parameter for the column type Postgres inferred (`ty`).
    ///
    /// - `Null` is sent as SQL NULL for any type.
    /// - `Bool`, `Int` and `Float` are encoded natively for boolean, integer
    ///   and float targets. Floats bound to integer columns are truncated
    ///   toward zero.
    /// - `Text` is encoded natively for text-like targets and parsed as
    ///   RFC 3339 for `TIMESTAMPTZ`, `TIMESTAMP` and `DATE` (converted to
    ///   UTC first).
    /// - Any other combination is sent as the value's textual form.
    ///
    /// # Errors
    /// Returns an error when a number does not fit the target integer type,
    /// or when a timestamp/date string is not valid RFC 3339. Out-of-range
    /// numbers used to be narrowed with `as`, which silently wrapped or
    /// saturated them and stored a different value than the caller sent.
    fn to_sql(
        &self,
        ty: &postgres::types::Type,
        out: &mut bytes::BytesMut,
    ) -> Result<postgres::types::IsNull, Box<dyn std::error::Error + Sync + Send>> {
        use postgres::types::Type;

        if matches!(self, JsonParam::Null) {
            return Ok(postgres::types::IsNull::Yes);
        }

        match (self, ty) {
            (JsonParam::Bool(b), &Type::BOOL) => b.to_sql(ty, out),

            (JsonParam::Int(n), &Type::INT2) => i16::try_from(*n)
                .map_err(|_| format!("integer {n} is out of range for INT2"))?
                .to_sql(ty, out),
            (JsonParam::Int(n), &Type::INT4) => i32::try_from(*n)
                .map_err(|_| format!("integer {n} is out of range for INT4"))?
                .to_sql(ty, out),
            (JsonParam::Int(n), &Type::INT8) => n.to_sql(ty, out),
            (JsonParam::Int(n), &Type::FLOAT4) => (*n as f32).to_sql(ty, out),
            (JsonParam::Int(n), &Type::FLOAT8) => (*n as f64).to_sql(ty, out),

            (JsonParam::Float(f), &Type::FLOAT4) => (*f as f32).to_sql(ty, out),
            (JsonParam::Float(f), &Type::FLOAT8) => f.to_sql(ty, out),
            (JsonParam::Float(f), &Type::INT4) => {
                let whole = f.trunc();
                // The negated form also rejects NaN, which fails every comparison.
                if !(whole >= f64::from(i32::MIN) && whole <= f64::from(i32::MAX)) {
                    return Err(format!("float {f} is out of range for INT4").into());
                }
                (whole as i32).to_sql(ty, out)
            }
            (JsonParam::Float(f), &Type::INT8) => {
                let whole = f.trunc();
                // i64::MAX is not exactly representable as f64; 2^63 is the
                // first value past the range, hence the strict upper bound.
                if !(whole >= -9_223_372_036_854_775_808.0 && whole < 9_223_372_036_854_775_808.0)
                {
                    return Err(format!("float {f} is out of range for INT8").into());
                }
                (whole as i64).to_sql(ty, out)
            }

            (JsonParam::Text(s), &Type::TEXT)
            | (JsonParam::Text(s), &Type::VARCHAR)
            | (JsonParam::Text(s), &Type::BPCHAR)
            | (JsonParam::Text(s), &Type::NAME) => s.to_sql(ty, out),
            (JsonParam::Text(s), &Type::TIMESTAMPTZ) => {
                let dt = chrono::DateTime::parse_from_rfc3339(s)
                    .map_err(|e| format!("invalid TIMESTAMPTZ string {s:?}: {e}"))?
                    .with_timezone(&chrono::Utc);
                dt.to_sql(ty, out)
            }
            (JsonParam::Text(s), &Type::TIMESTAMP) => {
                // Naive timestamps are stored as the UTC wall-clock time.
                let dt = chrono::DateTime::parse_from_rfc3339(s)
                    .map_err(|e| format!("invalid TIMESTAMP string {s:?}: {e}"))?
                    .with_timezone(&chrono::Utc)
                    .naive_utc();
                dt.to_sql(ty, out)
            }
            (JsonParam::Text(s), &Type::DATE) => {
                let dt = chrono::DateTime::parse_from_rfc3339(s)
                    .map_err(|e| format!("invalid DATE string {s:?}: {e}"))?
                    .with_timezone(&chrono::Utc)
                    .date_naive();
                dt.to_sql(ty, out)
            }

            (other, _) => {
                // No native encoding for this pair: send the textual form.
                let s = match other {
                    JsonParam::Bool(b) => b.to_string(),
                    JsonParam::Int(n) => n.to_string(),
                    JsonParam::Float(f) => f.to_string(),
                    JsonParam::Text(s) => s.clone(),
                    // Null returned early above.
                    JsonParam::Null => unreachable!(),
                };
                s.to_sql(ty, out)
            }
        }
    }

    /// Accepts every Postgres type; the per-type decision is made in `to_sql`.
    fn accepts(_ty: &postgres::types::Type) -> bool {
        true
    }

    postgres::types::to_sql_checked!();
}
463
464pub fn build_insert_sql(
472 entity: &str,
473 data: &serde_json::Value,
474) -> Result<(String, Vec<JsonParam>), StorageError> {
475 let id = generate_id();
476 let obj = data.as_object().ok_or_else(|| StorageError {
477 code: "PG_INVALID_DATA".into(),
478 message: "Insert data must be a JSON object".into(),
479 })?;
480
481 let mut col_names = vec!["id".to_string()];
482 let mut placeholders = vec!["$1".to_string()];
483 let mut values: Vec<JsonParam> = vec![JsonParam::Text(id)];
484
485 for (i, (key, val)) in obj.iter().enumerate() {
486 col_names.push(quote_ident(key));
487 placeholders.push(format!("${}", i + 2));
488 values.push(JsonParam::from_json(val));
489 }
490
491 let sql = format!(
492 "INSERT INTO {} ({}) VALUES ({})",
493 quote_ident(entity),
494 col_names.join(", "),
495 placeholders.join(", ")
496 );
497
498 Ok((sql, values))
499}
500
501pub fn build_update_sql(
504 entity: &str,
505 id: &str,
506 data: &serde_json::Value,
507) -> Result<(String, Vec<JsonParam>), StorageError> {
508 let obj = data.as_object().ok_or_else(|| StorageError {
509 code: "PG_INVALID_DATA".into(),
510 message: "Update data must be a JSON object".into(),
511 })?;
512
513 if obj.is_empty() {
514 return Err(StorageError {
515 code: "PG_INVALID_DATA".into(),
516 message: "Update data must contain at least one field".into(),
517 });
518 }
519
520 let mut set_clauses = Vec::new();
521 let mut values: Vec<JsonParam> = vec![JsonParam::Text(id.to_string())];
522
523 for (i, (key, val)) in obj.iter().enumerate() {
524 set_clauses.push(format!("{} = ${}", quote_ident(key), i + 2));
525 values.push(JsonParam::from_json(val));
526 }
527
528 let sql = format!(
529 "UPDATE {} SET {} WHERE id = $1",
530 quote_ident(entity),
531 set_clauses.join(", ")
532 );
533
534 Ok((sql, values))
535}
536
/// Borrows each parameter as the trait object the `postgres` client expects
/// for its parameter slice, preserving order.
#[cfg(feature = "postgres-live")]
fn as_pg_params(values: &[JsonParam]) -> Vec<&(dyn postgres::types::ToSql + Sync)> {
    let mut erased: Vec<&(dyn postgres::types::ToSql + Sync)> = Vec::with_capacity(values.len());
    for value in values {
        erased.push(value);
    }
    erased
}
547
548#[cfg(feature = "postgres-live")]
553pub mod live {
554 use super::*;
555 use crate::{
556 ColumnSnapshot, IndexSnapshot, SchemaSnapshot, StorageAdapter, StorageError, TableSnapshot,
557 };
558
/// Postgres adapter backed by a live, blocking `postgres::Client`.
///
/// Every method that talks to the database takes `&mut self` because it
/// uses the client mutably.
pub struct LivePostgresAdapter {
    /// The open database connection.
    client: postgres::Client,
}
563
564 impl LivePostgresAdapter {
565 pub fn connect(url: &str) -> Result<Self, StorageError> {
567 let client =
568 postgres::Client::connect(url, postgres::NoTls).map_err(|e| StorageError {
569 code: "PG_CONNECT_FAILED".into(),
570 message: format!("Failed to connect to Postgres: {e}"),
571 })?;
572 Ok(Self { client })
573 }
574
575 pub fn read_schema(&mut self) -> Result<SchemaSnapshot, StorageError> {
577 let table_rows = self
578 .client
579 .query(INTROSPECT_TABLES_SQL, &[])
580 .map_err(pg_err)?;
581
582 let mut tables = Vec::new();
583 for row in &table_rows {
584 let table_name: String = row.get(0);
585 let columns = self.read_columns(&table_name)?;
586 let indexes = self.read_indexes(&table_name)?;
587 tables.push(TableSnapshot {
588 name: table_name,
589 columns,
590 indexes,
591 });
592 }
593
594 Ok(SchemaSnapshot { tables })
595 }
596
597 fn read_columns(&mut self, table: &str) -> Result<Vec<ColumnSnapshot>, StorageError> {
598 let rows = self
599 .client
600 .query(INTROSPECT_COLUMNS_SQL, &[&table])
601 .map_err(pg_err)?;
602
603 let mut columns = Vec::new();
604 for row in &rows {
605 let name: String = row.get(0);
606 let data_type: String = row.get(1);
607 let is_nullable: String = row.get(2);
608 let is_pk: i64 = row.get(3);
609 columns.push(ColumnSnapshot {
610 name,
611 column_type: data_type,
612 notnull: is_nullable == "NO",
613 primary_key: is_pk > 0,
614 });
615 }
616 Ok(columns)
617 }
618
619 fn read_indexes(&mut self, table: &str) -> Result<Vec<IndexSnapshot>, StorageError> {
620 let rows = self
621 .client
622 .query(INTROSPECT_INDEXES_SQL, &[&table])
623 .map_err(pg_err)?;
624
625 let mut indexes = Vec::new();
626 for row in &rows {
627 let name: String = row.get(0);
628 let unique: bool = row.get(1);
629 let columns: Vec<String> = row.get(2);
630 indexes.push(IndexSnapshot {
631 name,
632 columns,
633 unique,
634 });
635 }
636 Ok(indexes)
637 }
638
639 pub fn plan_from_live(&mut self, target: &AppManifest) -> Result<SchemaPlan, StorageError> {
641 let snapshot = self.read_schema()?;
642 Ok(crate::plan_from_snapshot(&snapshot, target))
643 }
644 }
645
/// The trait methods take `&self`, but the live adapter's database methods
/// take `&mut self`. Both trait methods therefore always fail and point the
/// caller at the inherent `&mut self` equivalents.
impl StorageAdapter for LivePostgresAdapter {
    /// Always fails with `PG_PLAN_NEEDS_MUTABLE`; use `plan_from_live()`.
    fn plan_schema(&self, _target: &AppManifest) -> Result<SchemaPlan, StorageError> {
        Err(StorageError {
            code: "PG_PLAN_NEEDS_MUTABLE".into(),
            message: "Use plan_from_live() instead for live Postgres planning".into(),
        })
    }

    /// Always fails with `PG_APPLY_USE_METHOD`; use `apply_plan()`.
    fn apply_schema(&self, _plan: &SchemaPlan) -> Result<(), StorageError> {
        Err(StorageError {
            code: "PG_APPLY_USE_METHOD".into(),
            message: "Use apply_plan() instead of the trait method for live Postgres".into(),
        })
    }
}
661
662 impl LivePostgresAdapter {
663 pub fn apply_plan(&mut self, plan: &SchemaPlan) -> Result<(), StorageError> {
665 let statements = plan_to_sql(plan)?;
666 for sql in &statements {
667 self.client.execute(sql.as_str(), &[]).map_err(pg_err)?;
668 }
669 Ok(())
670 }
671
/// Executes `sql` verbatim with no bound parameters and returns the number
/// of affected rows reported by the driver.
///
/// The statement text is passed through unchanged; nothing here escapes or
/// validates it, so callers must not build it from untrusted input.
pub fn exec_raw(&mut self, sql: &str) -> Result<u64, StorageError> {
    self.client.execute(sql, &[]).map_err(pg_err)
}
680
681 pub fn insert(
683 &mut self,
684 entity: &str,
685 data: &serde_json::Value,
686 ) -> Result<String, StorageError> {
687 let (sql, values) = build_insert_sql(entity, data)?;
688 let id = match &values[0] {
691 JsonParam::Text(s) => s.clone(),
692 _ => {
693 return Err(StorageError {
694 code: "PG_INTERNAL".into(),
695 message: "build_insert_sql produced non-text id param".into(),
696 });
697 }
698 };
699 let params = as_pg_params(&values);
700 self.client.execute(sql.as_str(), ¶ms).map_err(pg_err)?;
701 Ok(id)
702 }
703
704 pub fn get_by_id(
706 &mut self,
707 entity: &str,
708 id: &str,
709 ) -> Result<Option<serde_json::Value>, StorageError> {
710 let sql = format!("SELECT * FROM {} WHERE id = $1", quote_ident(entity));
711 let rows = self.client.query(sql.as_str(), &[&id]).map_err(pg_err)?;
712
713 match rows.first() {
714 Some(row) => Ok(Some(row_to_json(row))),
715 None => Ok(None),
716 }
717 }
718
719 pub fn list(&mut self, entity: &str) -> Result<Vec<serde_json::Value>, StorageError> {
721 let sql = format!("SELECT * FROM {}", quote_ident(entity));
722 let rows = self.client.query(sql.as_str(), &[]).map_err(pg_err)?;
723
724 Ok(rows.iter().map(row_to_json).collect())
725 }
726
727 pub fn list_after(
731 &mut self,
732 entity: &str,
733 after: Option<&str>,
734 limit: usize,
735 ) -> Result<Vec<serde_json::Value>, StorageError> {
736 let capped: i64 = limit.min(10_000) as i64;
739 let sql = match after {
740 Some(_) => format!(
741 "SELECT * FROM {} WHERE id > $1 ORDER BY id ASC LIMIT $2",
742 quote_ident(entity)
743 ),
744 None => format!(
745 "SELECT * FROM {} ORDER BY id ASC LIMIT $1",
746 quote_ident(entity)
747 ),
748 };
749 let rows = match after {
750 Some(cursor) => self
751 .client
752 .query(sql.as_str(), &[&cursor, &capped])
753 .map_err(pg_err)?,
754 None => self
755 .client
756 .query(sql.as_str(), &[&capped])
757 .map_err(pg_err)?,
758 };
759 Ok(rows.iter().map(row_to_json).collect())
760 }
761
762 pub fn update(
764 &mut self,
765 entity: &str,
766 id: &str,
767 data: &serde_json::Value,
768 ) -> Result<bool, StorageError> {
769 let (sql, values) = build_update_sql(entity, id, data)?;
770 let params = as_pg_params(&values);
771 let rows_affected = self.client.execute(sql.as_str(), ¶ms).map_err(pg_err)?;
772 Ok(rows_affected > 0)
773 }
774
775 pub fn delete(&mut self, entity: &str, id: &str) -> Result<bool, StorageError> {
777 let sql = format!("DELETE FROM {} WHERE id = $1", quote_ident(entity));
778 let rows_affected = self.client.execute(sql.as_str(), &[&id]).map_err(pg_err)?;
779 Ok(rows_affected > 0)
780 }
781
782 pub fn lookup_field(
786 &mut self,
787 entity: &str,
788 field: &str,
789 value: &str,
790 ) -> Result<Option<serde_json::Value>, StorageError> {
791 let sql = format!(
792 "SELECT * FROM {} WHERE {} = $1 LIMIT 1",
793 quote_ident(entity),
794 quote_ident(field),
795 );
796 let rows = self.client.query(sql.as_str(), &[&value]).map_err(pg_err)?;
797 Ok(rows.first().map(row_to_json))
798 }
799
800 pub fn query_filtered(
823 &mut self,
824 entity: &str,
825 filter: &serde_json::Value,
826 valid_columns: &[String],
827 ) -> Result<Vec<serde_json::Value>, StorageError> {
828 let empty = serde_json::Map::new();
829 let obj = filter.as_object().unwrap_or(&empty);
830
831 let validate = |col: &str| -> Result<(), StorageError> {
832 if col == "id" || valid_columns.iter().any(|c| c == col) {
833 Ok(())
834 } else {
835 Err(StorageError {
836 code: "UNKNOWN_COLUMN".into(),
837 message: format!("Unknown column \"{col}\" on entity \"{entity}\""),
838 })
839 }
840 };
841
842 let mut where_clauses: Vec<String> = Vec::new();
843 let mut order_clause = String::new();
844 let mut limit_clause = String::new();
845 let mut offset_clause = String::new();
846 let mut planned: Vec<(String, String, JsonParam)> = Vec::new();
850
851 for (key, val) in obj {
852 match key.as_str() {
853 "$search" => {
854 return Err(StorageError {
861 code: "SEARCH_NOT_SUPPORTED".into(),
862 message: "$search is SQLite-FTS5-only; use a Postgres tsvector column with the storage adapter's full-text path"
863 .into(),
864 });
865 }
866 "$order" => {
867 if let Some(ord) = val.as_object() {
868 let mut parts = Vec::new();
869 for (col, dir) in ord {
870 validate(col)?;
871 let d = match dir.as_str().unwrap_or("asc") {
872 "desc" | "DESC" => "DESC",
873 _ => "ASC",
874 };
875 parts.push(format!("{} {d}", quote_ident(col)));
876 }
877 if !parts.is_empty() {
878 order_clause = format!(" ORDER BY {}", parts.join(", "));
879 }
880 }
881 }
882 "$limit" => {
883 if let Some(n) = val.as_u64() {
884 limit_clause = format!(" LIMIT {}", n);
885 }
886 }
887 "$offset" => {
888 if let Some(n) = val.as_u64() {
889 offset_clause = format!(" OFFSET {}", n);
890 }
891 }
892 field => {
893 validate(field)?;
894 match val {
895 serde_json::Value::Object(ops) => {
896 for (op, v) in ops {
897 match op.as_str() {
898 "$not" => planned.push((
899 field.into(),
900 "!=".into(),
901 value_to_pg(v),
902 )),
903 "$gt" => {
904 planned.push((field.into(), ">".into(), value_to_pg(v)))
905 }
906 "$gte" => planned.push((
907 field.into(),
908 ">=".into(),
909 value_to_pg(v),
910 )),
911 "$lt" => {
912 planned.push((field.into(), "<".into(), value_to_pg(v)))
913 }
914 "$lte" => planned.push((
915 field.into(),
916 "<=".into(),
917 value_to_pg(v),
918 )),
919 "$like" => {
920 let raw = match v {
931 serde_json::Value::String(s) => s.clone(),
932 other => other.to_string(),
933 };
934 planned.push((
935 field.into(),
936 "LIKE".into(),
937 JsonParam::Text(format!("%{raw}%")),
938 ));
939 }
940 "$in" => {
941 if let Some(arr) = v.as_array() {
942 if arr.is_empty() {
943 where_clauses.push("FALSE".into());
954 } else {
955 let placeholders: Vec<String> = (0..arr.len())
956 .map(|i| {
957 format!("${}", planned.len() + 1 + i)
958 })
959 .collect();
960 where_clauses.push(format!(
961 "{} IN ({})",
962 quote_ident(field),
963 placeholders.join(", "),
964 ));
965 for x in arr {
966 planned.push((
967 format!("__inline_{}", planned.len()),
968 "__INLINE__".into(),
969 value_to_pg(x),
970 ));
971 }
972 }
973 }
974 }
975 _ => {}
976 }
977 }
978 }
979 _ => planned.push((field.into(), "=".into(), value_to_pg(val))),
980 }
981 }
982 }
983 }
984
985 let mut params: Vec<JsonParam> = Vec::with_capacity(planned.len());
987 for (field, op, v) in &planned {
988 if op == "__INLINE__" {
989 params.push(v.clone());
991 } else {
992 let placeholder = format!("${}", params.len() + 1);
993 where_clauses.push(format!("{} {} {}", quote_ident(field), op, placeholder));
994 params.push(v.clone());
995 }
996 }
997
998 let where_sql = if where_clauses.is_empty() {
999 String::new()
1000 } else {
1001 format!(" WHERE {}", where_clauses.join(" AND "))
1002 };
1003 let final_order = if order_clause.is_empty() {
1008 format!(" ORDER BY {}", quote_ident("id"))
1009 } else {
1010 order_clause
1011 };
1012 let sql = format!(
1013 "SELECT * FROM {}{}{}{}{}",
1014 quote_ident(entity),
1015 where_sql,
1016 final_order,
1017 limit_clause,
1018 offset_clause,
1019 );
1020
1021 let pg_params = as_pg_params(¶ms);
1022 let rows = self
1023 .client
1024 .query(sql.as_str(), &pg_params)
1025 .map_err(pg_err)?;
1026 Ok(rows.iter().map(row_to_json).collect())
1027 }
1028
1029 pub fn aggregate(
1048 &mut self,
1049 entity: &str,
1050 spec: &serde_json::Value,
1051 valid_columns: &[String],
1052 ) -> Result<serde_json::Value, StorageError> {
1053 let obj = spec.as_object().ok_or_else(|| StorageError {
1054 code: "INVALID_QUERY".into(),
1055 message: "aggregate spec must be a JSON object".into(),
1056 })?;
1057
1058 let validate = |col: &str| -> Result<(), StorageError> {
1059 if col == "id" || valid_columns.iter().any(|c| c == col) {
1060 Ok(())
1061 } else {
1062 Err(StorageError {
1063 code: "UNKNOWN_COLUMN".into(),
1064 message: format!("Unknown column \"{col}\" on entity \"{entity}\""),
1065 })
1066 }
1067 };
1068
1069 let mut select_parts: Vec<String> = Vec::new();
1070 let mut result_fields: Vec<String> = Vec::new();
1071
1072 if let Some(count) = obj.get("count") {
1073 match count {
1074 serde_json::Value::String(s) if s == "*" => {
1075 select_parts.push("COUNT(*) AS count".into());
1076 result_fields.push("count".into());
1077 }
1078 serde_json::Value::String(field) => {
1079 validate(field)?;
1080 let alias = format!("count_{field}");
1081 select_parts.push(format!(
1082 "COUNT({}) AS {}",
1083 quote_ident(field),
1084 quote_ident(&alias),
1085 ));
1086 result_fields.push(alias);
1087 }
1088 _ => {}
1089 }
1090 }
1091
1092 for (fn_name, prefix) in [
1093 ("sum", "sum_"),
1094 ("avg", "avg_"),
1095 ("min", "min_"),
1096 ("max", "max_"),
1097 ] {
1098 if let Some(fields) = obj.get(fn_name).and_then(|v| v.as_array()) {
1099 for field in fields {
1100 if let Some(f) = field.as_str() {
1101 validate(f)?;
1102 let alias = format!("{prefix}{f}");
1103 let sql_fn = fn_name.to_uppercase();
1104 select_parts.push(format!(
1105 "{}({}) AS {}",
1106 sql_fn,
1107 quote_ident(f),
1108 quote_ident(&alias),
1109 ));
1110 result_fields.push(alias);
1111 }
1112 }
1113 }
1114 }
1115
1116 if let Some(fields) = obj.get("countDistinct").and_then(|v| v.as_array()) {
1117 for field in fields {
1118 if let Some(f) = field.as_str() {
1119 validate(f)?;
1120 let alias = format!("count_distinct_{f}");
1121 select_parts.push(format!(
1122 "COUNT(DISTINCT {}) AS {}",
1123 quote_ident(f),
1124 quote_ident(&alias),
1125 ));
1126 result_fields.push(alias);
1127 }
1128 }
1129 }
1130
1131 let mut group_by: Vec<String> = Vec::new();
1136 let mut group_select: Vec<String> = Vec::new();
1137 let mut group_field_names: Vec<String> = Vec::new();
1138 if let Some(groups) = obj.get("groupBy").and_then(|v| v.as_array()) {
1139 for g in groups {
1140 if let Some(f) = g.as_str() {
1141 validate(f)?;
1142 let q = quote_ident(f);
1143 group_by.push(q.clone());
1144 group_select.push(q);
1145 group_field_names.push(f.to_string());
1146 } else if let Some(spec) = g.as_object() {
1147 let field =
1148 spec.get("field").and_then(|v| v.as_str()).ok_or_else(|| {
1149 StorageError {
1150 code: "INVALID_QUERY".into(),
1151 message: "groupBy object spec requires `field`".into(),
1152 }
1153 })?;
1154 validate(field)?;
1155 let bucket = spec.get("bucket").and_then(|v| v.as_str()).unwrap_or("day");
1156 let trunc_unit = match bucket {
1157 "hour" | "day" | "week" | "month" | "year" => bucket,
1158 _ => {
1159 return Err(StorageError {
1160 code: "INVALID_QUERY".into(),
1161 message: format!(
1162 "bucket must be one of hour/day/week/month/year, got {bucket}"
1163 ),
1164 });
1165 }
1166 };
1167 let alias = format!("{field}_{bucket}");
1168 let expr = format!("date_trunc('{}', {})", trunc_unit, quote_ident(field),);
1169 group_by.push(expr.clone());
1170 group_select.push(format!("{} AS {}", expr, quote_ident(&alias)));
1171 group_field_names.push(alias);
1172 }
1173 }
1174 }
1175
1176 let mut full_select = group_select.clone();
1177 full_select.extend(select_parts.iter().cloned());
1178 if full_select.is_empty() {
1179 return Err(StorageError {
1180 code: "INVALID_QUERY".into(),
1181 message: "aggregate spec must include count/sum/avg/min/max/groupBy".into(),
1182 });
1183 }
1184
1185 let mut where_clauses: Vec<String> = Vec::new();
1186 let mut params: Vec<JsonParam> = Vec::new();
1187 if let Some(w) = obj.get("where").and_then(|v| v.as_object()) {
1188 for (k, v) in w {
1189 validate(k)?;
1190 let placeholder = format!("${}", params.len() + 1);
1191 where_clauses.push(format!("{} = {}", quote_ident(k), placeholder));
1192 params.push(value_to_pg(v));
1193 }
1194 }
1195 let where_sql = if where_clauses.is_empty() {
1196 String::new()
1197 } else {
1198 format!(" WHERE {}", where_clauses.join(" AND "))
1199 };
1200 let group_sql = if group_by.is_empty() {
1201 String::new()
1202 } else {
1203 format!(" GROUP BY {}", group_by.join(", "))
1204 };
1205
1206 let sql = format!(
1207 "SELECT {} FROM {}{}{}",
1208 full_select.join(", "),
1209 quote_ident(entity),
1210 where_sql,
1211 group_sql,
1212 );
1213
1214 let pg_params = as_pg_params(¶ms);
1215 let rows = self
1216 .client
1217 .query(sql.as_str(), &pg_params)
1218 .map_err(pg_err)?;
1219
1220 let column_names: Vec<String> = group_field_names
1221 .iter()
1222 .chain(result_fields.iter())
1223 .cloned()
1224 .collect();
1225
1226 let mut out: Vec<serde_json::Value> = Vec::with_capacity(rows.len());
1227 for row in &rows {
1228 let row_json = row_to_json(row);
1229 if let serde_json::Value::Object(map) = &row_json {
1230 let mut filtered = serde_json::Map::new();
1231 for name in &column_names {
1232 if let Some(v) = map.get(name) {
1233 filtered.insert(name.clone(), v.clone());
1234 }
1235 }
1236 out.push(serde_json::Value::Object(filtered));
1237 } else {
1238 out.push(row_json);
1239 }
1240 }
1241 Ok(serde_json::json!({ "rows": out }))
1242 }
1243 }
1244
/// One write operation inside a `transact` batch. All payloads are borrowed
/// from the caller.
pub enum TxOp<'a> {
    /// Insert a new row built from the JSON object `data`.
    Insert {
        entity: &'a str,
        data: &'a serde_json::Value,
    },
    /// Update the row identified by `id` with the fields in `data`.
    Update {
        entity: &'a str,
        id: &'a str,
        data: &'a serde_json::Value,
    },
    /// Delete the row identified by `id`.
    Delete {
        entity: &'a str,
        id: &'a str,
    },
}
1261
/// Outcome of one `TxOp`, reported in the same position as the operation.
#[derive(Debug, Clone)]
pub enum TxResult {
    /// The id generated for the inserted row.
    Inserted(String),
    /// Whether the update affected at least one row.
    Updated(bool),
    /// Whether the delete affected at least one row.
    Deleted(bool),
}
1269
1270 impl LivePostgresAdapter {
1271 pub fn transact(&mut self, ops: &[TxOp<'_>]) -> Result<Vec<TxResult>, StorageError> {
1276 let mut tx = self.client.transaction().map_err(pg_err)?;
1277 let mut results: Vec<TxResult> = Vec::with_capacity(ops.len());
1278
1279 for op in ops {
1280 match op {
1281 TxOp::Insert { entity, data } => {
1282 let (sql, values) = build_insert_sql(entity, data)?;
1283 let id = match &values[0] {
1284 JsonParam::Text(s) => s.clone(),
1285 _ => {
1286 return Err(StorageError {
1287 code: "PG_INTERNAL".into(),
1288 message: "build_insert_sql produced non-text id param".into(),
1289 });
1290 }
1291 };
1292 let params = as_pg_params(&values);
1293 tx.execute(sql.as_str(), ¶ms).map_err(pg_err)?;
1294 results.push(TxResult::Inserted(id));
1295 }
1296 TxOp::Update { entity, id, data } => {
1297 let (sql, values) = build_update_sql(entity, id, data)?;
1298 let params = as_pg_params(&values);
1299 let n = tx.execute(sql.as_str(), ¶ms).map_err(pg_err)?;
1300 results.push(TxResult::Updated(n > 0));
1301 }
1302 TxOp::Delete { entity, id } => {
1303 let sql = format!("DELETE FROM {} WHERE id = $1", quote_ident(entity));
1304 let n = tx.execute(sql.as_str(), &[id]).map_err(pg_err)?;
1305 results.push(TxResult::Deleted(n > 0));
1306 }
1307 }
1308 }
1309
1310 tx.commit().map_err(pg_err)?;
1311 Ok(results)
1312 }
1313 }
1314
/// Converts a JSON filter value into a bindable parameter; a direct alias
/// for `JsonParam::from_json`.
fn value_to_pg(v: &serde_json::Value) -> JsonParam {
    JsonParam::from_json(v)
}
1323
1324 fn row_to_json(row: &postgres::Row) -> serde_json::Value {
1325 use postgres::types::Type;
1326 let mut obj = serde_json::Map::new();
1327 for (i, col) in row.columns().iter().enumerate() {
1328 let name = col.name().to_string();
1329
1330 let value: serde_json::Value = match *col.type_() {
1341 Type::BOOL => try_get_or_null::<Option<bool>>(row, i)
1342 .flatten()
1343 .map(serde_json::Value::Bool)
1344 .unwrap_or(serde_json::Value::Null),
1345 Type::INT2 => try_get_or_null::<Option<i16>>(row, i)
1346 .flatten()
1347 .map(|v| serde_json::Value::Number(v.into()))
1348 .unwrap_or(serde_json::Value::Null),
1349 Type::INT4 => try_get_or_null::<Option<i32>>(row, i)
1350 .flatten()
1351 .map(|v| serde_json::Value::Number(v.into()))
1352 .unwrap_or(serde_json::Value::Null),
1353 Type::INT8 => try_get_or_null::<Option<i64>>(row, i)
1354 .flatten()
1355 .map(|v| serde_json::Value::Number(v.into()))
1356 .unwrap_or(serde_json::Value::Null),
1357 Type::FLOAT4 => try_get_or_null::<Option<f32>>(row, i)
1358 .flatten()
1359 .and_then(|v| serde_json::Number::from_f64(v as f64))
1360 .map(serde_json::Value::Number)
1361 .unwrap_or(serde_json::Value::Null),
1362 Type::FLOAT8 => try_get_or_null::<Option<f64>>(row, i)
1363 .flatten()
1364 .and_then(serde_json::Number::from_f64)
1365 .map(serde_json::Value::Number)
1366 .unwrap_or(serde_json::Value::Null),
1367 Type::JSON | Type::JSONB => try_get_or_null::<Option<serde_json::Value>>(row, i)
1368 .flatten()
1369 .unwrap_or(serde_json::Value::Null),
1370 Type::BYTEA => try_get_or_null::<Option<Vec<u8>>>(row, i)
1371 .flatten()
1372 .map(|b| serde_json::Value::String(b64(&b)))
1373 .unwrap_or(serde_json::Value::Null),
1374 Type::TEXT | Type::VARCHAR | Type::BPCHAR | Type::NAME | Type::UNKNOWN => {
1375 try_get_or_null::<Option<String>>(row, i)
1376 .flatten()
1377 .map(serde_json::Value::String)
1378 .unwrap_or(serde_json::Value::Null)
1379 }
1380 Type::TIMESTAMPTZ => {
1381 try_get_or_null::<Option<chrono::DateTime<chrono::Utc>>>(row, i)
1388 .flatten()
1389 .map(|dt| {
1390 serde_json::Value::String(dt.format("%Y-%m-%dT%H:%M:%SZ").to_string())
1391 })
1392 .unwrap_or(serde_json::Value::Null)
1393 }
1394 Type::TIMESTAMP => try_get_or_null::<Option<chrono::NaiveDateTime>>(row, i)
1395 .flatten()
1396 .map(|dt| {
1397 serde_json::Value::String(dt.format("%Y-%m-%dT%H:%M:%SZ").to_string())
1398 })
1399 .unwrap_or(serde_json::Value::Null),
1400 Type::DATE => try_get_or_null::<Option<chrono::NaiveDate>>(row, i)
1401 .flatten()
1402 .map(|d| serde_json::Value::String(d.format("%Y-%m-%d").to_string()))
1403 .unwrap_or(serde_json::Value::Null),
1404 _ => {
1405 match row.try_get::<_, Option<String>>(i) {
1410 Ok(Some(s)) => serde_json::Value::String(s),
1411 Ok(None) => serde_json::Value::Null,
1412 Err(_) => match row.try_get::<_, Option<Vec<u8>>>(i) {
1413 Ok(Some(bytes)) => serde_json::Value::String(
1414 String::from_utf8_lossy(&bytes).into_owned(),
1415 ),
1416 _ => serde_json::Value::Null,
1417 },
1418 }
1419 }
1420 };
1421 obj.insert(name, value);
1422 }
1423 serde_json::Value::Object(obj)
1424 }
1425
1426 fn try_get_or_null<'a, T>(row: &'a postgres::Row, i: usize) -> Option<T>
1427 where
1428 T: postgres::types::FromSql<'a>,
1429 {
1430 match row.try_get::<_, T>(i) {
1431 Ok(v) => Some(v),
1432 Err(e) => {
1433 tracing::warn!(
1434 "[postgres] decode failed for column {} ({}): {e}",
1435 i,
1436 row.columns()[i].name()
1437 );
1438 None
1439 }
1440 }
1441 }
1442
/// Encodes `bytes` as base64 using the standard `A-Za-z0-9+/` alphabet,
/// padding the final group with `=` so the output length is a multiple of 4.
fn b64(bytes: &[u8]) -> String {
    const ALPHABET: &[u8; 64] =
        b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    // Looks up the alphabet character for the low six bits of `v`.
    let sextet = |v: u8| ALPHABET[(v & 0x3F) as usize] as char;

    let mut encoded = String::with_capacity((bytes.len() + 2) / 3 * 4);
    for group in bytes.chunks(3) {
        match group {
            // Full group: 3 bytes -> 4 characters, no padding.
            &[x, y, z] => {
                encoded.push(sextet(x >> 2));
                encoded.push(sextet((x << 4) | (y >> 4)));
                encoded.push(sextet((y << 2) | (z >> 6)));
                encoded.push(sextet(z));
            }
            // Two trailing bytes -> 3 characters plus one pad.
            &[x, y] => {
                encoded.push(sextet(x >> 2));
                encoded.push(sextet((x << 4) | (y >> 4)));
                encoded.push(sextet(y << 2));
                encoded.push('=');
            }
            // One trailing byte -> 2 characters plus two pads.
            &[x] => {
                encoded.push(sextet(x >> 2));
                encoded.push(sextet(x << 4));
                encoded.push('=');
                encoded.push('=');
            }
            _ => unreachable!("chunks(3) only yields slices of length 1 to 3"),
        }
    }
    encoded
}
1471
1472 fn pg_err(e: postgres::Error) -> StorageError {
1473 use std::error::Error;
1479 let mut detail = format!("{e}");
1480 let mut src: Option<&dyn Error> = e.source();
1481 while let Some(s) = src {
1482 detail.push_str(": ");
1483 detail.push_str(&format!("{s}"));
1484 src = s.source();
1485 }
1486 StorageError {
1487 code: "PG_QUERY_FAILED".into(),
1488 message: format!("Postgres query failed: {detail}"),
1489 }
1490 }
1491}
1492
1493#[cfg(test)]
1498mod tests {
1499 use super::*;
1500
1501 fn test_manifest() -> AppManifest {
1505 use pylon_kernel::{ManifestEntity, ManifestField, ManifestIndex};
1506 let f = |name: &str, ty: &str, opt: bool, uniq: bool| ManifestField {
1507 name: name.into(),
1508 field_type: ty.into(),
1509 optional: opt,
1510 unique: uniq,
1511 crdt: None,
1512 };
1513 AppManifest {
1514 manifest_version: 1,
1515 name: "test".into(),
1516 version: "0.0.0".into(),
1517 entities: vec![
1518 ManifestEntity {
1519 name: "User".into(),
1520 fields: vec![
1521 f("email", "string", false, true),
1522 f("displayName", "string", false, false),
1523 f("createdAt", "datetime", false, false),
1524 ],
1525 indexes: vec![],
1526 relations: vec![],
1527 search: None,
1528 crdt: true,
1529 },
1530 ManifestEntity {
1531 name: "Todo".into(),
1532 fields: vec![
1533 f("title", "string", false, false),
1534 f("done", "bool", false, false),
1535 f("userId", "id(User)", false, false),
1536 f("createdAt", "datetime", false, false),
1537 ],
1538 indexes: vec![ManifestIndex {
1539 name: "by_user".into(),
1540 fields: vec!["userId".into()],
1541 unique: false,
1542 }],
1543 relations: vec![],
1544 search: None,
1545 crdt: true,
1546 },
1547 ],
1548 queries: vec![],
1549 actions: vec![],
1550 policies: vec![],
1551 routes: vec![],
1552 auth: Default::default(),
1553 }
1554 }
1555
/// Each manifest field type maps to the expected Postgres column type.
#[test]
fn pg_type_mapping() {
    let expectations = [
        ("string", "TEXT"),
        ("int", "INTEGER"),
        ("float", "DOUBLE PRECISION"),
        ("bool", "BOOLEAN"),
        ("datetime", "TIMESTAMPTZ"),
        ("richtext", "TEXT"),
        ("id(User)", "TEXT"),
    ];
    for &(field_type, column_type) in &expectations {
        assert_eq!(pg_column_type(field_type), column_type);
    }
}
1566
/// Plain identifiers are wrapped in double quotes with their case preserved.
#[test]
fn quote_ident_simple() {
    for &(raw, quoted) in &[("User", "\"User\""), ("email", "\"email\"")] {
        assert_eq!(quote_ident(raw), quoted);
    }
}
1572
/// An embedded `"` is doubled so it cannot terminate the quoted identifier.
#[test]
fn quote_ident_escapes_embedded_double_quotes() {
    let single = quote_ident("col\"name");
    let repeated = quote_ident("a\"b\"c");

    assert_eq!(single, "\"col\"\"name\"");
    assert_eq!(repeated, "\"a\"\"b\"\"c\"");
}
1578
/// A required unique column and an optional column produce the exact DDL,
/// with the implicit `id` primary key emitted first.
#[test]
fn create_table_sql_basic() {
    let email = FieldSpec {
        name: "email".into(),
        field_type: "string".into(),
        optional: false,
        unique: true,
    };
    let age = FieldSpec {
        name: "age".into(),
        field_type: "int".into(),
        optional: true,
        unique: false,
    };
    let expected = "CREATE TABLE IF NOT EXISTS \"User\" (id TEXT PRIMARY KEY NOT NULL, \"email\" TEXT NOT NULL UNIQUE, \"age\" INTEGER)";

    assert_eq!(create_table_sql("User", &[email, age]), expected);
}
1601
/// Double quotes inside table and column names are escaped in the DDL.
#[test]
fn create_table_sql_escapes_identifiers() {
    let tricky_column = FieldSpec {
        name: "col\"x".into(),
        field_type: "string".into(),
        optional: false,
        unique: false,
    };
    let sql = create_table_sql("my\"table", &[tricky_column]);

    for &needle in &["\"my\"\"table\"", "\"col\"\"x\""] {
        assert!(sql.contains(needle));
    }
}
1614
/// A unique index gets the `UNIQUE` keyword and an entity-prefixed name.
#[test]
fn create_index_sql_unique() {
    let columns = vec![String::from("email")];
    let expected =
        "CREATE UNIQUE INDEX IF NOT EXISTS \"User_by_email\" ON \"User\" (\"email\")";

    assert_eq!(create_index_sql("User", "by_email", &columns, true), expected);
}
1623
/// A non-unique index omits the `UNIQUE` keyword.
#[test]
fn create_index_sql_non_unique() {
    let columns = vec![String::from("userId")];
    let expected = "CREATE INDEX IF NOT EXISTS \"Todo_by_user\" ON \"Todo\" (\"userId\")";

    assert_eq!(create_index_sql("Todo", "by_user", &columns, false), expected);
}
1632
/// Adding an optional, non-unique string column yields a bare `ADD COLUMN`.
#[test]
fn add_column_sql_basic() {
    let bio = FieldSpec {
        name: "bio".into(),
        field_type: "string".into(),
        optional: true,
        unique: false,
    };

    assert_eq!(
        add_column_sql("User", &bio),
        "ALTER TABLE \"User\" ADD COLUMN \"bio\" TEXT"
    );
}
1644
/// Planning from the fixture manifest creates both entities and the
/// `Todo.by_user` index.
#[test]
fn plan_from_manifest() {
    let plan = PostgresAdapter.plan_schema(&test_manifest()).unwrap();
    let ops = &plan.operations;

    // True when the plan contains a CreateEntity for `entity`.
    let creates = |entity: &str| {
        ops.iter().any(|op| {
            matches!(op, SchemaOperation::CreateEntity { name, .. } if name == entity)
        })
    };
    assert!(creates("User"));
    assert!(creates("Todo"));

    let has_todo_index = ops.iter().any(|op| {
        matches!(
            op,
            SchemaOperation::AddIndex { entity, name, .. } if entity == "Todo" && name == "by_user"
        )
    });
    assert!(has_todo_index);
}
1665
/// The rendered plan has two `CREATE TABLE` statements, at least one index
/// statement, and the table statements come first.
#[test]
fn plan_to_sql_produces_statements() {
    let plan = PostgresAdapter.plan_schema(&test_manifest()).unwrap();
    let stmts = plan_to_sql(&plan).unwrap();

    let mut table_count = 0;
    let mut index_count = 0;
    for stmt in stmts.iter() {
        if stmt.starts_with("CREATE TABLE") {
            table_count += 1;
        }
        if stmt.starts_with("CREATE INDEX") || stmt.starts_with("CREATE UNIQUE INDEX") {
            index_count += 1;
        }
    }

    assert_eq!(table_count, 2);
    assert!(index_count >= 1);
    // Tables must precede the indexes that reference them.
    assert!(stmts[0].starts_with("CREATE TABLE"));
    assert!(stmts[1].starts_with("CREATE TABLE"));
}
1690
/// `RemoveEntity` cannot be rendered to SQL and reports `PG_OP_UNSUPPORTED`.
#[test]
fn plan_to_sql_rejects_unsupported() {
    let drop_user = SchemaOperation::RemoveEntity {
        name: "User".into(),
    };
    let plan = SchemaPlan {
        operations: vec![drop_user],
    };

    // unwrap_err panics on Ok, so it also covers the "must fail" check.
    let err = plan_to_sql(&plan).unwrap_err();
    assert_eq!(err.code, "PG_OP_UNSUPPORTED");
}
1702
/// `apply_schema` on the adapter fails with `APPLY_NOT_IMPLEMENTED`, even
/// for a plan containing only a no-op.
#[test]
fn apply_not_implemented() {
    let noop_plan = SchemaPlan {
        operations: vec![SchemaOperation::Noop],
    };

    // unwrap_err panics on Ok, so it also covers the "must fail" check.
    let err = PostgresAdapter.apply_schema(&noop_plan).unwrap_err();
    assert_eq!(err.code, "APPLY_NOT_IMPLEMENTED");
}
1713
/// Mixed-case table and column names appear quoted, and `datetime` maps to
/// `TIMESTAMPTZ`.
#[test]
fn sql_uses_quoted_identifiers() {
    let created_at = FieldSpec {
        name: "createdAt".into(),
        field_type: "datetime".into(),
        optional: false,
        unique: false,
    };
    let sql = create_table_sql("User", &[created_at]);

    for &needle in &["\"User\"", "\"createdAt\"", "TIMESTAMPTZ"] {
        assert!(sql.contains(needle));
    }
}
1728
/// Smoke-checks the introspection queries: the tables query reads
/// `information_schema.tables` and mentions `_pylon_`, and the per-table
/// queries take a `$1` parameter.
#[test]
fn introspect_sql_constants_are_valid() {
    let checks = [
        (INTROSPECT_TABLES_SQL, "information_schema.tables"),
        (INTROSPECT_COLUMNS_SQL, "$1"),
        (INTROSPECT_INDEXES_SQL, "$1"),
        (INTROSPECT_TABLES_SQL, "_pylon_"),
    ];
    for &(sql, fragment) in &checks {
        assert!(sql.contains(fragment));
    }
}
1739
/// Against an empty database snapshot, the plan creates every entity and
/// index declared by the manifest.
#[test]
fn plan_from_empty_snapshot_creates_all() {
    let empty_db = crate::SchemaSnapshot { tables: vec![] };
    let plan = plan_from_snapshot(&empty_db, &test_manifest());
    let ops = &plan.operations;

    // True when the plan contains a CreateEntity for `entity`.
    let creates = |entity: &str| {
        ops.iter().any(|op| {
            matches!(op, SchemaOperation::CreateEntity { name, .. } if name == entity)
        })
    };
    assert!(creates("User"));
    assert!(creates("Todo"));

    let has_todo_index = ops.iter().any(|op| {
        matches!(
            op,
            SchemaOperation::AddIndex { entity, name, .. } if entity == "Todo" && name == "by_user"
        )
    });
    assert!(has_todo_index);
}
1761
/// When the snapshot already contains every table, column and index from
/// the manifest, the resulting plan is empty.
#[test]
fn plan_from_full_snapshot_is_noop() {
    // Every fixture column is NOT NULL; only name, type and key-ness vary.
    let col = |name: &str, column_type: &str, primary_key: bool| crate::ColumnSnapshot {
        name: name.into(),
        column_type: column_type.into(),
        notnull: true,
        primary_key,
    };

    let user = crate::TableSnapshot {
        name: "User".into(),
        columns: vec![
            col("id", "TEXT", true),
            col("email", "TEXT", false),
            col("displayName", "TEXT", false),
            col("createdAt", "TIMESTAMPTZ", false),
        ],
        indexes: vec![],
    };
    let todo = crate::TableSnapshot {
        name: "Todo".into(),
        columns: vec![
            col("id", "TEXT", true),
            col("title", "TEXT", false),
            col("done", "BOOLEAN", false),
            col("userId", "TEXT", false),
            col("createdAt", "TIMESTAMPTZ", false),
        ],
        indexes: vec![crate::IndexSnapshot {
            name: "Todo_by_user".into(),
            columns: vec!["userId".into()],
            unique: false,
        }],
    };
    let snapshot = crate::SchemaSnapshot {
        tables: vec![user, todo],
    };

    let plan = plan_from_snapshot(&snapshot, &test_manifest());
    assert!(plan.is_empty());
}
1842
/// A table that exists but lacks manifest columns yields one `AddField`
/// per missing column.
#[test]
fn plan_detects_missing_column_in_snapshot() {
    // Every fixture column is NOT NULL; only name, type and key-ness vary.
    let col = |name: &str, column_type: &str, primary_key: bool| crate::ColumnSnapshot {
        name: name.into(),
        column_type: column_type.into(),
        notnull: true,
        primary_key,
    };

    // `User` lacks the manifest's `displayName` and `createdAt` columns.
    let user = crate::TableSnapshot {
        name: "User".into(),
        columns: vec![col("id", "TEXT", true), col("email", "TEXT", false)],
        indexes: vec![],
    };
    // `Todo` is complete, so it contributes no operations.
    let todo = crate::TableSnapshot {
        name: "Todo".into(),
        columns: vec![
            col("id", "TEXT", true),
            col("title", "TEXT", false),
            col("done", "BOOLEAN", false),
            col("userId", "TEXT", false),
            col("createdAt", "TIMESTAMPTZ", false),
        ],
        indexes: vec![crate::IndexSnapshot {
            name: "Todo_by_user".into(),
            columns: vec!["userId".into()],
            unique: false,
        }],
    };
    let snapshot = crate::SchemaSnapshot {
        tables: vec![user, todo],
    };

    let plan = plan_from_snapshot(&snapshot, &test_manifest());
    let added_fields = plan
        .operations
        .iter()
        .filter(|op| matches!(op, SchemaOperation::AddField { .. }))
        .count();
    assert_eq!(added_fields, 2);
}
1918
/// Strings pass through unquoted, scalars use their literal text, null
/// becomes the empty string, and arrays/objects serialize as compact JSON.
#[test]
fn json_value_to_string_handles_all_types() {
    let cases = [
        (serde_json::Value::String("hello".into()), "hello"),
        (serde_json::json!(42), "42"),
        (serde_json::json!(1.5), "1.5"),
        (serde_json::Value::Bool(true), "true"),
        (serde_json::Value::Bool(false), "false"),
        (serde_json::Value::Null, ""),
        (serde_json::json!([1, 2, 3]), "[1,2,3]"),
        (serde_json::json!({"a": 1}), "{\"a\":1}"),
    ];
    for (value, expected) in &cases {
        assert_eq!(json_value_to_string(value), *expected);
    }
}
1945
/// Generated ids are non-empty and consist solely of ASCII hex digits.
#[test]
fn generate_id_returns_hex_string() {
    let id = generate_id();

    assert!(!id.is_empty());
    // Any non-ASCII byte fails the hex check, so a byte scan is equivalent
    // to checking every char.
    assert!(id.bytes().all(|b| b.is_ascii_hexdigit()));
}
1953
/// Two back-to-back calls never return the same id.
#[test]
fn generate_id_is_unique_across_calls() {
    let (first, second) = (generate_id(), generate_id());
    assert_ne!(first, second);
}
1960
/// Ids generated in a tight loop are already in lexicographic order, share
/// one length, and never collide.
#[test]
fn generate_id_is_lex_sortable() {
    let ids: Vec<String> = std::iter::repeat_with(generate_id).take(1000).collect();

    let mut sorted = ids.clone();
    sorted.sort();
    assert_eq!(ids, sorted, "generate_id must be lex-monotonic");

    // Equal width is what makes string ordering meaningful across ids.
    let width = ids[0].len();
    assert!(ids.iter().all(|id| id.len() == width));

    // The list is known to be sorted here, so adjacent dedup finds any repeat.
    let mut distinct = ids;
    distinct.dedup();
    assert_eq!(distinct.len(), 1000, "no collisions in a tight loop");
}
1979
/// An insert for two fields targets the quoted table, uses three
/// placeholders, and binds a non-empty text id as the first parameter.
#[test]
fn build_insert_sql_simple() {
    let payload = serde_json::json!({
        "email": "alice@example.com",
        "displayName": "Alice"
    });
    let (sql, params) = build_insert_sql("User", &payload).unwrap();

    assert!(sql.starts_with("INSERT INTO \"User\""));
    for &fragment in &["id", "$1", "$2", "$3"] {
        assert!(sql.contains(fragment));
    }

    match &params[0] {
        JsonParam::Text(s) => assert!(!s.is_empty()),
        other => panic!("expected Text id param, got {other:?}"),
    }
    // One id parameter plus the two payload fields.
    assert_eq!(params.len(), 3);
}
2000
/// Each JSON scalar is bound as its matching `JsonParam` variant rather
/// than being stringified.
#[test]
fn build_insert_sql_preserves_json_types() {
    let payload = serde_json::json!({
        "n": 42,
        "f": 1.5,
        "b": true,
        "s": "hi",
        "z": null,
    });
    let (_sql, params) = build_insert_sql("T", &payload).unwrap();

    // Skip the leading id parameter. The expected order (b, f, n, s, z) is
    // alphabetical by key, not insertion order. NOTE(review): presumably this
    // relies on serde_json's default sorted map; confirm `preserve_order` is off.
    let mut fields = params.iter().skip(1);
    assert!(matches!(fields.next(), Some(JsonParam::Bool(true))));
    assert!(matches!(fields.next(), Some(JsonParam::Float(_))));
    assert!(matches!(fields.next(), Some(JsonParam::Int(42))));
    assert!(matches!(fields.next(), Some(JsonParam::Text(_))));
    assert!(matches!(fields.next(), Some(JsonParam::Null)));
}
2019
/// Mixed-case column and table names are double-quoted in INSERT statements.
#[test]
fn build_insert_sql_quotes_column_names() {
    let payload = serde_json::json!({"createdAt": "2026-01-01"});
    let (sql, _) = build_insert_sql("Todo", &payload).unwrap();

    for &quoted in &["\"createdAt\"", "\"Todo\""] {
        assert!(sql.contains(quoted));
    }
}
2027
/// A payload that is not a JSON object is rejected with `PG_INVALID_DATA`.
#[test]
fn build_insert_sql_rejects_non_object() {
    let not_an_object = serde_json::json!("not an object");

    // unwrap_err panics on Ok, so it also covers the "must fail" check.
    let err = build_insert_sql("User", &not_an_object).unwrap_err();
    assert_eq!(err.code, "PG_INVALID_DATA");
}
2035
/// An update binds the row id as `$1` in the WHERE clause and the changed
/// fields as `$2` and `$3`.
#[test]
fn build_update_sql_simple() {
    let changes = serde_json::json!({
        "displayName": "Bob",
        "email": "bob@example.com"
    });
    let (sql, params) = build_update_sql("User", "abc123", &changes).unwrap();

    assert!(sql.starts_with("UPDATE \"User\" SET"));
    for &fragment in &["WHERE id = $1", "$2", "$3"] {
        assert!(sql.contains(fragment));
    }

    match &params[0] {
        JsonParam::Text(s) => assert_eq!(s, "abc123"),
        other => panic!("expected Text id param, got {other:?}"),
    }
    // One id parameter plus the two changed fields.
    assert_eq!(params.len(), 3);
}
2054
/// The SET clause quotes the column name and binds it to `$2`, the first
/// placeholder after the id.
#[test]
fn build_update_sql_quotes_column_names() {
    let changes = serde_json::json!({"displayName": "Carol"});
    let (sql, _params) = build_update_sql("User", "id1", &changes).unwrap();

    let expected_assignment = "\"displayName\" = $2";
    assert!(sql.contains(expected_assignment));
}
2061
/// A payload that is not a JSON object is rejected with `PG_INVALID_DATA`.
#[test]
fn build_update_sql_rejects_non_object() {
    let not_an_object = serde_json::json!(42);

    // unwrap_err panics on Ok, so it also covers the "must fail" check.
    let err = build_update_sql("User", "id1", &not_an_object).unwrap_err();
    assert_eq!(err.code, "PG_INVALID_DATA");
}
2069
/// An empty object has nothing to SET, so it is rejected with
/// `PG_INVALID_DATA` and a message mentioning "at least one field".
#[test]
fn build_update_sql_rejects_empty_object() {
    let no_changes = serde_json::json!({});
    let outcome = build_update_sql("User", "id1", &no_changes);

    let failure = outcome.unwrap_err();
    assert_eq!(failure.code, "PG_INVALID_DATA");
    assert!(failure.message.contains("at least one field"));
}
2077}