1use crate::{FieldSpec, SchemaOperation, SchemaPlan, StorageAdapter, StorageError};
2use pylon_kernel::AppManifest;
3
4fn pg_column_type(field_type: &str) -> &'static str {
17 match field_type {
18 "string" => "TEXT",
19 "int" => "INTEGER",
20 "float" => "DOUBLE PRECISION",
21 "bool" => "BOOLEAN",
22 "datetime" => "TIMESTAMPTZ",
23 "richtext" => "TEXT",
24 _ if field_type.starts_with("id(") => "TEXT",
25 _ => "TEXT",
26 }
27}
28
29fn quote_ident(name: &str) -> String {
37 format!("\"{}\"", name.replace('"', "\"\""))
38}
39
40pub fn create_table_sql(entity_name: &str, fields: &[FieldSpec]) -> String {
46 let mut columns = vec!["id TEXT PRIMARY KEY NOT NULL".to_string()];
47
48 for field in fields {
49 let col_type = pg_column_type(&field.field_type);
50 let not_null = if field.optional { "" } else { " NOT NULL" };
51 let unique = if field.unique { " UNIQUE" } else { "" };
52 columns.push(format!(
53 "{} {}{}{}",
54 quote_ident(&field.name),
55 col_type,
56 not_null,
57 unique
58 ));
59 }
60
61 format!(
62 "CREATE TABLE IF NOT EXISTS {} ({})",
63 quote_ident(entity_name),
64 columns.join(", ")
65 )
66}
67
68pub fn add_column_sql(entity_name: &str, field: &FieldSpec) -> String {
72 let col_type = pg_column_type(&field.field_type);
73 let unique = if field.unique { " UNIQUE" } else { "" };
74 format!(
75 "ALTER TABLE {} ADD COLUMN {} {}{}",
76 quote_ident(entity_name),
77 quote_ident(&field.name),
78 col_type,
79 unique
80 )
81}
82
83pub fn create_index_sql(
85 entity_name: &str,
86 index_name: &str,
87 fields: &[String],
88 unique: bool,
89) -> String {
90 let unique_str = if unique { "UNIQUE " } else { "" };
91 let full_index_name = format!("{}_{}", entity_name, index_name);
92 let quoted_fields: Vec<String> = fields.iter().map(|f| quote_ident(f)).collect();
93 format!(
94 "CREATE {}INDEX IF NOT EXISTS {} ON {} ({})",
95 unique_str,
96 quote_ident(&full_index_name),
97 quote_ident(entity_name),
98 quoted_fields.join(", ")
99 )
100}
101
102pub struct PostgresAdapter;
109
110impl StorageAdapter for PostgresAdapter {
111 fn plan_schema(&self, target: &AppManifest) -> Result<SchemaPlan, StorageError> {
112 let mut operations = Vec::new();
114
115 for entity in &target.entities {
116 let fields: Vec<FieldSpec> = entity
117 .fields
118 .iter()
119 .map(|f| FieldSpec {
120 name: f.name.clone(),
121 field_type: f.field_type.clone(),
122 optional: f.optional,
123 unique: f.unique,
124 })
125 .collect();
126
127 operations.push(SchemaOperation::CreateEntity {
128 name: entity.name.clone(),
129 fields,
130 });
131
132 for index in &entity.indexes {
133 operations.push(SchemaOperation::AddIndex {
134 entity: entity.name.clone(),
135 name: index.name.clone(),
136 fields: index.fields.clone(),
137 unique: index.unique,
138 });
139 }
140 }
141
142 if operations.is_empty() {
143 operations.push(SchemaOperation::Noop);
144 }
145
146 Ok(SchemaPlan { operations })
147 }
148
149 }
151
152pub fn plan_to_sql(plan: &SchemaPlan) -> Result<Vec<String>, StorageError> {
155 let mut statements = Vec::new();
156
157 for op in &plan.operations {
158 match op {
159 SchemaOperation::CreateEntity { name, fields } => {
160 statements.push(create_table_sql(name, fields));
161 }
162 SchemaOperation::AddField { entity, field } => {
163 statements.push(add_column_sql(entity, field));
164 }
165 SchemaOperation::AddIndex {
166 entity,
167 name,
168 fields,
169 unique,
170 } => {
171 statements.push(create_index_sql(entity, name, fields, *unique));
172 }
173 SchemaOperation::Noop => {}
174 other => {
175 return Err(StorageError {
176 code: "PG_OP_UNSUPPORTED".into(),
177 message: format!("Operation not supported by Postgres adapter: {other:?}"),
178 });
179 }
180 }
181 }
182
183 Ok(statements)
184}
185
186pub const INTROSPECT_TABLES_SQL: &str = "\
195 SELECT table_name \
196 FROM information_schema.tables \
197 WHERE table_schema = 'public' \
198 AND table_type = 'BASE TABLE' \
199 AND table_name NOT LIKE '_pylon_%' \
200 ORDER BY table_name";
201
202pub const INTROSPECT_COLUMNS_SQL: &str = "\
205 SELECT column_name, data_type, is_nullable, \
206 (SELECT COUNT(*) FROM information_schema.table_constraints tc \
207 JOIN information_schema.key_column_usage kcu \
208 ON tc.constraint_name = kcu.constraint_name \
209 WHERE tc.table_name = c.table_name \
210 AND kcu.column_name = c.column_name \
211 AND tc.constraint_type = 'PRIMARY KEY') as is_pk \
212 FROM information_schema.columns c \
213 WHERE table_schema = 'public' AND table_name = $1 \
214 ORDER BY ordinal_position";
215
216pub const INTROSPECT_INDEXES_SQL: &str = "\
219 SELECT i.relname as index_name, \
220 ix.indisunique as is_unique, \
221 array_agg(a.attname ORDER BY array_position(ix.indkey, a.attnum)) as columns \
222 FROM pg_index ix \
223 JOIN pg_class t ON t.oid = ix.indrelid \
224 JOIN pg_class i ON i.oid = ix.indexrelid \
225 JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(ix.indkey) \
226 JOIN pg_namespace n ON n.oid = t.relnamespace \
227 WHERE n.nspname = 'public' \
228 AND t.relname = $1 \
229 AND NOT ix.indisprimary \
230 GROUP BY i.relname, ix.indisunique \
231 ORDER BY i.relname";
232
233pub fn plan_from_snapshot(snapshot: &crate::SchemaSnapshot, target: &AppManifest) -> SchemaPlan {
236 crate::plan_from_snapshot(snapshot, target)
237}
238
239pub fn generate_id() -> String {
252 use std::sync::atomic::{AtomicU32, Ordering};
253 use std::time::{SystemTime, UNIX_EPOCH};
254 static COUNTER: AtomicU32 = AtomicU32::new(0);
255 let ts = SystemTime::now()
256 .duration_since(UNIX_EPOCH)
257 .unwrap_or_default()
258 .as_nanos();
259 let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
260 format!("{ts:032x}{seq:08x}")
261}
262
263pub fn json_value_to_string(val: &serde_json::Value) -> String {
272 match val {
273 serde_json::Value::String(s) => s.clone(),
274 serde_json::Value::Number(n) => n.to_string(),
275 serde_json::Value::Bool(b) => b.to_string(),
276 serde_json::Value::Null => String::new(),
277 other => other.to_string(),
278 }
279}
280
281#[derive(Debug, Clone, PartialEq)]
297pub enum JsonParam {
298 Null,
299 Text(String),
300 Int(i64),
301 Float(f64),
302 Bool(bool),
303}
304
305impl JsonParam {
306 pub fn from_json(val: &serde_json::Value) -> Self {
310 match val {
311 serde_json::Value::Null => JsonParam::Null,
312 serde_json::Value::String(s) => JsonParam::Text(s.clone()),
313 serde_json::Value::Bool(b) => JsonParam::Bool(*b),
314 serde_json::Value::Number(n) => {
315 if let Some(i) = n.as_i64() {
316 JsonParam::Int(i)
317 } else if let Some(f) = n.as_f64() {
318 JsonParam::Float(f)
319 } else {
320 JsonParam::Text(n.to_string())
321 }
322 }
323 other => JsonParam::Text(other.to_string()),
324 }
325 }
326}
327
328#[cfg(feature = "postgres-live")]
329impl postgres::types::ToSql for JsonParam {
330 fn to_sql(
331 &self,
332 ty: &postgres::types::Type,
333 out: &mut bytes::BytesMut,
334 ) -> Result<postgres::types::IsNull, Box<dyn std::error::Error + Sync + Send>> {
335 use postgres::types::Type;
336
337 if matches!(self, JsonParam::Null) {
341 return Ok(postgres::types::IsNull::Yes);
342 }
343
344 match (self, ty) {
351 (JsonParam::Bool(b), &Type::BOOL) => b.to_sql(ty, out),
352
353 (JsonParam::Int(n), &Type::INT2) => (*n as i16).to_sql(ty, out),
354 (JsonParam::Int(n), &Type::INT4) => (*n as i32).to_sql(ty, out),
355 (JsonParam::Int(n), &Type::INT8) => n.to_sql(ty, out),
356 (JsonParam::Int(n), &Type::FLOAT4) => (*n as f32).to_sql(ty, out),
357 (JsonParam::Int(n), &Type::FLOAT8) => (*n as f64).to_sql(ty, out),
358
359 (JsonParam::Float(f), &Type::FLOAT4) => (*f as f32).to_sql(ty, out),
360 (JsonParam::Float(f), &Type::FLOAT8) => f.to_sql(ty, out),
361 (JsonParam::Float(f), &Type::INT4) => (*f as i32).to_sql(ty, out),
362 (JsonParam::Float(f), &Type::INT8) => (*f as i64).to_sql(ty, out),
363
364 (JsonParam::Text(s), &Type::TEXT)
365 | (JsonParam::Text(s), &Type::VARCHAR)
366 | (JsonParam::Text(s), &Type::BPCHAR)
367 | (JsonParam::Text(s), &Type::NAME) => s.to_sql(ty, out),
368 (JsonParam::Text(s), &Type::TIMESTAMPTZ) | (JsonParam::Text(s), &Type::TIMESTAMP) => {
369 s.as_str().to_sql(ty, out)
375 }
376
377 (other, _) => {
382 let s = match other {
383 JsonParam::Bool(b) => b.to_string(),
384 JsonParam::Int(n) => n.to_string(),
385 JsonParam::Float(f) => f.to_string(),
386 JsonParam::Text(s) => s.clone(),
387 JsonParam::Null => unreachable!(),
388 };
389 s.to_sql(ty, out)
390 }
391 }
392 }
393
394 fn accepts(_ty: &postgres::types::Type) -> bool {
395 true
399 }
400
401 postgres::types::to_sql_checked!();
402}
403
404pub fn build_insert_sql(
412 entity: &str,
413 data: &serde_json::Value,
414) -> Result<(String, Vec<JsonParam>), StorageError> {
415 let id = generate_id();
416 let obj = data.as_object().ok_or_else(|| StorageError {
417 code: "PG_INVALID_DATA".into(),
418 message: "Insert data must be a JSON object".into(),
419 })?;
420
421 let mut col_names = vec!["id".to_string()];
422 let mut placeholders = vec!["$1".to_string()];
423 let mut values: Vec<JsonParam> = vec![JsonParam::Text(id)];
424
425 for (i, (key, val)) in obj.iter().enumerate() {
426 col_names.push(quote_ident(key));
427 placeholders.push(format!("${}", i + 2));
428 values.push(JsonParam::from_json(val));
429 }
430
431 let sql = format!(
432 "INSERT INTO {} ({}) VALUES ({})",
433 quote_ident(entity),
434 col_names.join(", "),
435 placeholders.join(", ")
436 );
437
438 Ok((sql, values))
439}
440
441pub fn build_update_sql(
444 entity: &str,
445 id: &str,
446 data: &serde_json::Value,
447) -> Result<(String, Vec<JsonParam>), StorageError> {
448 let obj = data.as_object().ok_or_else(|| StorageError {
449 code: "PG_INVALID_DATA".into(),
450 message: "Update data must be a JSON object".into(),
451 })?;
452
453 if obj.is_empty() {
454 return Err(StorageError {
455 code: "PG_INVALID_DATA".into(),
456 message: "Update data must contain at least one field".into(),
457 });
458 }
459
460 let mut set_clauses = Vec::new();
461 let mut values: Vec<JsonParam> = vec![JsonParam::Text(id.to_string())];
462
463 for (i, (key, val)) in obj.iter().enumerate() {
464 set_clauses.push(format!("{} = ${}", quote_ident(key), i + 2));
465 values.push(JsonParam::from_json(val));
466 }
467
468 let sql = format!(
469 "UPDATE {} SET {} WHERE id = $1",
470 quote_ident(entity),
471 set_clauses.join(", ")
472 );
473
474 Ok((sql, values))
475}
476
477#[cfg(feature = "postgres-live")]
481fn as_pg_params(values: &[JsonParam]) -> Vec<&(dyn postgres::types::ToSql + Sync)> {
482 values
483 .iter()
484 .map(|v| v as &(dyn postgres::types::ToSql + Sync))
485 .collect()
486}
487
488#[cfg(feature = "postgres-live")]
493pub mod live {
494 use super::*;
495 use crate::{
496 ColumnSnapshot, IndexSnapshot, SchemaSnapshot, StorageAdapter, StorageError, TableSnapshot,
497 };
498
499 pub struct LivePostgresAdapter {
501 client: postgres::Client,
502 }
503
504 impl LivePostgresAdapter {
505 pub fn connect(url: &str) -> Result<Self, StorageError> {
507 let client =
508 postgres::Client::connect(url, postgres::NoTls).map_err(|e| StorageError {
509 code: "PG_CONNECT_FAILED".into(),
510 message: format!("Failed to connect to Postgres: {e}"),
511 })?;
512 Ok(Self { client })
513 }
514
515 pub fn read_schema(&mut self) -> Result<SchemaSnapshot, StorageError> {
517 let table_rows = self
518 .client
519 .query(INTROSPECT_TABLES_SQL, &[])
520 .map_err(pg_err)?;
521
522 let mut tables = Vec::new();
523 for row in &table_rows {
524 let table_name: String = row.get(0);
525 let columns = self.read_columns(&table_name)?;
526 let indexes = self.read_indexes(&table_name)?;
527 tables.push(TableSnapshot {
528 name: table_name,
529 columns,
530 indexes,
531 });
532 }
533
534 Ok(SchemaSnapshot { tables })
535 }
536
537 fn read_columns(&mut self, table: &str) -> Result<Vec<ColumnSnapshot>, StorageError> {
538 let rows = self
539 .client
540 .query(INTROSPECT_COLUMNS_SQL, &[&table])
541 .map_err(pg_err)?;
542
543 let mut columns = Vec::new();
544 for row in &rows {
545 let name: String = row.get(0);
546 let data_type: String = row.get(1);
547 let is_nullable: String = row.get(2);
548 let is_pk: i64 = row.get(3);
549 columns.push(ColumnSnapshot {
550 name,
551 column_type: data_type,
552 notnull: is_nullable == "NO",
553 primary_key: is_pk > 0,
554 });
555 }
556 Ok(columns)
557 }
558
559 fn read_indexes(&mut self, table: &str) -> Result<Vec<IndexSnapshot>, StorageError> {
560 let rows = self
561 .client
562 .query(INTROSPECT_INDEXES_SQL, &[&table])
563 .map_err(pg_err)?;
564
565 let mut indexes = Vec::new();
566 for row in &rows {
567 let name: String = row.get(0);
568 let unique: bool = row.get(1);
569 let columns: Vec<String> = row.get(2);
570 indexes.push(IndexSnapshot {
571 name,
572 columns,
573 unique,
574 });
575 }
576 Ok(indexes)
577 }
578
579 pub fn plan_from_live(&mut self, target: &AppManifest) -> Result<SchemaPlan, StorageError> {
581 let snapshot = self.read_schema()?;
582 Ok(crate::plan_from_snapshot(&snapshot, target))
583 }
584 }
585
586 impl StorageAdapter for LivePostgresAdapter {
587 fn plan_schema(&self, _target: &AppManifest) -> Result<SchemaPlan, StorageError> {
588 Err(StorageError {
589 code: "PG_PLAN_NEEDS_MUTABLE".into(),
590 message: "Use plan_from_live() instead for live Postgres planning".into(),
591 })
592 }
593
594 fn apply_schema(&self, _plan: &SchemaPlan) -> Result<(), StorageError> {
595 Err(StorageError {
596 code: "PG_APPLY_USE_METHOD".into(),
597 message: "Use apply_plan() instead of the trait method for live Postgres".into(),
598 })
599 }
600 }
601
602 impl LivePostgresAdapter {
603 pub fn apply_plan(&mut self, plan: &SchemaPlan) -> Result<(), StorageError> {
605 let statements = plan_to_sql(plan)?;
606 for sql in &statements {
607 self.client.execute(sql.as_str(), &[]).map_err(pg_err)?;
608 }
609 Ok(())
610 }
611
612 pub fn exec_raw(&mut self, sql: &str) -> Result<u64, StorageError> {
618 self.client.execute(sql, &[]).map_err(pg_err)
619 }
620
621 pub fn insert(
623 &mut self,
624 entity: &str,
625 data: &serde_json::Value,
626 ) -> Result<String, StorageError> {
627 let (sql, values) = build_insert_sql(entity, data)?;
628 let id = match &values[0] {
631 JsonParam::Text(s) => s.clone(),
632 _ => {
633 return Err(StorageError {
634 code: "PG_INTERNAL".into(),
635 message: "build_insert_sql produced non-text id param".into(),
636 });
637 }
638 };
639 let params = as_pg_params(&values);
640 self.client.execute(sql.as_str(), ¶ms).map_err(pg_err)?;
641 Ok(id)
642 }
643
644 pub fn get_by_id(
646 &mut self,
647 entity: &str,
648 id: &str,
649 ) -> Result<Option<serde_json::Value>, StorageError> {
650 let sql = format!("SELECT * FROM {} WHERE id = $1", quote_ident(entity));
651 let rows = self.client.query(sql.as_str(), &[&id]).map_err(pg_err)?;
652
653 match rows.first() {
654 Some(row) => Ok(Some(row_to_json(row))),
655 None => Ok(None),
656 }
657 }
658
659 pub fn list(&mut self, entity: &str) -> Result<Vec<serde_json::Value>, StorageError> {
661 let sql = format!("SELECT * FROM {}", quote_ident(entity));
662 let rows = self.client.query(sql.as_str(), &[]).map_err(pg_err)?;
663
664 Ok(rows.iter().map(row_to_json).collect())
665 }
666
667 pub fn list_after(
671 &mut self,
672 entity: &str,
673 after: Option<&str>,
674 limit: usize,
675 ) -> Result<Vec<serde_json::Value>, StorageError> {
676 let capped: i64 = limit.min(10_000) as i64;
679 let sql = match after {
680 Some(_) => format!(
681 "SELECT * FROM {} WHERE id > $1 ORDER BY id ASC LIMIT $2",
682 quote_ident(entity)
683 ),
684 None => format!(
685 "SELECT * FROM {} ORDER BY id ASC LIMIT $1",
686 quote_ident(entity)
687 ),
688 };
689 let rows = match after {
690 Some(cursor) => self
691 .client
692 .query(sql.as_str(), &[&cursor, &capped])
693 .map_err(pg_err)?,
694 None => self
695 .client
696 .query(sql.as_str(), &[&capped])
697 .map_err(pg_err)?,
698 };
699 Ok(rows.iter().map(row_to_json).collect())
700 }
701
702 pub fn update(
704 &mut self,
705 entity: &str,
706 id: &str,
707 data: &serde_json::Value,
708 ) -> Result<bool, StorageError> {
709 let (sql, values) = build_update_sql(entity, id, data)?;
710 let params = as_pg_params(&values);
711 let rows_affected = self.client.execute(sql.as_str(), ¶ms).map_err(pg_err)?;
712 Ok(rows_affected > 0)
713 }
714
715 pub fn delete(&mut self, entity: &str, id: &str) -> Result<bool, StorageError> {
717 let sql = format!("DELETE FROM {} WHERE id = $1", quote_ident(entity));
718 let rows_affected = self.client.execute(sql.as_str(), &[&id]).map_err(pg_err)?;
719 Ok(rows_affected > 0)
720 }
721
722 pub fn lookup_field(
726 &mut self,
727 entity: &str,
728 field: &str,
729 value: &str,
730 ) -> Result<Option<serde_json::Value>, StorageError> {
731 let sql = format!(
732 "SELECT * FROM {} WHERE {} = $1 LIMIT 1",
733 quote_ident(entity),
734 quote_ident(field),
735 );
736 let rows = self.client.query(sql.as_str(), &[&value]).map_err(pg_err)?;
737 Ok(rows.first().map(row_to_json))
738 }
739
740 pub fn query_filtered(
763 &mut self,
764 entity: &str,
765 filter: &serde_json::Value,
766 valid_columns: &[String],
767 ) -> Result<Vec<serde_json::Value>, StorageError> {
768 let empty = serde_json::Map::new();
769 let obj = filter.as_object().unwrap_or(&empty);
770
771 let validate = |col: &str| -> Result<(), StorageError> {
772 if col == "id" || valid_columns.iter().any(|c| c == col) {
773 Ok(())
774 } else {
775 Err(StorageError {
776 code: "UNKNOWN_COLUMN".into(),
777 message: format!("Unknown column \"{col}\" on entity \"{entity}\""),
778 })
779 }
780 };
781
782 let mut where_clauses: Vec<String> = Vec::new();
783 let mut order_clause = String::new();
784 let mut limit_clause = String::new();
785 let mut offset_clause = String::new();
786 let mut planned: Vec<(String, String, JsonParam)> = Vec::new();
790
791 for (key, val) in obj {
792 match key.as_str() {
793 "$search" => {
794 return Err(StorageError {
801 code: "SEARCH_NOT_SUPPORTED".into(),
802 message: "$search is SQLite-FTS5-only; use a Postgres tsvector column with the storage adapter's full-text path"
803 .into(),
804 });
805 }
806 "$order" => {
807 if let Some(ord) = val.as_object() {
808 let mut parts = Vec::new();
809 for (col, dir) in ord {
810 validate(col)?;
811 let d = match dir.as_str().unwrap_or("asc") {
812 "desc" | "DESC" => "DESC",
813 _ => "ASC",
814 };
815 parts.push(format!("{} {d}", quote_ident(col)));
816 }
817 if !parts.is_empty() {
818 order_clause = format!(" ORDER BY {}", parts.join(", "));
819 }
820 }
821 }
822 "$limit" => {
823 if let Some(n) = val.as_u64() {
824 limit_clause = format!(" LIMIT {}", n);
825 }
826 }
827 "$offset" => {
828 if let Some(n) = val.as_u64() {
829 offset_clause = format!(" OFFSET {}", n);
830 }
831 }
832 field => {
833 validate(field)?;
834 match val {
835 serde_json::Value::Object(ops) => {
836 for (op, v) in ops {
837 match op.as_str() {
838 "$not" => planned.push((
839 field.into(),
840 "!=".into(),
841 value_to_pg(v),
842 )),
843 "$gt" => {
844 planned.push((field.into(), ">".into(), value_to_pg(v)))
845 }
846 "$gte" => planned.push((
847 field.into(),
848 ">=".into(),
849 value_to_pg(v),
850 )),
851 "$lt" => {
852 planned.push((field.into(), "<".into(), value_to_pg(v)))
853 }
854 "$lte" => planned.push((
855 field.into(),
856 "<=".into(),
857 value_to_pg(v),
858 )),
859 "$like" => planned.push((
860 field.into(),
861 "LIKE".into(),
862 value_to_pg(v),
863 )),
864 "$in" => {
865 if let Some(arr) = v.as_array() {
866 let placeholders: Vec<String> = (0..arr.len())
867 .map(|i| format!("${}", planned.len() + 1 + i))
868 .collect();
869 where_clauses.push(format!(
870 "{} IN ({})",
871 quote_ident(field),
872 placeholders.join(", "),
873 ));
874 for x in arr {
875 planned.push((
876 format!("__inline_{}", planned.len()),
877 "__INLINE__".into(),
878 value_to_pg(x),
879 ));
880 }
881 }
882 }
883 _ => {}
884 }
885 }
886 }
887 _ => planned.push((field.into(), "=".into(), value_to_pg(val))),
888 }
889 }
890 }
891 }
892
893 let mut params: Vec<JsonParam> = Vec::with_capacity(planned.len());
895 for (field, op, v) in &planned {
896 if op == "__INLINE__" {
897 params.push(v.clone());
899 } else {
900 let placeholder = format!("${}", params.len() + 1);
901 where_clauses.push(format!("{} {} {}", quote_ident(field), op, placeholder));
902 params.push(v.clone());
903 }
904 }
905
906 let where_sql = if where_clauses.is_empty() {
907 String::new()
908 } else {
909 format!(" WHERE {}", where_clauses.join(" AND "))
910 };
911 let sql = format!(
912 "SELECT * FROM {}{}{}{}{}",
913 quote_ident(entity),
914 where_sql,
915 order_clause,
916 limit_clause,
917 offset_clause,
918 );
919
920 let pg_params = as_pg_params(¶ms);
921 let rows = self
922 .client
923 .query(sql.as_str(), &pg_params)
924 .map_err(pg_err)?;
925 Ok(rows.iter().map(row_to_json).collect())
926 }
927
928 pub fn aggregate(
947 &mut self,
948 entity: &str,
949 spec: &serde_json::Value,
950 valid_columns: &[String],
951 ) -> Result<serde_json::Value, StorageError> {
952 let obj = spec.as_object().ok_or_else(|| StorageError {
953 code: "INVALID_QUERY".into(),
954 message: "aggregate spec must be a JSON object".into(),
955 })?;
956
957 let validate = |col: &str| -> Result<(), StorageError> {
958 if col == "id" || valid_columns.iter().any(|c| c == col) {
959 Ok(())
960 } else {
961 Err(StorageError {
962 code: "UNKNOWN_COLUMN".into(),
963 message: format!("Unknown column \"{col}\" on entity \"{entity}\""),
964 })
965 }
966 };
967
968 let mut select_parts: Vec<String> = Vec::new();
969 let mut result_fields: Vec<String> = Vec::new();
970
971 if let Some(count) = obj.get("count") {
972 match count {
973 serde_json::Value::String(s) if s == "*" => {
974 select_parts.push("COUNT(*) AS count".into());
975 result_fields.push("count".into());
976 }
977 serde_json::Value::String(field) => {
978 validate(field)?;
979 let alias = format!("count_{field}");
980 select_parts.push(format!(
981 "COUNT({}) AS {}",
982 quote_ident(field),
983 quote_ident(&alias),
984 ));
985 result_fields.push(alias);
986 }
987 _ => {}
988 }
989 }
990
991 for (fn_name, prefix) in [
992 ("sum", "sum_"),
993 ("avg", "avg_"),
994 ("min", "min_"),
995 ("max", "max_"),
996 ] {
997 if let Some(fields) = obj.get(fn_name).and_then(|v| v.as_array()) {
998 for field in fields {
999 if let Some(f) = field.as_str() {
1000 validate(f)?;
1001 let alias = format!("{prefix}{f}");
1002 let sql_fn = fn_name.to_uppercase();
1003 select_parts.push(format!(
1004 "{}({}) AS {}",
1005 sql_fn,
1006 quote_ident(f),
1007 quote_ident(&alias),
1008 ));
1009 result_fields.push(alias);
1010 }
1011 }
1012 }
1013 }
1014
1015 if let Some(fields) = obj.get("countDistinct").and_then(|v| v.as_array()) {
1016 for field in fields {
1017 if let Some(f) = field.as_str() {
1018 validate(f)?;
1019 let alias = format!("count_distinct_{f}");
1020 select_parts.push(format!(
1021 "COUNT(DISTINCT {}) AS {}",
1022 quote_ident(f),
1023 quote_ident(&alias),
1024 ));
1025 result_fields.push(alias);
1026 }
1027 }
1028 }
1029
1030 let mut group_by: Vec<String> = Vec::new();
1035 let mut group_select: Vec<String> = Vec::new();
1036 let mut group_field_names: Vec<String> = Vec::new();
1037 if let Some(groups) = obj.get("groupBy").and_then(|v| v.as_array()) {
1038 for g in groups {
1039 if let Some(f) = g.as_str() {
1040 validate(f)?;
1041 let q = quote_ident(f);
1042 group_by.push(q.clone());
1043 group_select.push(q);
1044 group_field_names.push(f.to_string());
1045 } else if let Some(spec) = g.as_object() {
1046 let field =
1047 spec.get("field").and_then(|v| v.as_str()).ok_or_else(|| {
1048 StorageError {
1049 code: "INVALID_QUERY".into(),
1050 message: "groupBy object spec requires `field`".into(),
1051 }
1052 })?;
1053 validate(field)?;
1054 let bucket = spec.get("bucket").and_then(|v| v.as_str()).unwrap_or("day");
1055 let trunc_unit = match bucket {
1056 "hour" | "day" | "week" | "month" | "year" => bucket,
1057 _ => {
1058 return Err(StorageError {
1059 code: "INVALID_QUERY".into(),
1060 message: format!(
1061 "bucket must be one of hour/day/week/month/year, got {bucket}"
1062 ),
1063 });
1064 }
1065 };
1066 let alias = format!("{field}_{bucket}");
1067 let expr = format!("date_trunc('{}', {})", trunc_unit, quote_ident(field),);
1068 group_by.push(expr.clone());
1069 group_select.push(format!("{} AS {}", expr, quote_ident(&alias)));
1070 group_field_names.push(alias);
1071 }
1072 }
1073 }
1074
1075 let mut full_select = group_select.clone();
1076 full_select.extend(select_parts.iter().cloned());
1077 if full_select.is_empty() {
1078 return Err(StorageError {
1079 code: "INVALID_QUERY".into(),
1080 message: "aggregate spec must include count/sum/avg/min/max/groupBy".into(),
1081 });
1082 }
1083
1084 let mut where_clauses: Vec<String> = Vec::new();
1085 let mut params: Vec<JsonParam> = Vec::new();
1086 if let Some(w) = obj.get("where").and_then(|v| v.as_object()) {
1087 for (k, v) in w {
1088 validate(k)?;
1089 let placeholder = format!("${}", params.len() + 1);
1090 where_clauses.push(format!("{} = {}", quote_ident(k), placeholder));
1091 params.push(value_to_pg(v));
1092 }
1093 }
1094 let where_sql = if where_clauses.is_empty() {
1095 String::new()
1096 } else {
1097 format!(" WHERE {}", where_clauses.join(" AND "))
1098 };
1099 let group_sql = if group_by.is_empty() {
1100 String::new()
1101 } else {
1102 format!(" GROUP BY {}", group_by.join(", "))
1103 };
1104
1105 let sql = format!(
1106 "SELECT {} FROM {}{}{}",
1107 full_select.join(", "),
1108 quote_ident(entity),
1109 where_sql,
1110 group_sql,
1111 );
1112
1113 let pg_params = as_pg_params(¶ms);
1114 let rows = self
1115 .client
1116 .query(sql.as_str(), &pg_params)
1117 .map_err(pg_err)?;
1118
1119 let column_names: Vec<String> = group_field_names
1120 .iter()
1121 .chain(result_fields.iter())
1122 .cloned()
1123 .collect();
1124
1125 let mut out: Vec<serde_json::Value> = Vec::with_capacity(rows.len());
1126 for row in &rows {
1127 let row_json = row_to_json(row);
1128 if let serde_json::Value::Object(map) = &row_json {
1129 let mut filtered = serde_json::Map::new();
1130 for name in &column_names {
1131 if let Some(v) = map.get(name) {
1132 filtered.insert(name.clone(), v.clone());
1133 }
1134 }
1135 out.push(serde_json::Value::Object(filtered));
1136 } else {
1137 out.push(row_json);
1138 }
1139 }
1140 Ok(serde_json::json!({ "rows": out }))
1141 }
1142 }
1143
1144 pub enum TxOp<'a> {
1146 Insert {
1147 entity: &'a str,
1148 data: &'a serde_json::Value,
1149 },
1150 Update {
1151 entity: &'a str,
1152 id: &'a str,
1153 data: &'a serde_json::Value,
1154 },
1155 Delete {
1156 entity: &'a str,
1157 id: &'a str,
1158 },
1159 }
1160
1161 #[derive(Debug, Clone)]
1163 pub enum TxResult {
1164 Inserted(String),
1165 Updated(bool),
1166 Deleted(bool),
1167 }
1168
1169 impl LivePostgresAdapter {
1170 pub fn transact(&mut self, ops: &[TxOp<'_>]) -> Result<Vec<TxResult>, StorageError> {
1175 let mut tx = self.client.transaction().map_err(pg_err)?;
1176 let mut results: Vec<TxResult> = Vec::with_capacity(ops.len());
1177
1178 for op in ops {
1179 match op {
1180 TxOp::Insert { entity, data } => {
1181 let (sql, values) = build_insert_sql(entity, data)?;
1182 let id = match &values[0] {
1183 JsonParam::Text(s) => s.clone(),
1184 _ => {
1185 return Err(StorageError {
1186 code: "PG_INTERNAL".into(),
1187 message: "build_insert_sql produced non-text id param".into(),
1188 });
1189 }
1190 };
1191 let params = as_pg_params(&values);
1192 tx.execute(sql.as_str(), ¶ms).map_err(pg_err)?;
1193 results.push(TxResult::Inserted(id));
1194 }
1195 TxOp::Update { entity, id, data } => {
1196 let (sql, values) = build_update_sql(entity, id, data)?;
1197 let params = as_pg_params(&values);
1198 let n = tx.execute(sql.as_str(), ¶ms).map_err(pg_err)?;
1199 results.push(TxResult::Updated(n > 0));
1200 }
1201 TxOp::Delete { entity, id } => {
1202 let sql = format!("DELETE FROM {} WHERE id = $1", quote_ident(entity));
1203 let n = tx.execute(sql.as_str(), &[id]).map_err(pg_err)?;
1204 results.push(TxResult::Deleted(n > 0));
1205 }
1206 }
1207 }
1208
1209 tx.commit().map_err(pg_err)?;
1210 Ok(results)
1211 }
1212 }
1213
1214 fn value_to_pg(v: &serde_json::Value) -> JsonParam {
1220 JsonParam::from_json(v)
1221 }
1222
1223 fn row_to_json(row: &postgres::Row) -> serde_json::Value {
1224 use postgres::types::Type;
1225 let mut obj = serde_json::Map::new();
1226 for (i, col) in row.columns().iter().enumerate() {
1227 let name = col.name().to_string();
1228
1229 let value: serde_json::Value = match *col.type_() {
1240 Type::BOOL => try_get_or_null::<Option<bool>>(row, i)
1241 .flatten()
1242 .map(serde_json::Value::Bool)
1243 .unwrap_or(serde_json::Value::Null),
1244 Type::INT2 => try_get_or_null::<Option<i16>>(row, i)
1245 .flatten()
1246 .map(|v| serde_json::Value::Number(v.into()))
1247 .unwrap_or(serde_json::Value::Null),
1248 Type::INT4 => try_get_or_null::<Option<i32>>(row, i)
1249 .flatten()
1250 .map(|v| serde_json::Value::Number(v.into()))
1251 .unwrap_or(serde_json::Value::Null),
1252 Type::INT8 => try_get_or_null::<Option<i64>>(row, i)
1253 .flatten()
1254 .map(|v| serde_json::Value::Number(v.into()))
1255 .unwrap_or(serde_json::Value::Null),
1256 Type::FLOAT4 => try_get_or_null::<Option<f32>>(row, i)
1257 .flatten()
1258 .and_then(|v| serde_json::Number::from_f64(v as f64))
1259 .map(serde_json::Value::Number)
1260 .unwrap_or(serde_json::Value::Null),
1261 Type::FLOAT8 => try_get_or_null::<Option<f64>>(row, i)
1262 .flatten()
1263 .and_then(serde_json::Number::from_f64)
1264 .map(serde_json::Value::Number)
1265 .unwrap_or(serde_json::Value::Null),
1266 Type::JSON | Type::JSONB => try_get_or_null::<Option<serde_json::Value>>(row, i)
1267 .flatten()
1268 .unwrap_or(serde_json::Value::Null),
1269 Type::BYTEA => try_get_or_null::<Option<Vec<u8>>>(row, i)
1270 .flatten()
1271 .map(|b| serde_json::Value::String(b64(&b)))
1272 .unwrap_or(serde_json::Value::Null),
1273 Type::TEXT | Type::VARCHAR | Type::BPCHAR | Type::NAME | Type::UNKNOWN => {
1274 try_get_or_null::<Option<String>>(row, i)
1275 .flatten()
1276 .map(serde_json::Value::String)
1277 .unwrap_or(serde_json::Value::Null)
1278 }
1279 _ => {
1280 match row.try_get::<_, Option<String>>(i) {
1285 Ok(Some(s)) => serde_json::Value::String(s),
1286 Ok(None) => serde_json::Value::Null,
1287 Err(_) => match row.try_get::<_, Option<Vec<u8>>>(i) {
1288 Ok(Some(bytes)) => serde_json::Value::String(
1289 String::from_utf8_lossy(&bytes).into_owned(),
1290 ),
1291 _ => serde_json::Value::Null,
1292 },
1293 }
1294 }
1295 };
1296 obj.insert(name, value);
1297 }
1298 serde_json::Value::Object(obj)
1299 }
1300
1301 fn try_get_or_null<'a, T>(row: &'a postgres::Row, i: usize) -> Option<T>
1302 where
1303 T: postgres::types::FromSql<'a>,
1304 {
1305 match row.try_get::<_, T>(i) {
1306 Ok(v) => Some(v),
1307 Err(e) => {
1308 tracing::warn!(
1309 "[postgres] decode failed for column {} ({}): {e}",
1310 i,
1311 row.columns()[i].name()
1312 );
1313 None
1314 }
1315 }
1316 }
1317
1318 fn b64(bytes: &[u8]) -> String {
1321 const TABLE: &[u8; 64] =
1322 b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
1323 let mut out = String::with_capacity((bytes.len() + 2) / 3 * 4);
1324 let chunks = bytes.chunks(3);
1325 for chunk in chunks {
1326 let b = [
1327 chunk.first().copied().unwrap_or(0),
1328 chunk.get(1).copied().unwrap_or(0),
1329 chunk.get(2).copied().unwrap_or(0),
1330 ];
1331 out.push(TABLE[(b[0] >> 2) as usize] as char);
1332 out.push(TABLE[((b[0] & 0x03) << 4 | b[1] >> 4) as usize] as char);
1333 if chunk.len() > 1 {
1334 out.push(TABLE[((b[1] & 0x0F) << 2 | b[2] >> 6) as usize] as char);
1335 } else {
1336 out.push('=');
1337 }
1338 if chunk.len() > 2 {
1339 out.push(TABLE[(b[2] & 0x3F) as usize] as char);
1340 } else {
1341 out.push('=');
1342 }
1343 }
1344 out
1345 }
1346
1347 fn pg_err(e: postgres::Error) -> StorageError {
1348 use std::error::Error;
1354 let mut detail = format!("{e}");
1355 let mut src: Option<&dyn Error> = e.source();
1356 while let Some(s) = src {
1357 detail.push_str(": ");
1358 detail.push_str(&format!("{s}"));
1359 src = s.source();
1360 }
1361 StorageError {
1362 code: "PG_QUERY_FAILED".into(),
1363 message: format!("Postgres query failed: {detail}"),
1364 }
1365 }
1366}
1367
1368#[cfg(test)]
1373mod tests {
1374 use super::*;
1375
1376 fn test_manifest() -> AppManifest {
1380 use pylon_kernel::{ManifestEntity, ManifestField, ManifestIndex};
1381 let f = |name: &str, ty: &str, opt: bool, uniq: bool| ManifestField {
1382 name: name.into(),
1383 field_type: ty.into(),
1384 optional: opt,
1385 unique: uniq,
1386 crdt: None,
1387 };
1388 AppManifest {
1389 manifest_version: 1,
1390 name: "test".into(),
1391 version: "0.0.0".into(),
1392 entities: vec![
1393 ManifestEntity {
1394 name: "User".into(),
1395 fields: vec![
1396 f("email", "string", false, true),
1397 f("displayName", "string", false, false),
1398 f("createdAt", "datetime", false, false),
1399 ],
1400 indexes: vec![],
1401 relations: vec![],
1402 search: None,
1403 crdt: true,
1404 },
1405 ManifestEntity {
1406 name: "Todo".into(),
1407 fields: vec![
1408 f("title", "string", false, false),
1409 f("done", "bool", false, false),
1410 f("userId", "id(User)", false, false),
1411 f("createdAt", "datetime", false, false),
1412 ],
1413 indexes: vec![ManifestIndex {
1414 name: "by_user".into(),
1415 fields: vec!["userId".into()],
1416 unique: false,
1417 }],
1418 relations: vec![],
1419 search: None,
1420 crdt: true,
1421 },
1422 ],
1423 queries: vec![],
1424 actions: vec![],
1425 policies: vec![],
1426 routes: vec![],
1427 }
1428 }
1429
1430 #[test]
1431 fn pg_type_mapping() {
1432 assert_eq!(pg_column_type("string"), "TEXT");
1433 assert_eq!(pg_column_type("int"), "INTEGER");
1434 assert_eq!(pg_column_type("float"), "DOUBLE PRECISION");
1435 assert_eq!(pg_column_type("bool"), "BOOLEAN");
1436 assert_eq!(pg_column_type("datetime"), "TIMESTAMPTZ");
1437 assert_eq!(pg_column_type("richtext"), "TEXT");
1438 assert_eq!(pg_column_type("id(User)"), "TEXT");
1439 }
1440
1441 #[test]
1442 fn quote_ident_simple() {
1443 assert_eq!(quote_ident("User"), "\"User\"");
1444 assert_eq!(quote_ident("email"), "\"email\"");
1445 }
1446
1447 #[test]
1448 fn quote_ident_escapes_embedded_double_quotes() {
1449 assert_eq!(quote_ident("col\"name"), "\"col\"\"name\"");
1450 assert_eq!(quote_ident("a\"b\"c"), "\"a\"\"b\"\"c\"");
1451 }
1452
1453 #[test]
1454 fn create_table_sql_basic() {
1455 let fields = vec![
1456 FieldSpec {
1457 name: "email".into(),
1458 field_type: "string".into(),
1459 optional: false,
1460 unique: true,
1461 },
1462 FieldSpec {
1463 name: "age".into(),
1464 field_type: "int".into(),
1465 optional: true,
1466 unique: false,
1467 },
1468 ];
1469 let sql = create_table_sql("User", &fields);
1470 assert_eq!(
1471 sql,
1472 "CREATE TABLE IF NOT EXISTS \"User\" (id TEXT PRIMARY KEY NOT NULL, \"email\" TEXT NOT NULL UNIQUE, \"age\" INTEGER)"
1473 );
1474 }
1475
1476 #[test]
1477 fn create_table_sql_escapes_identifiers() {
1478 let fields = vec![FieldSpec {
1479 name: "col\"x".into(),
1480 field_type: "string".into(),
1481 optional: false,
1482 unique: false,
1483 }];
1484 let sql = create_table_sql("my\"table", &fields);
1485 assert!(sql.contains("\"my\"\"table\""));
1486 assert!(sql.contains("\"col\"\"x\""));
1487 }
1488
1489 #[test]
1490 fn create_index_sql_unique() {
1491 let sql = create_index_sql("User", "by_email", &["email".into()], true);
1492 assert_eq!(
1493 sql,
1494 "CREATE UNIQUE INDEX IF NOT EXISTS \"User_by_email\" ON \"User\" (\"email\")"
1495 );
1496 }
1497
1498 #[test]
1499 fn create_index_sql_non_unique() {
1500 let sql = create_index_sql("Todo", "by_user", &["userId".into()], false);
1501 assert_eq!(
1502 sql,
1503 "CREATE INDEX IF NOT EXISTS \"Todo_by_user\" ON \"Todo\" (\"userId\")"
1504 );
1505 }
1506
1507 #[test]
1508 fn add_column_sql_basic() {
1509 let field = FieldSpec {
1510 name: "bio".into(),
1511 field_type: "string".into(),
1512 optional: true,
1513 unique: false,
1514 };
1515 let sql = add_column_sql("User", &field);
1516 assert_eq!(sql, "ALTER TABLE \"User\" ADD COLUMN \"bio\" TEXT");
1517 }
1518
1519 #[test]
1520 fn plan_from_manifest() {
1521 let adapter = PostgresAdapter;
1522 let manifest = test_manifest();
1523 let plan = adapter.plan_schema(&manifest).unwrap();
1524
1525 assert!(plan.operations.iter().any(|op| matches!(
1527 op,
1528 SchemaOperation::CreateEntity { name, .. } if name == "User"
1529 )));
1530 assert!(plan.operations.iter().any(|op| matches!(
1531 op,
1532 SchemaOperation::CreateEntity { name, .. } if name == "Todo"
1533 )));
1534 assert!(plan.operations.iter().any(|op| matches!(
1535 op,
1536 SchemaOperation::AddIndex { entity, name, .. } if entity == "Todo" && name == "by_user"
1537 )));
1538 }
1539
1540 #[test]
1541 fn plan_to_sql_produces_statements() {
1542 let adapter = PostgresAdapter;
1543 let manifest = test_manifest();
1544 let plan = adapter.plan_schema(&manifest).unwrap();
1545 let stmts = plan_to_sql(&plan).unwrap();
1546
1547 let create_tables = stmts
1552 .iter()
1553 .filter(|s| s.starts_with("CREATE TABLE"))
1554 .count();
1555 let create_indexes = stmts
1556 .iter()
1557 .filter(|s| s.starts_with("CREATE INDEX") || s.starts_with("CREATE UNIQUE INDEX"))
1558 .count();
1559 assert_eq!(create_tables, 2);
1560 assert!(create_indexes >= 1);
1561 assert!(stmts[0].starts_with("CREATE TABLE"));
1562 assert!(stmts[1].starts_with("CREATE TABLE"));
1563 }
1564
1565 #[test]
1566 fn plan_to_sql_rejects_unsupported() {
1567 let plan = SchemaPlan {
1568 operations: vec![SchemaOperation::RemoveEntity {
1569 name: "User".into(),
1570 }],
1571 };
1572 let result = plan_to_sql(&plan);
1573 assert!(result.is_err());
1574 assert_eq!(result.unwrap_err().code, "PG_OP_UNSUPPORTED");
1575 }
1576
1577 #[test]
1578 fn apply_not_implemented() {
1579 let adapter = PostgresAdapter;
1580 let plan = SchemaPlan {
1581 operations: vec![SchemaOperation::Noop],
1582 };
1583 let result = adapter.apply_schema(&plan);
1584 assert!(result.is_err());
1585 assert_eq!(result.unwrap_err().code, "APPLY_NOT_IMPLEMENTED");
1586 }
1587
1588 #[test]
1589 fn sql_uses_quoted_identifiers() {
1590 let fields = vec![FieldSpec {
1591 name: "createdAt".into(),
1592 field_type: "datetime".into(),
1593 optional: false,
1594 unique: false,
1595 }];
1596 let sql = create_table_sql("User", &fields);
1597 assert!(sql.contains("\"User\""));
1599 assert!(sql.contains("\"createdAt\""));
1600 assert!(sql.contains("TIMESTAMPTZ"));
1601 }
1602
1603 #[test]
1606 fn introspect_sql_constants_are_valid() {
1607 assert!(INTROSPECT_TABLES_SQL.contains("information_schema.tables"));
1609 assert!(INTROSPECT_COLUMNS_SQL.contains("$1"));
1610 assert!(INTROSPECT_INDEXES_SQL.contains("$1"));
1611 assert!(INTROSPECT_TABLES_SQL.contains("_pylon_"));
1612 }
1613
1614 #[test]
1617 fn plan_from_empty_snapshot_creates_all() {
1618 let snapshot = crate::SchemaSnapshot { tables: vec![] };
1619 let manifest = test_manifest();
1620 let plan = plan_from_snapshot(&snapshot, &manifest);
1621
1622 assert!(plan.operations.iter().any(|op| matches!(
1623 op,
1624 SchemaOperation::CreateEntity { name, .. } if name == "User"
1625 )));
1626 assert!(plan.operations.iter().any(|op| matches!(
1627 op,
1628 SchemaOperation::CreateEntity { name, .. } if name == "Todo"
1629 )));
1630 assert!(plan.operations.iter().any(|op| matches!(
1631 op,
1632 SchemaOperation::AddIndex { entity, name, .. } if entity == "Todo" && name == "by_user"
1633 )));
1634 }
1635
1636 #[test]
1637 fn plan_from_full_snapshot_is_noop() {
1638 let snapshot = crate::SchemaSnapshot {
1639 tables: vec![
1640 crate::TableSnapshot {
1641 name: "User".into(),
1642 columns: vec![
1643 crate::ColumnSnapshot {
1644 name: "id".into(),
1645 column_type: "TEXT".into(),
1646 notnull: true,
1647 primary_key: true,
1648 },
1649 crate::ColumnSnapshot {
1650 name: "email".into(),
1651 column_type: "TEXT".into(),
1652 notnull: true,
1653 primary_key: false,
1654 },
1655 crate::ColumnSnapshot {
1656 name: "displayName".into(),
1657 column_type: "TEXT".into(),
1658 notnull: true,
1659 primary_key: false,
1660 },
1661 crate::ColumnSnapshot {
1662 name: "createdAt".into(),
1663 column_type: "TIMESTAMPTZ".into(),
1664 notnull: true,
1665 primary_key: false,
1666 },
1667 ],
1668 indexes: vec![],
1669 },
1670 crate::TableSnapshot {
1671 name: "Todo".into(),
1672 columns: vec![
1673 crate::ColumnSnapshot {
1674 name: "id".into(),
1675 column_type: "TEXT".into(),
1676 notnull: true,
1677 primary_key: true,
1678 },
1679 crate::ColumnSnapshot {
1680 name: "title".into(),
1681 column_type: "TEXT".into(),
1682 notnull: true,
1683 primary_key: false,
1684 },
1685 crate::ColumnSnapshot {
1686 name: "done".into(),
1687 column_type: "BOOLEAN".into(),
1688 notnull: true,
1689 primary_key: false,
1690 },
1691 crate::ColumnSnapshot {
1692 name: "userId".into(),
1693 column_type: "TEXT".into(),
1694 notnull: true,
1695 primary_key: false,
1696 },
1697 crate::ColumnSnapshot {
1698 name: "createdAt".into(),
1699 column_type: "TIMESTAMPTZ".into(),
1700 notnull: true,
1701 primary_key: false,
1702 },
1703 ],
1704 indexes: vec![crate::IndexSnapshot {
1705 name: "Todo_by_user".into(),
1706 columns: vec!["userId".into()],
1707 unique: false,
1708 }],
1709 },
1710 ],
1711 };
1712 let manifest = test_manifest();
1713 let plan = plan_from_snapshot(&snapshot, &manifest);
1714 assert!(plan.is_empty());
1715 }
1716
1717 #[test]
1718 fn plan_detects_missing_column_in_snapshot() {
1719 let snapshot = crate::SchemaSnapshot {
1720 tables: vec![
1721 crate::TableSnapshot {
1722 name: "User".into(),
1723 columns: vec![
1724 crate::ColumnSnapshot {
1725 name: "id".into(),
1726 column_type: "TEXT".into(),
1727 notnull: true,
1728 primary_key: true,
1729 },
1730 crate::ColumnSnapshot {
1731 name: "email".into(),
1732 column_type: "TEXT".into(),
1733 notnull: true,
1734 primary_key: false,
1735 },
1736 ],
1738 indexes: vec![],
1739 },
1740 crate::TableSnapshot {
1741 name: "Todo".into(),
1742 columns: vec![
1743 crate::ColumnSnapshot {
1744 name: "id".into(),
1745 column_type: "TEXT".into(),
1746 notnull: true,
1747 primary_key: true,
1748 },
1749 crate::ColumnSnapshot {
1750 name: "title".into(),
1751 column_type: "TEXT".into(),
1752 notnull: true,
1753 primary_key: false,
1754 },
1755 crate::ColumnSnapshot {
1756 name: "done".into(),
1757 column_type: "BOOLEAN".into(),
1758 notnull: true,
1759 primary_key: false,
1760 },
1761 crate::ColumnSnapshot {
1762 name: "userId".into(),
1763 column_type: "TEXT".into(),
1764 notnull: true,
1765 primary_key: false,
1766 },
1767 crate::ColumnSnapshot {
1768 name: "createdAt".into(),
1769 column_type: "TIMESTAMPTZ".into(),
1770 notnull: true,
1771 primary_key: false,
1772 },
1773 ],
1774 indexes: vec![crate::IndexSnapshot {
1775 name: "Todo_by_user".into(),
1776 columns: vec!["userId".into()],
1777 unique: false,
1778 }],
1779 },
1780 ],
1781 };
1782 let manifest = test_manifest();
1783 let plan = plan_from_snapshot(&snapshot, &manifest);
1784
1785 let add_fields: Vec<_> = plan
1786 .operations
1787 .iter()
1788 .filter(|op| matches!(op, SchemaOperation::AddField { .. }))
1789 .collect();
1790 assert_eq!(add_fields.len(), 2); }
1792
1793 #[test]
1796 fn json_value_to_string_handles_all_types() {
1797 assert_eq!(
1798 json_value_to_string(&serde_json::Value::String("hello".into())),
1799 "hello"
1800 );
1801 assert_eq!(json_value_to_string(&serde_json::json!(42)), "42");
1802 assert_eq!(json_value_to_string(&serde_json::json!(1.5)), "1.5");
1803 assert_eq!(json_value_to_string(&serde_json::Value::Bool(true)), "true");
1804 assert_eq!(
1805 json_value_to_string(&serde_json::Value::Bool(false)),
1806 "false"
1807 );
1808 assert_eq!(json_value_to_string(&serde_json::Value::Null), "");
1809 assert_eq!(
1811 json_value_to_string(&serde_json::json!([1, 2, 3])),
1812 "[1,2,3]"
1813 );
1814 assert_eq!(
1815 json_value_to_string(&serde_json::json!({"a": 1})),
1816 "{\"a\":1}"
1817 );
1818 }
1819
1820 #[test]
1821 fn generate_id_returns_hex_string() {
1822 let id = generate_id();
1823 assert!(!id.is_empty());
1824 assert!(id.chars().all(|c| c.is_ascii_hexdigit()));
1826 }
1827
1828 #[test]
1829 fn generate_id_is_unique_across_calls() {
1830 let id1 = generate_id();
1831 let id2 = generate_id();
1832 assert_ne!(id1, id2);
1833 }
1834
1835 #[test]
1836 fn generate_id_is_lex_sortable() {
1837 let mut ids: Vec<String> = (0..1000).map(|_| generate_id()).collect();
1840 let sorted = {
1841 let mut s = ids.clone();
1842 s.sort();
1843 s
1844 };
1845 assert_eq!(ids, sorted, "generate_id must be lex-monotonic");
1846 let len0 = ids[0].len();
1849 assert!(ids.iter().all(|id| id.len() == len0));
1850 ids.dedup();
1851 assert_eq!(ids.len(), 1000, "no collisions in a tight loop");
1852 }
1853
1854 #[test]
1855 fn build_insert_sql_simple() {
1856 let data = serde_json::json!({
1857 "email": "alice@example.com",
1858 "displayName": "Alice"
1859 });
1860 let (sql, values) = build_insert_sql("User", &data).unwrap();
1861
1862 assert!(sql.starts_with("INSERT INTO \"User\""));
1863 assert!(sql.contains("id"));
1864 assert!(sql.contains("$1"));
1865 assert!(sql.contains("$2"));
1866 assert!(sql.contains("$3"));
1867 match &values[0] {
1869 JsonParam::Text(s) => assert!(!s.is_empty()),
1870 other => panic!("expected Text id param, got {other:?}"),
1871 }
1872 assert_eq!(values.len(), 3); }
1874
1875 #[test]
1876 fn build_insert_sql_preserves_json_types() {
1877 let data = serde_json::json!({
1878 "n": 42,
1879 "f": 1.5,
1880 "b": true,
1881 "s": "hi",
1882 "z": null,
1883 });
1884 let (_sql, values) = build_insert_sql("T", &data).unwrap();
1885 let kinds: Vec<&JsonParam> = values.iter().skip(1).collect();
1887 assert!(matches!(kinds[0], JsonParam::Bool(true)));
1888 assert!(matches!(kinds[1], JsonParam::Float(_)));
1889 assert!(matches!(kinds[2], JsonParam::Int(42)));
1890 assert!(matches!(kinds[3], JsonParam::Text(_)));
1891 assert!(matches!(kinds[4], JsonParam::Null));
1892 }
1893
1894 #[test]
1895 fn build_insert_sql_quotes_column_names() {
1896 let data = serde_json::json!({"createdAt": "2026-01-01"});
1897 let (sql, _) = build_insert_sql("Todo", &data).unwrap();
1898 assert!(sql.contains("\"createdAt\""));
1899 assert!(sql.contains("\"Todo\""));
1900 }
1901
1902 #[test]
1903 fn build_insert_sql_rejects_non_object() {
1904 let data = serde_json::json!("not an object");
1905 let result = build_insert_sql("User", &data);
1906 assert!(result.is_err());
1907 assert_eq!(result.unwrap_err().code, "PG_INVALID_DATA");
1908 }
1909
1910 #[test]
1911 fn build_update_sql_simple() {
1912 let data = serde_json::json!({
1913 "displayName": "Bob",
1914 "email": "bob@example.com"
1915 });
1916 let (sql, values) = build_update_sql("User", "abc123", &data).unwrap();
1917
1918 assert!(sql.starts_with("UPDATE \"User\" SET"));
1919 assert!(sql.contains("WHERE id = $1"));
1920 assert!(sql.contains("$2"));
1921 assert!(sql.contains("$3"));
1922 match &values[0] {
1923 JsonParam::Text(s) => assert_eq!(s, "abc123"),
1924 other => panic!("expected Text id param, got {other:?}"),
1925 }
1926 assert_eq!(values.len(), 3); }
1928
1929 #[test]
1930 fn build_update_sql_quotes_column_names() {
1931 let data = serde_json::json!({"displayName": "Carol"});
1932 let (sql, _) = build_update_sql("User", "id1", &data).unwrap();
1933 assert!(sql.contains("\"displayName\" = $2"));
1934 }
1935
1936 #[test]
1937 fn build_update_sql_rejects_non_object() {
1938 let data = serde_json::json!(42);
1939 let result = build_update_sql("User", "id1", &data);
1940 assert!(result.is_err());
1941 assert_eq!(result.unwrap_err().code, "PG_INVALID_DATA");
1942 }
1943
1944 #[test]
1945 fn build_update_sql_rejects_empty_object() {
1946 let data = serde_json::json!({});
1947 let err = build_update_sql("User", "id1", &data).unwrap_err();
1948 assert_eq!(err.code, "PG_INVALID_DATA");
1949 assert!(err.message.contains("at least one field"));
1950 }
1951}