1use crate::{FieldSpec, SchemaOperation, SchemaPlan, StorageAdapter, StorageError};
2use pylon_kernel::AppManifest;
3
/// Maps a manifest field type name to the Postgres column type used in DDL.
///
/// `int`, `float`, `bool` and `datetime` get dedicated column types. Every
/// other name — `string`, `richtext`, `id(...)` references and anything
/// unrecognised — is stored as `TEXT`.
fn pg_column_type(field_type: &str) -> &'static str {
    // Only the non-TEXT mappings need listing; TEXT is the universal default.
    const DEDICATED: [(&str, &str); 4] = [
        ("int", "INTEGER"),
        ("float", "DOUBLE PRECISION"),
        ("bool", "BOOLEAN"),
        ("datetime", "TIMESTAMPTZ"),
    ];

    DEDICATED
        .iter()
        .find(|(name, _)| *name == field_type)
        .map_or("TEXT", |&(_, column_type)| column_type)
}
28
/// Wraps `name` in double quotes so it can be used as a SQL identifier.
///
/// Any double quote inside `name` is doubled, so the identifier cannot
/// terminate the quoting early.
fn quote_ident(name: &str) -> String {
    let mut quoted = String::with_capacity(name.len() + 2);
    quoted.push('"');
    for ch in name.chars() {
        if ch == '"' {
            // Escape an embedded quote by emitting it twice.
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
39
40pub fn create_table_sql(entity_name: &str, fields: &[FieldSpec]) -> String {
46 let mut columns = vec!["id TEXT PRIMARY KEY NOT NULL".to_string()];
47
48 for field in fields {
49 let col_type = pg_column_type(&field.field_type);
50 let not_null = if field.optional { "" } else { " NOT NULL" };
51 let unique = if field.unique { " UNIQUE" } else { "" };
52 columns.push(format!(
53 "{} {}{}{}",
54 quote_ident(&field.name),
55 col_type,
56 not_null,
57 unique
58 ));
59 }
60
61 format!(
62 "CREATE TABLE IF NOT EXISTS {} ({})",
63 quote_ident(entity_name),
64 columns.join(", ")
65 )
66}
67
68pub fn add_column_sql(entity_name: &str, field: &FieldSpec) -> String {
72 let col_type = pg_column_type(&field.field_type);
73 let unique = if field.unique { " UNIQUE" } else { "" };
74 format!(
75 "ALTER TABLE {} ADD COLUMN {} {}{}",
76 quote_ident(entity_name),
77 quote_ident(&field.name),
78 col_type,
79 unique
80 )
81}
82
83pub fn create_index_sql(
85 entity_name: &str,
86 index_name: &str,
87 fields: &[String],
88 unique: bool,
89) -> String {
90 let unique_str = if unique { "UNIQUE " } else { "" };
91 let full_index_name = format!("{}_{}", entity_name, index_name);
92 let quoted_fields: Vec<String> = fields.iter().map(|f| quote_ident(f)).collect();
93 format!(
94 "CREATE {}INDEX IF NOT EXISTS {} ON {} ({})",
95 unique_str,
96 quote_ident(&full_index_name),
97 quote_ident(entity_name),
98 quoted_fields.join(", ")
99 )
100}
101
/// Stateless Postgres adapter: a unit struct with no connection of its own.
///
/// Its [`StorageAdapter`] implementation builds a schema plan from the target
/// manifest alone.
pub struct PostgresAdapter;
109
110impl StorageAdapter for PostgresAdapter {
111 fn plan_schema(&self, target: &AppManifest) -> Result<SchemaPlan, StorageError> {
112 let mut operations = Vec::new();
114
115 for entity in &target.entities {
116 let fields: Vec<FieldSpec> = entity
117 .fields
118 .iter()
119 .map(|f| FieldSpec {
120 name: f.name.clone(),
121 field_type: f.field_type.clone(),
122 optional: f.optional,
123 unique: f.unique,
124 })
125 .collect();
126
127 operations.push(SchemaOperation::CreateEntity {
128 name: entity.name.clone(),
129 fields,
130 });
131
132 for index in &entity.indexes {
133 operations.push(SchemaOperation::AddIndex {
134 entity: entity.name.clone(),
135 name: index.name.clone(),
136 fields: index.fields.clone(),
137 unique: index.unique,
138 });
139 }
140 }
141
142 if operations.is_empty() {
143 operations.push(SchemaOperation::Noop);
144 }
145
146 Ok(SchemaPlan { operations })
147 }
148
149 }
151
152pub fn plan_to_sql(plan: &SchemaPlan) -> Result<Vec<String>, StorageError> {
155 let mut statements = Vec::new();
156
157 for op in &plan.operations {
158 match op {
159 SchemaOperation::CreateEntity { name, fields } => {
160 statements.push(create_table_sql(name, fields));
161 }
162 SchemaOperation::AddField { entity, field } => {
163 statements.push(add_column_sql(entity, field));
164 }
165 SchemaOperation::AlterField {
166 entity,
167 previous,
168 target,
169 } => {
170 if previous.optional && !target.optional {
179 statements.push(format!(
180 "ALTER TABLE {} ALTER COLUMN {} SET NOT NULL",
181 quote_ident(entity),
182 quote_ident(&target.name)
183 ));
184 } else if !previous.optional && target.optional {
185 statements.push(format!(
186 "ALTER TABLE {} ALTER COLUMN {} DROP NOT NULL",
187 quote_ident(entity),
188 quote_ident(&target.name)
189 ));
190 }
191 }
197 SchemaOperation::AddIndex {
198 entity,
199 name,
200 fields,
201 unique,
202 } => {
203 statements.push(create_index_sql(entity, name, fields, *unique));
204 }
205 SchemaOperation::Noop => {}
206 other => {
207 return Err(StorageError {
208 code: "PG_OP_UNSUPPORTED".into(),
209 message: format!("Operation not supported by Postgres adapter: {other:?}"),
210 });
211 }
212 }
213 }
214
215 Ok(statements)
216}
217
/// Lists the names of base tables in the `public` schema, ordered by name,
/// skipping names that match the `_pylon_%` pattern.
///
/// NOTE(review): `_` is itself a single-character wildcard in SQL `LIKE`, so
/// the pattern also excludes names such as `xpylon_foo` — confirm that is
/// acceptable or escape the underscore.
pub const INTROSPECT_TABLES_SQL: &str = "\
    SELECT table_name \
    FROM information_schema.tables \
    WHERE table_schema = 'public' \
    AND table_type = 'BASE TABLE' \
    AND table_name NOT LIKE '_pylon_%' \
    ORDER BY table_name";
233
/// Per-column metadata for one table in the `public` schema (table name bound
/// as `$1`), in ordinal order: column name, data type, `is_nullable`, and
/// `is_pk` — a count of PRIMARY KEY constraints that cover the column.
///
/// NOTE(review): the `is_pk` subquery joins `table_constraints` to
/// `key_column_usage` on `constraint_name` only and does not filter by schema;
/// identically named constraints elsewhere could inflate the count — confirm.
pub const INTROSPECT_COLUMNS_SQL: &str = "\
    SELECT column_name, data_type, is_nullable, \
    (SELECT COUNT(*) FROM information_schema.table_constraints tc \
    JOIN information_schema.key_column_usage kcu \
    ON tc.constraint_name = kcu.constraint_name \
    WHERE tc.table_name = c.table_name \
    AND kcu.column_name = c.column_name \
    AND tc.constraint_type = 'PRIMARY KEY') as is_pk \
    FROM information_schema.columns c \
    WHERE table_schema = 'public' AND table_name = $1 \
    ORDER BY ordinal_position";
247
/// Lists the non-primary-key indexes of one table in the `public` schema
/// (table name bound as `$1`), ordered by index name.
///
/// Each row carries the index name, its uniqueness flag, and the indexed
/// column names aggregated in the order they appear in the index key.
pub const INTROSPECT_INDEXES_SQL: &str = "\
    SELECT i.relname as index_name, \
    ix.indisunique as is_unique, \
    array_agg(a.attname ORDER BY array_position(ix.indkey, a.attnum)) as columns \
    FROM pg_index ix \
    JOIN pg_class t ON t.oid = ix.indrelid \
    JOIN pg_class i ON i.oid = ix.indexrelid \
    JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(ix.indkey) \
    JOIN pg_namespace n ON n.oid = t.relnamespace \
    WHERE n.nspname = 'public' \
    AND t.relname = $1 \
    AND NOT ix.indisprimary \
    GROUP BY i.relname, ix.indisunique \
    ORDER BY i.relname";
264
/// Computes the schema plan that takes `snapshot` to `target`.
///
/// Pure delegation to the crate-level `plan_from_snapshot`; nothing here is
/// Postgres-specific.
pub fn plan_from_snapshot(snapshot: &crate::SchemaSnapshot, target: &AppManifest) -> SchemaPlan {
    crate::plan_from_snapshot(snapshot, target)
}
270
/// Generates a 40-character lowercase hex identifier.
///
/// The first 32 digits are the current time in nanoseconds since the Unix
/// epoch (zero if the clock reads earlier than the epoch). The last 8 digits
/// are a process-wide counter that increments on every call, so two ids
/// produced within the same nanosecond still differ.
pub fn generate_id() -> String {
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static COUNTER: AtomicU32 = AtomicU32::new(0);

    let nanos = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => 0,
    };
    let sequence = COUNTER.fetch_add(1, Ordering::Relaxed);

    format!("{nanos:032x}{sequence:08x}")
}
294
295pub fn json_value_to_string(val: &serde_json::Value) -> String {
304 match val {
305 serde_json::Value::String(s) => s.clone(),
306 serde_json::Value::Number(n) => n.to_string(),
307 serde_json::Value::Bool(b) => b.to_string(),
308 serde_json::Value::Null => String::new(),
309 other => other.to_string(),
310 }
311}
312
/// A query parameter derived from a JSON value.
///
/// Built by `JsonParam::from_json`; when the `postgres-live` feature is on it
/// also implements `postgres::types::ToSql` so it can be bound directly.
#[derive(Debug, Clone, PartialEq)]
pub enum JsonParam {
    /// JSON `null`.
    Null,
    /// A JSON string, or the JSON text of any value with no closer variant.
    Text(String),
    /// A JSON number representable as `i64`.
    Int(i64),
    /// A JSON number that is not an `i64` but is representable as `f64`.
    Float(f64),
    /// A JSON boolean.
    Bool(bool),
}
336
337impl JsonParam {
338 pub fn from_json(val: &serde_json::Value) -> Self {
342 match val {
343 serde_json::Value::Null => JsonParam::Null,
344 serde_json::Value::String(s) => JsonParam::Text(s.clone()),
345 serde_json::Value::Bool(b) => JsonParam::Bool(*b),
346 serde_json::Value::Number(n) => {
347 if let Some(i) = n.as_i64() {
348 JsonParam::Int(i)
349 } else if let Some(f) = n.as_f64() {
350 JsonParam::Float(f)
351 } else {
352 JsonParam::Text(n.to_string())
353 }
354 }
355 other => JsonParam::Text(other.to_string()),
356 }
357 }
358}
359
#[cfg(feature = "postgres-live")]
impl postgres::types::ToSql for JsonParam {
    /// Encodes the parameter for the Postgres type the server expects.
    ///
    /// `Null` is always sent as SQL NULL. Otherwise the value is encoded with
    /// the native encoder for the (variant, column type) pair when one is
    /// listed below; every other combination is sent as the value's string
    /// form.
    ///
    /// # Errors
    ///
    /// Returns an error when an `Int` does not fit the `INT2`/`INT4` column it
    /// is bound to. A plain `as` cast would silently wrap and store a
    /// different number. Errors from the underlying encoders are passed on.
    fn to_sql(
        &self,
        ty: &postgres::types::Type,
        out: &mut bytes::BytesMut,
    ) -> Result<postgres::types::IsNull, Box<dyn std::error::Error + Sync + Send>> {
        use postgres::types::Type;

        if matches!(self, JsonParam::Null) {
            return Ok(postgres::types::IsNull::Yes);
        }

        match (self, ty) {
            (JsonParam::Bool(b), &Type::BOOL) => b.to_sql(ty, out),

            // Narrowing integer conversions are checked so an out-of-range
            // value is rejected instead of wrapping.
            (JsonParam::Int(n), &Type::INT2) => i16::try_from(*n)
                .map_err(|_| format!("integer {n} is out of range for Postgres type {ty}"))?
                .to_sql(ty, out),
            (JsonParam::Int(n), &Type::INT4) => i32::try_from(*n)
                .map_err(|_| format!("integer {n} is out of range for Postgres type {ty}"))?
                .to_sql(ty, out),
            (JsonParam::Int(n), &Type::INT8) => n.to_sql(ty, out),
            (JsonParam::Int(n), &Type::FLOAT4) => (*n as f32).to_sql(ty, out),
            (JsonParam::Int(n), &Type::FLOAT8) => (*n as f64).to_sql(ty, out),

            (JsonParam::Float(f), &Type::FLOAT4) => (*f as f32).to_sql(ty, out),
            (JsonParam::Float(f), &Type::FLOAT8) => f.to_sql(ty, out),
            // Float → integer uses `as`: the fraction is dropped and
            // out-of-range values saturate.
            (JsonParam::Float(f), &Type::INT4) => (*f as i32).to_sql(ty, out),
            (JsonParam::Float(f), &Type::INT8) => (*f as i64).to_sql(ty, out),

            (JsonParam::Text(s), &Type::TEXT)
            | (JsonParam::Text(s), &Type::VARCHAR)
            | (JsonParam::Text(s), &Type::BPCHAR)
            | (JsonParam::Text(s), &Type::NAME) => s.to_sql(ty, out),
            (JsonParam::Text(s), &Type::TIMESTAMPTZ) | (JsonParam::Text(s), &Type::TIMESTAMP) => {
                s.as_str().to_sql(ty, out)
            }

            // No native pairing: send the value's string form.
            (other, _) => {
                let s = match other {
                    JsonParam::Bool(b) => b.to_string(),
                    JsonParam::Int(n) => n.to_string(),
                    JsonParam::Float(f) => f.to_string(),
                    JsonParam::Text(s) => s.clone(),
                    // `Null` returned early above.
                    JsonParam::Null => unreachable!(),
                };
                s.to_sql(ty, out)
            }
        }
    }

    /// Accepts every Postgres type; `to_sql` picks the encoding per type.
    fn accepts(_ty: &postgres::types::Type) -> bool {
        true
    }

    postgres::types::to_sql_checked!();
}
435
436pub fn build_insert_sql(
444 entity: &str,
445 data: &serde_json::Value,
446) -> Result<(String, Vec<JsonParam>), StorageError> {
447 let id = generate_id();
448 let obj = data.as_object().ok_or_else(|| StorageError {
449 code: "PG_INVALID_DATA".into(),
450 message: "Insert data must be a JSON object".into(),
451 })?;
452
453 let mut col_names = vec!["id".to_string()];
454 let mut placeholders = vec!["$1".to_string()];
455 let mut values: Vec<JsonParam> = vec![JsonParam::Text(id)];
456
457 for (i, (key, val)) in obj.iter().enumerate() {
458 col_names.push(quote_ident(key));
459 placeholders.push(format!("${}", i + 2));
460 values.push(JsonParam::from_json(val));
461 }
462
463 let sql = format!(
464 "INSERT INTO {} ({}) VALUES ({})",
465 quote_ident(entity),
466 col_names.join(", "),
467 placeholders.join(", ")
468 );
469
470 Ok((sql, values))
471}
472
473pub fn build_update_sql(
476 entity: &str,
477 id: &str,
478 data: &serde_json::Value,
479) -> Result<(String, Vec<JsonParam>), StorageError> {
480 let obj = data.as_object().ok_or_else(|| StorageError {
481 code: "PG_INVALID_DATA".into(),
482 message: "Update data must be a JSON object".into(),
483 })?;
484
485 if obj.is_empty() {
486 return Err(StorageError {
487 code: "PG_INVALID_DATA".into(),
488 message: "Update data must contain at least one field".into(),
489 });
490 }
491
492 let mut set_clauses = Vec::new();
493 let mut values: Vec<JsonParam> = vec![JsonParam::Text(id.to_string())];
494
495 for (i, (key, val)) in obj.iter().enumerate() {
496 set_clauses.push(format!("{} = ${}", quote_ident(key), i + 2));
497 values.push(JsonParam::from_json(val));
498 }
499
500 let sql = format!(
501 "UPDATE {} SET {} WHERE id = $1",
502 quote_ident(entity),
503 set_clauses.join(", ")
504 );
505
506 Ok((sql, values))
507}
508
/// Borrows each [`JsonParam`] as a `ToSql` trait object, producing the slice
/// shape that the `postgres` client's `query`/`execute` take.
#[cfg(feature = "postgres-live")]
fn as_pg_params(values: &[JsonParam]) -> Vec<&(dyn postgres::types::ToSql + Sync)> {
    let mut borrowed: Vec<&(dyn postgres::types::ToSql + Sync)> =
        Vec::with_capacity(values.len());
    for value in values {
        borrowed.push(value);
    }
    borrowed
}
519
520#[cfg(feature = "postgres-live")]
525pub mod live {
526 use super::*;
527 use crate::{
528 ColumnSnapshot, IndexSnapshot, SchemaSnapshot, StorageAdapter, StorageError, TableSnapshot,
529 };
530
    /// Postgres adapter backed by a live, synchronous `postgres::Client`.
    ///
    /// Operations that touch the database take `&mut self` because the client
    /// requires mutable access to run queries.
    pub struct LivePostgresAdapter {
        /// The open connection used for every query.
        client: postgres::Client,
    }
535
536 impl LivePostgresAdapter {
537 pub fn connect(url: &str) -> Result<Self, StorageError> {
539 let client =
540 postgres::Client::connect(url, postgres::NoTls).map_err(|e| StorageError {
541 code: "PG_CONNECT_FAILED".into(),
542 message: format!("Failed to connect to Postgres: {e}"),
543 })?;
544 Ok(Self { client })
545 }
546
547 pub fn read_schema(&mut self) -> Result<SchemaSnapshot, StorageError> {
549 let table_rows = self
550 .client
551 .query(INTROSPECT_TABLES_SQL, &[])
552 .map_err(pg_err)?;
553
554 let mut tables = Vec::new();
555 for row in &table_rows {
556 let table_name: String = row.get(0);
557 let columns = self.read_columns(&table_name)?;
558 let indexes = self.read_indexes(&table_name)?;
559 tables.push(TableSnapshot {
560 name: table_name,
561 columns,
562 indexes,
563 });
564 }
565
566 Ok(SchemaSnapshot { tables })
567 }
568
569 fn read_columns(&mut self, table: &str) -> Result<Vec<ColumnSnapshot>, StorageError> {
570 let rows = self
571 .client
572 .query(INTROSPECT_COLUMNS_SQL, &[&table])
573 .map_err(pg_err)?;
574
575 let mut columns = Vec::new();
576 for row in &rows {
577 let name: String = row.get(0);
578 let data_type: String = row.get(1);
579 let is_nullable: String = row.get(2);
580 let is_pk: i64 = row.get(3);
581 columns.push(ColumnSnapshot {
582 name,
583 column_type: data_type,
584 notnull: is_nullable == "NO",
585 primary_key: is_pk > 0,
586 });
587 }
588 Ok(columns)
589 }
590
591 fn read_indexes(&mut self, table: &str) -> Result<Vec<IndexSnapshot>, StorageError> {
592 let rows = self
593 .client
594 .query(INTROSPECT_INDEXES_SQL, &[&table])
595 .map_err(pg_err)?;
596
597 let mut indexes = Vec::new();
598 for row in &rows {
599 let name: String = row.get(0);
600 let unique: bool = row.get(1);
601 let columns: Vec<String> = row.get(2);
602 indexes.push(IndexSnapshot {
603 name,
604 columns,
605 unique,
606 });
607 }
608 Ok(indexes)
609 }
610
611 pub fn plan_from_live(&mut self, target: &AppManifest) -> Result<SchemaPlan, StorageError> {
613 let snapshot = self.read_schema()?;
614 Ok(crate::plan_from_snapshot(&snapshot, target))
615 }
616 }
617
618 impl StorageAdapter for LivePostgresAdapter {
619 fn plan_schema(&self, _target: &AppManifest) -> Result<SchemaPlan, StorageError> {
620 Err(StorageError {
621 code: "PG_PLAN_NEEDS_MUTABLE".into(),
622 message: "Use plan_from_live() instead for live Postgres planning".into(),
623 })
624 }
625
626 fn apply_schema(&self, _plan: &SchemaPlan) -> Result<(), StorageError> {
627 Err(StorageError {
628 code: "PG_APPLY_USE_METHOD".into(),
629 message: "Use apply_plan() instead of the trait method for live Postgres".into(),
630 })
631 }
632 }
633
634 impl LivePostgresAdapter {
635 pub fn apply_plan(&mut self, plan: &SchemaPlan) -> Result<(), StorageError> {
637 let statements = plan_to_sql(plan)?;
638 for sql in &statements {
639 self.client.execute(sql.as_str(), &[]).map_err(pg_err)?;
640 }
641 Ok(())
642 }
643
        /// Executes `sql` verbatim with no bound parameters and returns the
        /// number of rows affected.
        ///
        /// The statement is not quoted or validated here, so callers must not
        /// pass untrusted input.
        pub fn exec_raw(&mut self, sql: &str) -> Result<u64, StorageError> {
            self.client.execute(sql, &[]).map_err(pg_err)
        }
652
653 pub fn insert(
655 &mut self,
656 entity: &str,
657 data: &serde_json::Value,
658 ) -> Result<String, StorageError> {
659 let (sql, values) = build_insert_sql(entity, data)?;
660 let id = match &values[0] {
663 JsonParam::Text(s) => s.clone(),
664 _ => {
665 return Err(StorageError {
666 code: "PG_INTERNAL".into(),
667 message: "build_insert_sql produced non-text id param".into(),
668 });
669 }
670 };
671 let params = as_pg_params(&values);
672 self.client.execute(sql.as_str(), ¶ms).map_err(pg_err)?;
673 Ok(id)
674 }
675
676 pub fn get_by_id(
678 &mut self,
679 entity: &str,
680 id: &str,
681 ) -> Result<Option<serde_json::Value>, StorageError> {
682 let sql = format!("SELECT * FROM {} WHERE id = $1", quote_ident(entity));
683 let rows = self.client.query(sql.as_str(), &[&id]).map_err(pg_err)?;
684
685 match rows.first() {
686 Some(row) => Ok(Some(row_to_json(row))),
687 None => Ok(None),
688 }
689 }
690
691 pub fn list(&mut self, entity: &str) -> Result<Vec<serde_json::Value>, StorageError> {
693 let sql = format!("SELECT * FROM {}", quote_ident(entity));
694 let rows = self.client.query(sql.as_str(), &[]).map_err(pg_err)?;
695
696 Ok(rows.iter().map(row_to_json).collect())
697 }
698
699 pub fn list_after(
703 &mut self,
704 entity: &str,
705 after: Option<&str>,
706 limit: usize,
707 ) -> Result<Vec<serde_json::Value>, StorageError> {
708 let capped: i64 = limit.min(10_000) as i64;
711 let sql = match after {
712 Some(_) => format!(
713 "SELECT * FROM {} WHERE id > $1 ORDER BY id ASC LIMIT $2",
714 quote_ident(entity)
715 ),
716 None => format!(
717 "SELECT * FROM {} ORDER BY id ASC LIMIT $1",
718 quote_ident(entity)
719 ),
720 };
721 let rows = match after {
722 Some(cursor) => self
723 .client
724 .query(sql.as_str(), &[&cursor, &capped])
725 .map_err(pg_err)?,
726 None => self
727 .client
728 .query(sql.as_str(), &[&capped])
729 .map_err(pg_err)?,
730 };
731 Ok(rows.iter().map(row_to_json).collect())
732 }
733
734 pub fn update(
736 &mut self,
737 entity: &str,
738 id: &str,
739 data: &serde_json::Value,
740 ) -> Result<bool, StorageError> {
741 let (sql, values) = build_update_sql(entity, id, data)?;
742 let params = as_pg_params(&values);
743 let rows_affected = self.client.execute(sql.as_str(), ¶ms).map_err(pg_err)?;
744 Ok(rows_affected > 0)
745 }
746
747 pub fn delete(&mut self, entity: &str, id: &str) -> Result<bool, StorageError> {
749 let sql = format!("DELETE FROM {} WHERE id = $1", quote_ident(entity));
750 let rows_affected = self.client.execute(sql.as_str(), &[&id]).map_err(pg_err)?;
751 Ok(rows_affected > 0)
752 }
753
754 pub fn lookup_field(
758 &mut self,
759 entity: &str,
760 field: &str,
761 value: &str,
762 ) -> Result<Option<serde_json::Value>, StorageError> {
763 let sql = format!(
764 "SELECT * FROM {} WHERE {} = $1 LIMIT 1",
765 quote_ident(entity),
766 quote_ident(field),
767 );
768 let rows = self.client.query(sql.as_str(), &[&value]).map_err(pg_err)?;
769 Ok(rows.first().map(row_to_json))
770 }
771
772 pub fn query_filtered(
795 &mut self,
796 entity: &str,
797 filter: &serde_json::Value,
798 valid_columns: &[String],
799 ) -> Result<Vec<serde_json::Value>, StorageError> {
800 let empty = serde_json::Map::new();
801 let obj = filter.as_object().unwrap_or(&empty);
802
803 let validate = |col: &str| -> Result<(), StorageError> {
804 if col == "id" || valid_columns.iter().any(|c| c == col) {
805 Ok(())
806 } else {
807 Err(StorageError {
808 code: "UNKNOWN_COLUMN".into(),
809 message: format!("Unknown column \"{col}\" on entity \"{entity}\""),
810 })
811 }
812 };
813
814 let mut where_clauses: Vec<String> = Vec::new();
815 let mut order_clause = String::new();
816 let mut limit_clause = String::new();
817 let mut offset_clause = String::new();
818 let mut planned: Vec<(String, String, JsonParam)> = Vec::new();
822
823 for (key, val) in obj {
824 match key.as_str() {
825 "$search" => {
826 return Err(StorageError {
833 code: "SEARCH_NOT_SUPPORTED".into(),
834 message: "$search is SQLite-FTS5-only; use a Postgres tsvector column with the storage adapter's full-text path"
835 .into(),
836 });
837 }
838 "$order" => {
839 if let Some(ord) = val.as_object() {
840 let mut parts = Vec::new();
841 for (col, dir) in ord {
842 validate(col)?;
843 let d = match dir.as_str().unwrap_or("asc") {
844 "desc" | "DESC" => "DESC",
845 _ => "ASC",
846 };
847 parts.push(format!("{} {d}", quote_ident(col)));
848 }
849 if !parts.is_empty() {
850 order_clause = format!(" ORDER BY {}", parts.join(", "));
851 }
852 }
853 }
854 "$limit" => {
855 if let Some(n) = val.as_u64() {
856 limit_clause = format!(" LIMIT {}", n);
857 }
858 }
859 "$offset" => {
860 if let Some(n) = val.as_u64() {
861 offset_clause = format!(" OFFSET {}", n);
862 }
863 }
864 field => {
865 validate(field)?;
866 match val {
867 serde_json::Value::Object(ops) => {
868 for (op, v) in ops {
869 match op.as_str() {
870 "$not" => planned.push((
871 field.into(),
872 "!=".into(),
873 value_to_pg(v),
874 )),
875 "$gt" => {
876 planned.push((field.into(), ">".into(), value_to_pg(v)))
877 }
878 "$gte" => planned.push((
879 field.into(),
880 ">=".into(),
881 value_to_pg(v),
882 )),
883 "$lt" => {
884 planned.push((field.into(), "<".into(), value_to_pg(v)))
885 }
886 "$lte" => planned.push((
887 field.into(),
888 "<=".into(),
889 value_to_pg(v),
890 )),
891 "$like" => planned.push((
892 field.into(),
893 "LIKE".into(),
894 value_to_pg(v),
895 )),
896 "$in" => {
897 if let Some(arr) = v.as_array() {
898 let placeholders: Vec<String> = (0..arr.len())
899 .map(|i| format!("${}", planned.len() + 1 + i))
900 .collect();
901 where_clauses.push(format!(
902 "{} IN ({})",
903 quote_ident(field),
904 placeholders.join(", "),
905 ));
906 for x in arr {
907 planned.push((
908 format!("__inline_{}", planned.len()),
909 "__INLINE__".into(),
910 value_to_pg(x),
911 ));
912 }
913 }
914 }
915 _ => {}
916 }
917 }
918 }
919 _ => planned.push((field.into(), "=".into(), value_to_pg(val))),
920 }
921 }
922 }
923 }
924
925 let mut params: Vec<JsonParam> = Vec::with_capacity(planned.len());
927 for (field, op, v) in &planned {
928 if op == "__INLINE__" {
929 params.push(v.clone());
931 } else {
932 let placeholder = format!("${}", params.len() + 1);
933 where_clauses.push(format!("{} {} {}", quote_ident(field), op, placeholder));
934 params.push(v.clone());
935 }
936 }
937
938 let where_sql = if where_clauses.is_empty() {
939 String::new()
940 } else {
941 format!(" WHERE {}", where_clauses.join(" AND "))
942 };
943 let sql = format!(
944 "SELECT * FROM {}{}{}{}{}",
945 quote_ident(entity),
946 where_sql,
947 order_clause,
948 limit_clause,
949 offset_clause,
950 );
951
952 let pg_params = as_pg_params(¶ms);
953 let rows = self
954 .client
955 .query(sql.as_str(), &pg_params)
956 .map_err(pg_err)?;
957 Ok(rows.iter().map(row_to_json).collect())
958 }
959
960 pub fn aggregate(
979 &mut self,
980 entity: &str,
981 spec: &serde_json::Value,
982 valid_columns: &[String],
983 ) -> Result<serde_json::Value, StorageError> {
984 let obj = spec.as_object().ok_or_else(|| StorageError {
985 code: "INVALID_QUERY".into(),
986 message: "aggregate spec must be a JSON object".into(),
987 })?;
988
989 let validate = |col: &str| -> Result<(), StorageError> {
990 if col == "id" || valid_columns.iter().any(|c| c == col) {
991 Ok(())
992 } else {
993 Err(StorageError {
994 code: "UNKNOWN_COLUMN".into(),
995 message: format!("Unknown column \"{col}\" on entity \"{entity}\""),
996 })
997 }
998 };
999
1000 let mut select_parts: Vec<String> = Vec::new();
1001 let mut result_fields: Vec<String> = Vec::new();
1002
1003 if let Some(count) = obj.get("count") {
1004 match count {
1005 serde_json::Value::String(s) if s == "*" => {
1006 select_parts.push("COUNT(*) AS count".into());
1007 result_fields.push("count".into());
1008 }
1009 serde_json::Value::String(field) => {
1010 validate(field)?;
1011 let alias = format!("count_{field}");
1012 select_parts.push(format!(
1013 "COUNT({}) AS {}",
1014 quote_ident(field),
1015 quote_ident(&alias),
1016 ));
1017 result_fields.push(alias);
1018 }
1019 _ => {}
1020 }
1021 }
1022
1023 for (fn_name, prefix) in [
1024 ("sum", "sum_"),
1025 ("avg", "avg_"),
1026 ("min", "min_"),
1027 ("max", "max_"),
1028 ] {
1029 if let Some(fields) = obj.get(fn_name).and_then(|v| v.as_array()) {
1030 for field in fields {
1031 if let Some(f) = field.as_str() {
1032 validate(f)?;
1033 let alias = format!("{prefix}{f}");
1034 let sql_fn = fn_name.to_uppercase();
1035 select_parts.push(format!(
1036 "{}({}) AS {}",
1037 sql_fn,
1038 quote_ident(f),
1039 quote_ident(&alias),
1040 ));
1041 result_fields.push(alias);
1042 }
1043 }
1044 }
1045 }
1046
1047 if let Some(fields) = obj.get("countDistinct").and_then(|v| v.as_array()) {
1048 for field in fields {
1049 if let Some(f) = field.as_str() {
1050 validate(f)?;
1051 let alias = format!("count_distinct_{f}");
1052 select_parts.push(format!(
1053 "COUNT(DISTINCT {}) AS {}",
1054 quote_ident(f),
1055 quote_ident(&alias),
1056 ));
1057 result_fields.push(alias);
1058 }
1059 }
1060 }
1061
1062 let mut group_by: Vec<String> = Vec::new();
1067 let mut group_select: Vec<String> = Vec::new();
1068 let mut group_field_names: Vec<String> = Vec::new();
1069 if let Some(groups) = obj.get("groupBy").and_then(|v| v.as_array()) {
1070 for g in groups {
1071 if let Some(f) = g.as_str() {
1072 validate(f)?;
1073 let q = quote_ident(f);
1074 group_by.push(q.clone());
1075 group_select.push(q);
1076 group_field_names.push(f.to_string());
1077 } else if let Some(spec) = g.as_object() {
1078 let field =
1079 spec.get("field").and_then(|v| v.as_str()).ok_or_else(|| {
1080 StorageError {
1081 code: "INVALID_QUERY".into(),
1082 message: "groupBy object spec requires `field`".into(),
1083 }
1084 })?;
1085 validate(field)?;
1086 let bucket = spec.get("bucket").and_then(|v| v.as_str()).unwrap_or("day");
1087 let trunc_unit = match bucket {
1088 "hour" | "day" | "week" | "month" | "year" => bucket,
1089 _ => {
1090 return Err(StorageError {
1091 code: "INVALID_QUERY".into(),
1092 message: format!(
1093 "bucket must be one of hour/day/week/month/year, got {bucket}"
1094 ),
1095 });
1096 }
1097 };
1098 let alias = format!("{field}_{bucket}");
1099 let expr = format!("date_trunc('{}', {})", trunc_unit, quote_ident(field),);
1100 group_by.push(expr.clone());
1101 group_select.push(format!("{} AS {}", expr, quote_ident(&alias)));
1102 group_field_names.push(alias);
1103 }
1104 }
1105 }
1106
1107 let mut full_select = group_select.clone();
1108 full_select.extend(select_parts.iter().cloned());
1109 if full_select.is_empty() {
1110 return Err(StorageError {
1111 code: "INVALID_QUERY".into(),
1112 message: "aggregate spec must include count/sum/avg/min/max/groupBy".into(),
1113 });
1114 }
1115
1116 let mut where_clauses: Vec<String> = Vec::new();
1117 let mut params: Vec<JsonParam> = Vec::new();
1118 if let Some(w) = obj.get("where").and_then(|v| v.as_object()) {
1119 for (k, v) in w {
1120 validate(k)?;
1121 let placeholder = format!("${}", params.len() + 1);
1122 where_clauses.push(format!("{} = {}", quote_ident(k), placeholder));
1123 params.push(value_to_pg(v));
1124 }
1125 }
1126 let where_sql = if where_clauses.is_empty() {
1127 String::new()
1128 } else {
1129 format!(" WHERE {}", where_clauses.join(" AND "))
1130 };
1131 let group_sql = if group_by.is_empty() {
1132 String::new()
1133 } else {
1134 format!(" GROUP BY {}", group_by.join(", "))
1135 };
1136
1137 let sql = format!(
1138 "SELECT {} FROM {}{}{}",
1139 full_select.join(", "),
1140 quote_ident(entity),
1141 where_sql,
1142 group_sql,
1143 );
1144
1145 let pg_params = as_pg_params(¶ms);
1146 let rows = self
1147 .client
1148 .query(sql.as_str(), &pg_params)
1149 .map_err(pg_err)?;
1150
1151 let column_names: Vec<String> = group_field_names
1152 .iter()
1153 .chain(result_fields.iter())
1154 .cloned()
1155 .collect();
1156
1157 let mut out: Vec<serde_json::Value> = Vec::with_capacity(rows.len());
1158 for row in &rows {
1159 let row_json = row_to_json(row);
1160 if let serde_json::Value::Object(map) = &row_json {
1161 let mut filtered = serde_json::Map::new();
1162 for name in &column_names {
1163 if let Some(v) = map.get(name) {
1164 filtered.insert(name.clone(), v.clone());
1165 }
1166 }
1167 out.push(serde_json::Value::Object(filtered));
1168 } else {
1169 out.push(row_json);
1170 }
1171 }
1172 Ok(serde_json::json!({ "rows": out }))
1173 }
1174 }
1175
    /// One write operation to run inside `LivePostgresAdapter::transact`.
    ///
    /// All payloads are borrowed for `'a`; the enum owns no data.
    pub enum TxOp<'a> {
        /// Insert `data` (a JSON object) into `entity` under a generated id.
        Insert {
            entity: &'a str,
            data: &'a serde_json::Value,
        },
        /// Update the row `id` of `entity` with the fields of `data`.
        Update {
            entity: &'a str,
            id: &'a str,
            data: &'a serde_json::Value,
        },
        /// Delete the row `id` of `entity`.
        Delete {
            entity: &'a str,
            id: &'a str,
        },
    }
1192
    /// Outcome of a single [`TxOp`], in the same order as the submitted ops.
    #[derive(Debug, Clone)]
    pub enum TxResult {
        /// The id generated for the inserted row.
        Inserted(String),
        /// Whether the update affected at least one row.
        Updated(bool),
        /// Whether the delete removed at least one row.
        Deleted(bool),
    }
1200
1201 impl LivePostgresAdapter {
1202 pub fn transact(&mut self, ops: &[TxOp<'_>]) -> Result<Vec<TxResult>, StorageError> {
1207 let mut tx = self.client.transaction().map_err(pg_err)?;
1208 let mut results: Vec<TxResult> = Vec::with_capacity(ops.len());
1209
1210 for op in ops {
1211 match op {
1212 TxOp::Insert { entity, data } => {
1213 let (sql, values) = build_insert_sql(entity, data)?;
1214 let id = match &values[0] {
1215 JsonParam::Text(s) => s.clone(),
1216 _ => {
1217 return Err(StorageError {
1218 code: "PG_INTERNAL".into(),
1219 message: "build_insert_sql produced non-text id param".into(),
1220 });
1221 }
1222 };
1223 let params = as_pg_params(&values);
1224 tx.execute(sql.as_str(), ¶ms).map_err(pg_err)?;
1225 results.push(TxResult::Inserted(id));
1226 }
1227 TxOp::Update { entity, id, data } => {
1228 let (sql, values) = build_update_sql(entity, id, data)?;
1229 let params = as_pg_params(&values);
1230 let n = tx.execute(sql.as_str(), ¶ms).map_err(pg_err)?;
1231 results.push(TxResult::Updated(n > 0));
1232 }
1233 TxOp::Delete { entity, id } => {
1234 let sql = format!("DELETE FROM {} WHERE id = $1", quote_ident(entity));
1235 let n = tx.execute(sql.as_str(), &[id]).map_err(pg_err)?;
1236 results.push(TxResult::Deleted(n > 0));
1237 }
1238 }
1239 }
1240
1241 tx.commit().map_err(pg_err)?;
1242 Ok(results)
1243 }
1244 }
1245
    /// Converts a JSON value into a bindable [`JsonParam`].
    ///
    /// Thin alias for `JsonParam::from_json`, used by the query builders in
    /// this module.
    fn value_to_pg(v: &serde_json::Value) -> JsonParam {
        JsonParam::from_json(v)
    }
1254
1255 fn row_to_json(row: &postgres::Row) -> serde_json::Value {
1256 use postgres::types::Type;
1257 let mut obj = serde_json::Map::new();
1258 for (i, col) in row.columns().iter().enumerate() {
1259 let name = col.name().to_string();
1260
1261 let value: serde_json::Value = match *col.type_() {
1272 Type::BOOL => try_get_or_null::<Option<bool>>(row, i)
1273 .flatten()
1274 .map(serde_json::Value::Bool)
1275 .unwrap_or(serde_json::Value::Null),
1276 Type::INT2 => try_get_or_null::<Option<i16>>(row, i)
1277 .flatten()
1278 .map(|v| serde_json::Value::Number(v.into()))
1279 .unwrap_or(serde_json::Value::Null),
1280 Type::INT4 => try_get_or_null::<Option<i32>>(row, i)
1281 .flatten()
1282 .map(|v| serde_json::Value::Number(v.into()))
1283 .unwrap_or(serde_json::Value::Null),
1284 Type::INT8 => try_get_or_null::<Option<i64>>(row, i)
1285 .flatten()
1286 .map(|v| serde_json::Value::Number(v.into()))
1287 .unwrap_or(serde_json::Value::Null),
1288 Type::FLOAT4 => try_get_or_null::<Option<f32>>(row, i)
1289 .flatten()
1290 .and_then(|v| serde_json::Number::from_f64(v as f64))
1291 .map(serde_json::Value::Number)
1292 .unwrap_or(serde_json::Value::Null),
1293 Type::FLOAT8 => try_get_or_null::<Option<f64>>(row, i)
1294 .flatten()
1295 .and_then(serde_json::Number::from_f64)
1296 .map(serde_json::Value::Number)
1297 .unwrap_or(serde_json::Value::Null),
1298 Type::JSON | Type::JSONB => try_get_or_null::<Option<serde_json::Value>>(row, i)
1299 .flatten()
1300 .unwrap_or(serde_json::Value::Null),
1301 Type::BYTEA => try_get_or_null::<Option<Vec<u8>>>(row, i)
1302 .flatten()
1303 .map(|b| serde_json::Value::String(b64(&b)))
1304 .unwrap_or(serde_json::Value::Null),
1305 Type::TEXT | Type::VARCHAR | Type::BPCHAR | Type::NAME | Type::UNKNOWN => {
1306 try_get_or_null::<Option<String>>(row, i)
1307 .flatten()
1308 .map(serde_json::Value::String)
1309 .unwrap_or(serde_json::Value::Null)
1310 }
1311 _ => {
1312 match row.try_get::<_, Option<String>>(i) {
1317 Ok(Some(s)) => serde_json::Value::String(s),
1318 Ok(None) => serde_json::Value::Null,
1319 Err(_) => match row.try_get::<_, Option<Vec<u8>>>(i) {
1320 Ok(Some(bytes)) => serde_json::Value::String(
1321 String::from_utf8_lossy(&bytes).into_owned(),
1322 ),
1323 _ => serde_json::Value::Null,
1324 },
1325 }
1326 }
1327 };
1328 obj.insert(name, value);
1329 }
1330 serde_json::Value::Object(obj)
1331 }
1332
1333 fn try_get_or_null<'a, T>(row: &'a postgres::Row, i: usize) -> Option<T>
1334 where
1335 T: postgres::types::FromSql<'a>,
1336 {
1337 match row.try_get::<_, T>(i) {
1338 Ok(v) => Some(v),
1339 Err(e) => {
1340 tracing::warn!(
1341 "[postgres] decode failed for column {} ({}): {e}",
1342 i,
1343 row.columns()[i].name()
1344 );
1345 None
1346 }
1347 }
1348 }
1349
/// Encodes `bytes` as base64 using the standard alphabet (`A-Z`, `a-z`,
/// `0-9`, `+`, `/`) and `=` padding.
///
/// The input is consumed three bytes at a time; each group becomes four
/// output characters. A trailing group of one or two bytes is zero-extended
/// for the bit arithmetic and the unused positions are emitted as `=`.
fn b64(bytes: &[u8]) -> String {
    const ALPHABET: &[u8; 64] =
        b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

    // Maps a 6-bit value to its alphabet character.
    let symbol = |sextet: u8| ALPHABET[usize::from(sextet)] as char;

    let mut encoded = String::with_capacity((bytes.len() + 2) / 3 * 4);
    for group in bytes.chunks(3) {
        match group {
            &[first, second, third] => {
                encoded.push(symbol(first >> 2));
                encoded.push(symbol((first & 0x03) << 4 | second >> 4));
                encoded.push(symbol((second & 0x0F) << 2 | third >> 6));
                encoded.push(symbol(third & 0x3F));
            }
            &[first, second] => {
                encoded.push(symbol(first >> 2));
                encoded.push(symbol((first & 0x03) << 4 | second >> 4));
                encoded.push(symbol((second & 0x0F) << 2));
                encoded.push('=');
            }
            &[first] => {
                encoded.push(symbol(first >> 2));
                encoded.push(symbol((first & 0x03) << 4));
                encoded.push_str("==");
            }
            // `chunks(3)` only ever yields slices of length 1..=3.
            _ => unreachable!("chunks(3) yields between one and three bytes"),
        }
    }
    encoded
}
1378
1379 fn pg_err(e: postgres::Error) -> StorageError {
1380 use std::error::Error;
1386 let mut detail = format!("{e}");
1387 let mut src: Option<&dyn Error> = e.source();
1388 while let Some(s) = src {
1389 detail.push_str(": ");
1390 detail.push_str(&format!("{s}"));
1391 src = s.source();
1392 }
1393 StorageError {
1394 code: "PG_QUERY_FAILED".into(),
1395 message: format!("Postgres query failed: {detail}"),
1396 }
1397 }
1398}
1399
#[cfg(test)]
mod tests {
    use super::*;

    // ------------------------------------------------------------------
    // Fixtures
    // ------------------------------------------------------------------

    /// Builds the two-entity manifest (`User`, `Todo`) shared by the
    /// planning tests. `Todo` carries one non-unique index, `by_user`.
    fn test_manifest() -> AppManifest {
        use pylon_kernel::{ManifestEntity, ManifestField, ManifestIndex};
        let f = |name: &str, ty: &str, opt: bool, uniq: bool| ManifestField {
            name: name.into(),
            field_type: ty.into(),
            optional: opt,
            unique: uniq,
            crdt: None,
        };
        AppManifest {
            manifest_version: 1,
            name: "test".into(),
            version: "0.0.0".into(),
            entities: vec![
                ManifestEntity {
                    name: "User".into(),
                    fields: vec![
                        f("email", "string", false, true),
                        f("displayName", "string", false, false),
                        f("createdAt", "datetime", false, false),
                    ],
                    indexes: vec![],
                    relations: vec![],
                    search: None,
                    crdt: true,
                },
                ManifestEntity {
                    name: "Todo".into(),
                    fields: vec![
                        f("title", "string", false, false),
                        f("done", "bool", false, false),
                        f("userId", "id(User)", false, false),
                        f("createdAt", "datetime", false, false),
                    ],
                    indexes: vec![ManifestIndex {
                        name: "by_user".into(),
                        fields: vec!["userId".into()],
                        unique: false,
                    }],
                    relations: vec![],
                    search: None,
                    crdt: true,
                },
            ],
            queries: vec![],
            actions: vec![],
            policies: vec![],
            routes: vec![],
        }
    }

    /// Shorthand for a `FieldSpec` literal.
    fn field_spec(
        name: &'static str,
        field_type: &'static str,
        optional: bool,
        unique: bool,
    ) -> FieldSpec {
        FieldSpec {
            name: name.into(),
            field_type: field_type.into(),
            optional,
            unique,
        }
    }

    /// Shorthand for a `NOT NULL` column in a schema snapshot; every column
    /// used by the snapshot fixtures below is `notnull`.
    fn column(
        name: &'static str,
        column_type: &'static str,
        primary_key: bool,
    ) -> crate::ColumnSnapshot {
        crate::ColumnSnapshot {
            name: name.into(),
            column_type: column_type.into(),
            notnull: true,
            primary_key,
        }
    }

    /// Snapshot of a `Todo` table that already matches `test_manifest()`:
    /// all five columns plus the `Todo_by_user` index.
    fn todo_table_snapshot() -> crate::TableSnapshot {
        crate::TableSnapshot {
            name: "Todo".into(),
            columns: vec![
                column("id", "TEXT", true),
                column("title", "TEXT", false),
                column("done", "BOOLEAN", false),
                column("userId", "TEXT", false),
                column("createdAt", "TIMESTAMPTZ", false),
            ],
            indexes: vec![crate::IndexSnapshot {
                name: "Todo_by_user".into(),
                columns: vec!["userId".into()],
                unique: false,
            }],
        }
    }

    // ------------------------------------------------------------------
    // SQL generation
    // ------------------------------------------------------------------

    /// Every manifest field type maps to the expected Postgres column type.
    #[test]
    fn pg_type_mapping() {
        assert_eq!(pg_column_type("string"), "TEXT");
        assert_eq!(pg_column_type("int"), "INTEGER");
        assert_eq!(pg_column_type("float"), "DOUBLE PRECISION");
        assert_eq!(pg_column_type("bool"), "BOOLEAN");
        assert_eq!(pg_column_type("datetime"), "TIMESTAMPTZ");
        assert_eq!(pg_column_type("richtext"), "TEXT");
        assert_eq!(pg_column_type("id(User)"), "TEXT");
    }

    /// Plain identifiers are wrapped in double quotes.
    #[test]
    fn quote_ident_simple() {
        assert_eq!(quote_ident("User"), "\"User\"");
        assert_eq!(quote_ident("email"), "\"email\"");
    }

    /// Embedded double quotes are doubled inside the quoted identifier.
    #[test]
    fn quote_ident_escapes_embedded_double_quotes() {
        assert_eq!(quote_ident("col\"name"), "\"col\"\"name\"");
        assert_eq!(quote_ident("a\"b\"c"), "\"a\"\"b\"\"c\"");
    }

    /// A required unique field and an optional field render with the
    /// expected `NOT NULL` / `UNIQUE` suffixes.
    #[test]
    fn create_table_sql_basic() {
        let fields = [
            field_spec("email", "string", false, true),
            field_spec("age", "int", true, false),
        ];
        let sql = create_table_sql("User", &fields);
        assert_eq!(
            sql,
            "CREATE TABLE IF NOT EXISTS \"User\" (id TEXT PRIMARY KEY NOT NULL, \"email\" TEXT NOT NULL UNIQUE, \"age\" INTEGER)"
        );
    }

    /// Table and column names containing `"` are escaped in the DDL.
    #[test]
    fn create_table_sql_escapes_identifiers() {
        let fields = [field_spec("col\"x", "string", false, false)];
        let sql = create_table_sql("my\"table", &fields);
        assert!(sql.contains("\"my\"\"table\""));
        assert!(sql.contains("\"col\"\"x\""));
    }

    /// Unique indexes use `CREATE UNIQUE INDEX` and an `<entity>_<index>` name.
    #[test]
    fn create_index_sql_unique() {
        let sql = create_index_sql("User", "by_email", &["email".into()], true);
        assert_eq!(
            sql,
            "CREATE UNIQUE INDEX IF NOT EXISTS \"User_by_email\" ON \"User\" (\"email\")"
        );
    }

    /// Non-unique indexes use plain `CREATE INDEX`.
    #[test]
    fn create_index_sql_non_unique() {
        let sql = create_index_sql("Todo", "by_user", &["userId".into()], false);
        assert_eq!(
            sql,
            "CREATE INDEX IF NOT EXISTS \"Todo_by_user\" ON \"Todo\" (\"userId\")"
        );
    }

    /// `ADD COLUMN` for an optional, non-unique field has no suffixes.
    #[test]
    fn add_column_sql_basic() {
        let field = field_spec("bio", "string", true, false);
        let sql = add_column_sql("User", &field);
        assert_eq!(sql, "ALTER TABLE \"User\" ADD COLUMN \"bio\" TEXT");
    }

    /// Mixed-case identifiers survive because they are always quoted.
    #[test]
    fn sql_uses_quoted_identifiers() {
        let fields = [field_spec("createdAt", "datetime", false, false)];
        let sql = create_table_sql("User", &fields);
        assert!(sql.contains("\"User\""));
        assert!(sql.contains("\"createdAt\""));
        assert!(sql.contains("TIMESTAMPTZ"));
    }

    // ------------------------------------------------------------------
    // Planning from a manifest
    // ------------------------------------------------------------------

    /// Planning from the manifest alone creates both entities and the index.
    #[test]
    fn plan_from_manifest() {
        let adapter = PostgresAdapter;
        let manifest = test_manifest();
        let plan = adapter.plan_schema(&manifest).unwrap();

        assert!(plan.operations.iter().any(|op| matches!(
            op,
            SchemaOperation::CreateEntity { name, .. } if name == "User"
        )));
        assert!(plan.operations.iter().any(|op| matches!(
            op,
            SchemaOperation::CreateEntity { name, .. } if name == "Todo"
        )));
        assert!(plan.operations.iter().any(|op| matches!(
            op,
            SchemaOperation::AddIndex { entity, name, .. } if entity == "Todo" && name == "by_user"
        )));
    }

    /// The rendered plan contains both `CREATE TABLE` statements first,
    /// followed by at least one index statement.
    #[test]
    fn plan_to_sql_produces_statements() {
        let adapter = PostgresAdapter;
        let manifest = test_manifest();
        let plan = adapter.plan_schema(&manifest).unwrap();
        let stmts = plan_to_sql(&plan).unwrap();

        let create_tables = stmts
            .iter()
            .filter(|s| s.starts_with("CREATE TABLE"))
            .count();
        let create_indexes = stmts
            .iter()
            .filter(|s| s.starts_with("CREATE INDEX") || s.starts_with("CREATE UNIQUE INDEX"))
            .count();
        assert_eq!(create_tables, 2);
        assert!(create_indexes >= 1);
        assert!(stmts[0].starts_with("CREATE TABLE"));
        assert!(stmts[1].starts_with("CREATE TABLE"));
    }

    /// `RemoveEntity` cannot be rendered and is reported as unsupported.
    #[test]
    fn plan_to_sql_rejects_unsupported() {
        let plan = SchemaPlan {
            operations: vec![SchemaOperation::RemoveEntity {
                name: "User".into(),
            }],
        };
        let err = plan_to_sql(&plan).unwrap_err();
        assert_eq!(err.code, "PG_OP_UNSUPPORTED");
    }

    /// `apply_schema` on the bare adapter reports that it is not implemented.
    #[test]
    fn apply_not_implemented() {
        let adapter = PostgresAdapter;
        let plan = SchemaPlan {
            operations: vec![SchemaOperation::Noop],
        };
        let err = adapter.apply_schema(&plan).unwrap_err();
        assert_eq!(err.code, "APPLY_NOT_IMPLEMENTED");
    }

    // ------------------------------------------------------------------
    // Introspection and snapshot diffing
    // ------------------------------------------------------------------

    /// Sanity-checks the introspection queries: expected catalog source,
    /// a bind parameter where one is needed, and a `_pylon_` reference.
    #[test]
    fn introspect_sql_constants_are_valid() {
        assert!(INTROSPECT_TABLES_SQL.contains("information_schema.tables"));
        assert!(INTROSPECT_COLUMNS_SQL.contains("$1"));
        assert!(INTROSPECT_INDEXES_SQL.contains("$1"));
        assert!(INTROSPECT_TABLES_SQL.contains("_pylon_"));
    }

    /// Against an empty database every entity and index must be created.
    #[test]
    fn plan_from_empty_snapshot_creates_all() {
        let snapshot = crate::SchemaSnapshot { tables: vec![] };
        let manifest = test_manifest();
        let plan = plan_from_snapshot(&snapshot, &manifest);

        assert!(plan.operations.iter().any(|op| matches!(
            op,
            SchemaOperation::CreateEntity { name, .. } if name == "User"
        )));
        assert!(plan.operations.iter().any(|op| matches!(
            op,
            SchemaOperation::CreateEntity { name, .. } if name == "Todo"
        )));
        assert!(plan.operations.iter().any(|op| matches!(
            op,
            SchemaOperation::AddIndex { entity, name, .. } if entity == "Todo" && name == "by_user"
        )));
    }

    /// A snapshot that already contains every table, column and index from
    /// the manifest yields an empty plan.
    #[test]
    fn plan_from_full_snapshot_is_noop() {
        let snapshot = crate::SchemaSnapshot {
            tables: vec![
                crate::TableSnapshot {
                    name: "User".into(),
                    columns: vec![
                        column("id", "TEXT", true),
                        column("email", "TEXT", false),
                        column("displayName", "TEXT", false),
                        column("createdAt", "TIMESTAMPTZ", false),
                    ],
                    indexes: vec![],
                },
                todo_table_snapshot(),
            ],
        };
        let manifest = test_manifest();
        let plan = plan_from_snapshot(&snapshot, &manifest);
        assert!(plan.is_empty());
    }

    /// A `User` table that only has `id` and `email` is missing the
    /// manifest's `displayName` and `createdAt`, so two `AddField`
    /// operations are expected.
    #[test]
    fn plan_detects_missing_column_in_snapshot() {
        let snapshot = crate::SchemaSnapshot {
            tables: vec![
                crate::TableSnapshot {
                    name: "User".into(),
                    columns: vec![column("id", "TEXT", true), column("email", "TEXT", false)],
                    indexes: vec![],
                },
                todo_table_snapshot(),
            ],
        };
        let manifest = test_manifest();
        let plan = plan_from_snapshot(&snapshot, &manifest);

        let add_fields: Vec<_> = plan
            .operations
            .iter()
            .filter(|op| matches!(op, SchemaOperation::AddField { .. }))
            .collect();
        assert_eq!(add_fields.len(), 2);
    }

    // ------------------------------------------------------------------
    // Value helpers
    // ------------------------------------------------------------------

    /// Strings pass through unquoted, scalars use their display form,
    /// `null` becomes the empty string, and arrays/objects are rendered as
    /// compact JSON.
    #[test]
    fn json_value_to_string_handles_all_types() {
        assert_eq!(
            json_value_to_string(&serde_json::Value::String("hello".into())),
            "hello"
        );
        assert_eq!(json_value_to_string(&serde_json::json!(42)), "42");
        assert_eq!(json_value_to_string(&serde_json::json!(1.5)), "1.5");
        assert_eq!(json_value_to_string(&serde_json::Value::Bool(true)), "true");
        assert_eq!(
            json_value_to_string(&serde_json::Value::Bool(false)),
            "false"
        );
        assert_eq!(json_value_to_string(&serde_json::Value::Null), "");
        assert_eq!(
            json_value_to_string(&serde_json::json!([1, 2, 3])),
            "[1,2,3]"
        );
        assert_eq!(
            json_value_to_string(&serde_json::json!({"a": 1})),
            "{\"a\":1}"
        );
    }

    /// Generated ids are non-empty and consist only of hex digits.
    #[test]
    fn generate_id_returns_hex_string() {
        let id = generate_id();
        assert!(!id.is_empty());
        assert!(id.chars().all(|c| c.is_ascii_hexdigit()));
    }

    /// Two consecutive ids differ.
    #[test]
    fn generate_id_is_unique_across_calls() {
        let id1 = generate_id();
        let id2 = generate_id();
        assert_ne!(id1, id2);
    }

    /// Ids generated in a tight loop are already in lexicographic order,
    /// share one length, and never collide.
    #[test]
    fn generate_id_is_lex_sortable() {
        let mut ids: Vec<String> = (0..1000).map(|_| generate_id()).collect();
        let sorted = {
            let mut s = ids.clone();
            s.sort();
            s
        };
        assert_eq!(ids, sorted, "generate_id must be lex-monotonic");
        let len0 = ids[0].len();
        assert!(ids.iter().all(|id| id.len() == len0));
        // `ids` is sorted at this point, so `dedup` removes every duplicate.
        ids.dedup();
        assert_eq!(ids.len(), 1000, "no collisions in a tight loop");
    }

    // ------------------------------------------------------------------
    // INSERT / UPDATE builders
    // ------------------------------------------------------------------

    /// Two data fields produce three placeholders: the generated id comes
    /// first, followed by one parameter per field.
    #[test]
    fn build_insert_sql_simple() {
        let data = serde_json::json!({
            "email": "alice@example.com",
            "displayName": "Alice"
        });
        let (sql, values) = build_insert_sql("User", &data).unwrap();

        assert!(sql.starts_with("INSERT INTO \"User\""));
        assert!(sql.contains("id"));
        assert!(sql.contains("$1"));
        assert!(sql.contains("$2"));
        assert!(sql.contains("$3"));
        match &values[0] {
            JsonParam::Text(s) => assert!(!s.is_empty()),
            other => panic!("expected Text id param, got {other:?}"),
        }
        assert_eq!(values.len(), 3);
    }

    /// Each JSON scalar keeps its type as a `JsonParam` variant.
    ///
    /// NOTE(review): the positional assertions expect the parameters in
    /// alphabetical key order (`b`, `f`, `n`, `s`, `z`). That presumes
    /// `serde_json`'s default sorted map; confirm the `preserve_order`
    /// feature is not enabled for this crate.
    #[test]
    fn build_insert_sql_preserves_json_types() {
        let data = serde_json::json!({
            "n": 42,
            "f": 1.5,
            "b": true,
            "s": "hi",
            "z": null,
        });
        let (_sql, values) = build_insert_sql("T", &data).unwrap();
        // Skip the leading id parameter.
        let kinds: Vec<&JsonParam> = values.iter().skip(1).collect();
        assert!(matches!(kinds[0], JsonParam::Bool(true)));
        assert!(matches!(kinds[1], JsonParam::Float(_)));
        assert!(matches!(kinds[2], JsonParam::Int(42)));
        assert!(matches!(kinds[3], JsonParam::Text(_)));
        assert!(matches!(kinds[4], JsonParam::Null));
    }

    /// Table and column names are quoted in the INSERT statement.
    #[test]
    fn build_insert_sql_quotes_column_names() {
        let data = serde_json::json!({"createdAt": "2026-01-01"});
        let (sql, _) = build_insert_sql("Todo", &data).unwrap();
        assert!(sql.contains("\"createdAt\""));
        assert!(sql.contains("\"Todo\""));
    }

    /// Non-object JSON input is rejected with `PG_INVALID_DATA`.
    #[test]
    fn build_insert_sql_rejects_non_object() {
        let data = serde_json::json!("not an object");
        let err = build_insert_sql("User", &data).unwrap_err();
        assert_eq!(err.code, "PG_INVALID_DATA");
    }

    /// The row id is bound as `$1`; the updated fields follow as `$2`, `$3`.
    #[test]
    fn build_update_sql_simple() {
        let data = serde_json::json!({
            "displayName": "Bob",
            "email": "bob@example.com"
        });
        let (sql, values) = build_update_sql("User", "abc123", &data).unwrap();

        assert!(sql.starts_with("UPDATE \"User\" SET"));
        assert!(sql.contains("WHERE id = $1"));
        assert!(sql.contains("$2"));
        assert!(sql.contains("$3"));
        match &values[0] {
            JsonParam::Text(s) => assert_eq!(s, "abc123"),
            other => panic!("expected Text id param, got {other:?}"),
        }
        assert_eq!(values.len(), 3);
    }

    /// Column names are quoted in the `SET` clause.
    #[test]
    fn build_update_sql_quotes_column_names() {
        let data = serde_json::json!({"displayName": "Carol"});
        let (sql, _) = build_update_sql("User", "id1", &data).unwrap();
        assert!(sql.contains("\"displayName\" = $2"));
    }

    /// Non-object JSON input is rejected with `PG_INVALID_DATA`.
    #[test]
    fn build_update_sql_rejects_non_object() {
        let data = serde_json::json!(42);
        let err = build_update_sql("User", "id1", &data).unwrap_err();
        assert_eq!(err.code, "PG_INVALID_DATA");
    }

    /// An update with no fields is rejected rather than producing an empty
    /// `SET` clause.
    #[test]
    fn build_update_sql_rejects_empty_object() {
        let data = serde_json::json!({});
        let err = build_update_sql("User", "id1", &data).unwrap_err();
        assert_eq!(err.code, "PG_INVALID_DATA");
        assert!(err.message.contains("at least one field"));
    }
}