1use crate::{FieldSpec, SchemaOperation, SchemaPlan, StorageAdapter, StorageError};
2use pylon_kernel::AppManifest;
3
/// Maps a manifest field type name to the Postgres column type used for it.
///
/// `int`, `float`, `bool` and `datetime` have dedicated column types; every
/// other type name (including `string`, `richtext` and `id(...)` references)
/// is stored as `TEXT`.
fn pg_column_type(field_type: &str) -> &'static str {
    // Only the non-TEXT mappings need listing; TEXT is the fallback.
    const NON_TEXT: [(&str, &str); 4] = [
        ("int", "INTEGER"),
        ("float", "DOUBLE PRECISION"),
        ("bool", "BOOLEAN"),
        ("datetime", "TIMESTAMPTZ"),
    ];

    NON_TEXT
        .iter()
        .find(|(name, _)| *name == field_type)
        .map_or("TEXT", |&(_, sql_type)| sql_type)
}
28
/// Wraps `name` in double quotes for use as a SQL identifier, doubling any
/// embedded double quote so it cannot terminate the identifier early.
fn quote_ident(name: &str) -> String {
    let mut quoted = String::with_capacity(name.len() + 2);
    quoted.push('"');
    for ch in name.chars() {
        if ch == '"' {
            // An embedded quote is escaped by writing it twice.
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
39
40pub fn create_table_sql(entity_name: &str, fields: &[FieldSpec]) -> String {
46 let mut columns = vec!["id TEXT PRIMARY KEY NOT NULL".to_string()];
47
48 for field in fields {
49 let col_type = pg_column_type(&field.field_type);
50 let not_null = if field.optional { "" } else { " NOT NULL" };
51 let unique = if field.unique { " UNIQUE" } else { "" };
52 columns.push(format!(
53 "{} {}{}{}",
54 quote_ident(&field.name),
55 col_type,
56 not_null,
57 unique
58 ));
59 }
60
61 format!(
62 "CREATE TABLE IF NOT EXISTS {} ({})",
63 quote_ident(entity_name),
64 columns.join(", ")
65 )
66}
67
68pub fn add_column_sql(entity_name: &str, field: &FieldSpec) -> String {
72 let col_type = pg_column_type(&field.field_type);
73 let unique = if field.unique { " UNIQUE" } else { "" };
74 format!(
75 "ALTER TABLE {} ADD COLUMN {} {}{}",
76 quote_ident(entity_name),
77 quote_ident(&field.name),
78 col_type,
79 unique
80 )
81}
82
83pub fn create_index_sql(
85 entity_name: &str,
86 index_name: &str,
87 fields: &[String],
88 unique: bool,
89) -> String {
90 let unique_str = if unique { "UNIQUE " } else { "" };
91 let full_index_name = format!("{}_{}", entity_name, index_name);
92 let quoted_fields: Vec<String> = fields.iter().map(|f| quote_ident(f)).collect();
93 format!(
94 "CREATE {}INDEX IF NOT EXISTS {} ON {} ({})",
95 unique_str,
96 quote_ident(&full_index_name),
97 quote_ident(entity_name),
98 quoted_fields.join(", ")
99 )
100}
101
102pub struct PostgresAdapter;
109
110impl StorageAdapter for PostgresAdapter {
111 fn plan_schema(&self, target: &AppManifest) -> Result<SchemaPlan, StorageError> {
112 let mut operations = Vec::new();
114
115 for entity in &target.entities {
116 let fields: Vec<FieldSpec> = entity
117 .fields
118 .iter()
119 .map(|f| FieldSpec {
120 name: f.name.clone(),
121 field_type: f.field_type.clone(),
122 optional: f.optional,
123 unique: f.unique,
124 })
125 .collect();
126
127 operations.push(SchemaOperation::CreateEntity {
128 name: entity.name.clone(),
129 fields,
130 });
131
132 for index in &entity.indexes {
133 operations.push(SchemaOperation::AddIndex {
134 entity: entity.name.clone(),
135 name: index.name.clone(),
136 fields: index.fields.clone(),
137 unique: index.unique,
138 });
139 }
140 }
141
142 if operations.is_empty() {
143 operations.push(SchemaOperation::Noop);
144 }
145
146 Ok(SchemaPlan { operations })
147 }
148
149 }
151
152pub fn plan_to_sql(plan: &SchemaPlan) -> Result<Vec<String>, StorageError> {
155 let mut statements = Vec::new();
156
157 for op in &plan.operations {
158 match op {
159 SchemaOperation::CreateEntity { name, fields } => {
160 statements.push(create_table_sql(name, fields));
161 }
162 SchemaOperation::AddField { entity, field } => {
163 statements.push(add_column_sql(entity, field));
164 }
165 SchemaOperation::AddIndex {
166 entity,
167 name,
168 fields,
169 unique,
170 } => {
171 statements.push(create_index_sql(entity, name, fields, *unique));
172 }
173 SchemaOperation::Noop => {}
174 other => {
175 return Err(StorageError {
176 code: "PG_OP_UNSUPPORTED".into(),
177 message: format!("Operation not supported by Postgres adapter: {other:?}"),
178 });
179 }
180 }
181 }
182
183 Ok(statements)
184}
185
/// Lists the base tables in the `public` schema, skipping internal tables
/// whose name starts with the literal prefix `_pylon_`.
///
/// The prefix is compared with `left(...)` rather than `LIKE '_pylon_%'`:
/// in a `LIKE` pattern `_` matches any single character, so the pattern form
/// would also hide unrelated tables such as `apylonb_x`.
pub const INTROSPECT_TABLES_SQL: &str = "\
    SELECT table_name \
    FROM information_schema.tables \
    WHERE table_schema = 'public' \
    AND table_type = 'BASE TABLE' \
    AND left(table_name::text, 7) <> '_pylon_' \
    ORDER BY table_name";
201
/// Lists the columns of the `public` table named by `$1`, in ordinal order.
///
/// Each row is `(column_name, data_type, is_nullable, is_pk)` where `is_pk`
/// is the number of primary-key constraints covering the column.
///
/// The primary-key subquery matches constraints by schema and table as well
/// as by name, so a same-named table or constraint in another schema cannot
/// be counted against this table's columns.
pub const INTROSPECT_COLUMNS_SQL: &str = "\
    SELECT column_name, data_type, is_nullable, \
    (SELECT COUNT(*) FROM information_schema.table_constraints tc \
    JOIN information_schema.key_column_usage kcu \
    ON tc.constraint_name = kcu.constraint_name \
    AND tc.constraint_schema = kcu.constraint_schema \
    AND tc.table_schema = kcu.table_schema \
    AND tc.table_name = kcu.table_name \
    WHERE tc.table_schema = c.table_schema \
    AND tc.table_name = c.table_name \
    AND kcu.column_name = c.column_name \
    AND tc.constraint_type = 'PRIMARY KEY') as is_pk \
    FROM information_schema.columns c \
    WHERE table_schema = 'public' AND table_name = $1 \
    ORDER BY ordinal_position";
215
/// Lists the non-primary-key indexes of the `public` table named by `$1`.
///
/// Each row is `(index_name, is_unique, columns)`, with `columns` aggregated
/// in the order the attributes appear in the index key.
pub const INTROSPECT_INDEXES_SQL: &str = concat!(
    "SELECT i.relname as index_name, ",
    "ix.indisunique as is_unique, ",
    "array_agg(a.attname ORDER BY array_position(ix.indkey, a.attnum)) as columns ",
    "FROM pg_index ix ",
    "JOIN pg_class t ON t.oid = ix.indrelid ",
    "JOIN pg_class i ON i.oid = ix.indexrelid ",
    "JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(ix.indkey) ",
    "JOIN pg_namespace n ON n.oid = t.relnamespace ",
    "WHERE n.nspname = 'public' ",
    "AND t.relname = $1 ",
    "AND NOT ix.indisprimary ",
    "GROUP BY i.relname, ix.indisunique ",
    "ORDER BY i.relname",
);
232
/// Computes the plan that takes `snapshot` to `target`.
///
/// This is a thin re-export: it forwards both arguments unchanged to
/// `crate::plan_from_snapshot` and returns its result.
pub fn plan_from_snapshot(snapshot: &crate::SchemaSnapshot, target: &AppManifest) -> SchemaPlan {
    crate::plan_from_snapshot(snapshot, target)
}
238
/// Generates a 40-character lowercase hex identifier.
///
/// The first 32 digits are the nanoseconds since the Unix epoch (zero if the
/// system clock reads earlier than the epoch); the last 8 digits are a
/// process-wide counter that advances on every call and wraps on overflow.
pub fn generate_id() -> String {
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static COUNTER: AtomicU32 = AtomicU32::new(0);

    let ts = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |since_epoch| since_epoch.as_nanos());
    let seq = COUNTER.fetch_add(1, Ordering::Relaxed);

    format!("{ts:032x}{seq:08x}")
}
262
263pub fn json_value_to_string(val: &serde_json::Value) -> String {
265 match val {
266 serde_json::Value::String(s) => s.clone(),
267 serde_json::Value::Number(n) => n.to_string(),
268 serde_json::Value::Bool(b) => b.to_string(),
269 serde_json::Value::Null => String::new(),
270 other => other.to_string(),
271 }
272}
273
274pub fn build_insert_sql(
277 entity: &str,
278 data: &serde_json::Value,
279) -> Result<(String, Vec<String>), StorageError> {
280 let id = generate_id();
281 let obj = data.as_object().ok_or_else(|| StorageError {
282 code: "PG_INVALID_DATA".into(),
283 message: "Insert data must be a JSON object".into(),
284 })?;
285
286 let mut col_names = vec!["id".to_string()];
287 let mut placeholders = vec!["$1".to_string()];
288 let mut values: Vec<String> = vec![id];
289
290 for (i, (key, val)) in obj.iter().enumerate() {
291 col_names.push(quote_ident(key));
292 placeholders.push(format!("${}", i + 2));
293 values.push(json_value_to_string(val));
294 }
295
296 let sql = format!(
297 "INSERT INTO {} ({}) VALUES ({})",
298 quote_ident(entity),
299 col_names.join(", "),
300 placeholders.join(", ")
301 );
302
303 Ok((sql, values))
304}
305
306pub fn build_update_sql(
309 entity: &str,
310 id: &str,
311 data: &serde_json::Value,
312) -> Result<(String, Vec<String>), StorageError> {
313 let obj = data.as_object().ok_or_else(|| StorageError {
314 code: "PG_INVALID_DATA".into(),
315 message: "Update data must be a JSON object".into(),
316 })?;
317
318 if obj.is_empty() {
319 return Err(StorageError {
320 code: "PG_INVALID_DATA".into(),
321 message: "Update data must contain at least one field".into(),
322 });
323 }
324
325 let mut set_clauses = Vec::new();
326 let mut values: Vec<String> = vec![id.to_string()];
327
328 for (i, (key, val)) in obj.iter().enumerate() {
329 set_clauses.push(format!("{} = ${}", quote_ident(key), i + 2));
330 values.push(json_value_to_string(val));
331 }
332
333 let sql = format!(
334 "UPDATE {} SET {} WHERE id = $1",
335 quote_ident(entity),
336 set_clauses.join(", ")
337 );
338
339 Ok((sql, values))
340}
341
342#[cfg(feature = "postgres-live")]
347pub mod live {
348 use super::*;
349 use crate::{
350 ColumnSnapshot, IndexSnapshot, SchemaSnapshot, StorageAdapter, StorageError, TableSnapshot,
351 };
352
    /// Adapter that talks to a running Postgres server through a
    /// `postgres::Client`.
    pub struct LivePostgresAdapter {
        /// Connection used by every query and statement this adapter issues.
        client: postgres::Client,
    }
357
358 impl LivePostgresAdapter {
359 pub fn connect(url: &str) -> Result<Self, StorageError> {
361 let client =
362 postgres::Client::connect(url, postgres::NoTls).map_err(|e| StorageError {
363 code: "PG_CONNECT_FAILED".into(),
364 message: format!("Failed to connect to Postgres: {e}"),
365 })?;
366 Ok(Self { client })
367 }
368
369 pub fn read_schema(&mut self) -> Result<SchemaSnapshot, StorageError> {
371 let table_rows = self
372 .client
373 .query(INTROSPECT_TABLES_SQL, &[])
374 .map_err(pg_err)?;
375
376 let mut tables = Vec::new();
377 for row in &table_rows {
378 let table_name: String = row.get(0);
379 let columns = self.read_columns(&table_name)?;
380 let indexes = self.read_indexes(&table_name)?;
381 tables.push(TableSnapshot {
382 name: table_name,
383 columns,
384 indexes,
385 });
386 }
387
388 Ok(SchemaSnapshot { tables })
389 }
390
391 fn read_columns(&mut self, table: &str) -> Result<Vec<ColumnSnapshot>, StorageError> {
392 let rows = self
393 .client
394 .query(INTROSPECT_COLUMNS_SQL, &[&table])
395 .map_err(pg_err)?;
396
397 let mut columns = Vec::new();
398 for row in &rows {
399 let name: String = row.get(0);
400 let data_type: String = row.get(1);
401 let is_nullable: String = row.get(2);
402 let is_pk: i64 = row.get(3);
403 columns.push(ColumnSnapshot {
404 name,
405 column_type: data_type,
406 notnull: is_nullable == "NO",
407 primary_key: is_pk > 0,
408 });
409 }
410 Ok(columns)
411 }
412
413 fn read_indexes(&mut self, table: &str) -> Result<Vec<IndexSnapshot>, StorageError> {
414 let rows = self
415 .client
416 .query(INTROSPECT_INDEXES_SQL, &[&table])
417 .map_err(pg_err)?;
418
419 let mut indexes = Vec::new();
420 for row in &rows {
421 let name: String = row.get(0);
422 let unique: bool = row.get(1);
423 let columns: Vec<String> = row.get(2);
424 indexes.push(IndexSnapshot {
425 name,
426 columns,
427 unique,
428 });
429 }
430 Ok(indexes)
431 }
432
433 pub fn plan_from_live(&mut self, target: &AppManifest) -> Result<SchemaPlan, StorageError> {
435 let snapshot = self.read_schema()?;
436 Ok(crate::plan_from_snapshot(&snapshot, target))
437 }
438 }
439
440 impl StorageAdapter for LivePostgresAdapter {
441 fn plan_schema(&self, _target: &AppManifest) -> Result<SchemaPlan, StorageError> {
442 Err(StorageError {
443 code: "PG_PLAN_NEEDS_MUTABLE".into(),
444 message: "Use plan_from_live() instead for live Postgres planning".into(),
445 })
446 }
447
448 fn apply_schema(&self, _plan: &SchemaPlan) -> Result<(), StorageError> {
449 Err(StorageError {
450 code: "PG_APPLY_USE_METHOD".into(),
451 message: "Use apply_plan() instead of the trait method for live Postgres".into(),
452 })
453 }
454 }
455
456 impl LivePostgresAdapter {
457 pub fn apply_plan(&mut self, plan: &SchemaPlan) -> Result<(), StorageError> {
459 let statements = plan_to_sql(plan)?;
460 for sql in &statements {
461 self.client.execute(sql.as_str(), &[]).map_err(pg_err)?;
462 }
463 Ok(())
464 }
465
466 pub fn insert(
468 &mut self,
469 entity: &str,
470 data: &serde_json::Value,
471 ) -> Result<String, StorageError> {
472 let (sql, values) = build_insert_sql(entity, data)?;
473 let id = values[0].clone();
474
475 let params: Vec<&(dyn postgres::types::ToSql + Sync)> = values
476 .iter()
477 .map(|v| v as &(dyn postgres::types::ToSql + Sync))
478 .collect();
479
480 self.client.execute(sql.as_str(), ¶ms).map_err(pg_err)?;
481 Ok(id)
482 }
483
484 pub fn get_by_id(
486 &mut self,
487 entity: &str,
488 id: &str,
489 ) -> Result<Option<serde_json::Value>, StorageError> {
490 let sql = format!("SELECT * FROM {} WHERE id = $1", quote_ident(entity));
491 let rows = self.client.query(sql.as_str(), &[&id]).map_err(pg_err)?;
492
493 match rows.first() {
494 Some(row) => Ok(Some(row_to_json(row))),
495 None => Ok(None),
496 }
497 }
498
499 pub fn list(&mut self, entity: &str) -> Result<Vec<serde_json::Value>, StorageError> {
501 let sql = format!("SELECT * FROM {}", quote_ident(entity));
502 let rows = self.client.query(sql.as_str(), &[]).map_err(pg_err)?;
503
504 Ok(rows.iter().map(row_to_json).collect())
505 }
506
507 pub fn list_after(
511 &mut self,
512 entity: &str,
513 after: Option<&str>,
514 limit: usize,
515 ) -> Result<Vec<serde_json::Value>, StorageError> {
516 let capped: i64 = limit.min(10_000) as i64;
519 let sql = match after {
520 Some(_) => format!(
521 "SELECT * FROM {} WHERE id > $1 ORDER BY id ASC LIMIT $2",
522 quote_ident(entity)
523 ),
524 None => format!(
525 "SELECT * FROM {} ORDER BY id ASC LIMIT $1",
526 quote_ident(entity)
527 ),
528 };
529 let rows = match after {
530 Some(cursor) => self
531 .client
532 .query(sql.as_str(), &[&cursor, &capped])
533 .map_err(pg_err)?,
534 None => self
535 .client
536 .query(sql.as_str(), &[&capped])
537 .map_err(pg_err)?,
538 };
539 Ok(rows.iter().map(row_to_json).collect())
540 }
541
542 pub fn update(
544 &mut self,
545 entity: &str,
546 id: &str,
547 data: &serde_json::Value,
548 ) -> Result<bool, StorageError> {
549 let (sql, values) = build_update_sql(entity, id, data)?;
550
551 let params: Vec<&(dyn postgres::types::ToSql + Sync)> = values
552 .iter()
553 .map(|v| v as &(dyn postgres::types::ToSql + Sync))
554 .collect();
555
556 let rows_affected = self.client.execute(sql.as_str(), ¶ms).map_err(pg_err)?;
557 Ok(rows_affected > 0)
558 }
559
560 pub fn delete(&mut self, entity: &str, id: &str) -> Result<bool, StorageError> {
562 let sql = format!("DELETE FROM {} WHERE id = $1", quote_ident(entity));
563 let rows_affected = self.client.execute(sql.as_str(), &[&id]).map_err(pg_err)?;
564 Ok(rows_affected > 0)
565 }
566
567 pub fn lookup_field(
571 &mut self,
572 entity: &str,
573 field: &str,
574 value: &str,
575 ) -> Result<Option<serde_json::Value>, StorageError> {
576 let sql = format!(
577 "SELECT * FROM {} WHERE {} = $1 LIMIT 1",
578 quote_ident(entity),
579 quote_ident(field),
580 );
581 let rows = self.client.query(sql.as_str(), &[&value]).map_err(pg_err)?;
582 Ok(rows.first().map(row_to_json))
583 }
584
585 pub fn query_filtered(
595 &mut self,
596 entity: &str,
597 filter: &serde_json::Value,
598 valid_columns: &[String],
599 ) -> Result<Vec<serde_json::Value>, StorageError> {
600 let empty = serde_json::Map::new();
601 let obj = filter.as_object().unwrap_or(&empty);
602
603 let validate = |col: &str| -> Result<(), StorageError> {
604 if col == "id" || valid_columns.iter().any(|c| c == col) {
605 Ok(())
606 } else {
607 Err(StorageError {
608 code: "UNKNOWN_COLUMN".into(),
609 message: format!("Unknown column \"{col}\" on entity \"{entity}\""),
610 })
611 }
612 };
613
614 let mut where_clauses: Vec<String> = Vec::new();
615 let mut order_clause = String::new();
616 let mut limit_clause = String::new();
617 let mut offset_clause = String::new();
618 let mut planned: Vec<(String, String, String)> = Vec::new();
621
622 for (key, val) in obj {
623 match key.as_str() {
624 "$order" => {
625 if let Some(ord) = val.as_object() {
626 let mut parts = Vec::new();
627 for (col, dir) in ord {
628 validate(col)?;
629 let d = match dir.as_str().unwrap_or("asc") {
630 "desc" | "DESC" => "DESC",
631 _ => "ASC",
632 };
633 parts.push(format!("{} {d}", quote_ident(col)));
634 }
635 if !parts.is_empty() {
636 order_clause = format!(" ORDER BY {}", parts.join(", "));
637 }
638 }
639 }
640 "$limit" => {
641 if let Some(n) = val.as_u64() {
642 limit_clause = format!(" LIMIT {}", n);
643 }
644 }
645 "$offset" => {
646 if let Some(n) = val.as_u64() {
647 offset_clause = format!(" OFFSET {}", n);
648 }
649 }
650 field => {
651 validate(field)?;
652 match val {
653 serde_json::Value::Object(ops) => {
654 for (op, v) in ops {
655 match op.as_str() {
656 "$gt" => {
657 planned.push((field.into(), ">".into(), value_to_pg(v)))
658 }
659 "$gte" => planned.push((
660 field.into(),
661 ">=".into(),
662 value_to_pg(v),
663 )),
664 "$lt" => {
665 planned.push((field.into(), "<".into(), value_to_pg(v)))
666 }
667 "$lte" => planned.push((
668 field.into(),
669 "<=".into(),
670 value_to_pg(v),
671 )),
672 "$like" => planned.push((
673 field.into(),
674 "LIKE".into(),
675 value_to_pg(v),
676 )),
677 "$in" => {
678 if let Some(arr) = v.as_array() {
679 let placeholders: Vec<String> = (0..arr.len())
680 .map(|i| format!("${}", planned.len() + 1 + i))
681 .collect();
682 where_clauses.push(format!(
683 "{} IN ({})",
684 quote_ident(field),
685 placeholders.join(", "),
686 ));
687 for x in arr {
688 planned.push((
689 format!("__inline_{}", planned.len()),
690 "__INLINE__".into(),
691 value_to_pg(x),
692 ));
693 }
694 }
695 }
696 _ => {}
697 }
698 }
699 }
700 _ => planned.push((field.into(), "=".into(), value_to_pg(val))),
701 }
702 }
703 }
704 }
705
706 let mut params: Vec<String> = Vec::with_capacity(planned.len());
708 for (field, op, v) in &planned {
709 if op == "__INLINE__" {
710 params.push(v.clone());
712 } else {
713 let placeholder = format!("${}", params.len() + 1);
714 where_clauses.push(format!("{} {} {}", quote_ident(field), op, placeholder));
715 params.push(v.clone());
716 }
717 }
718
719 let where_sql = if where_clauses.is_empty() {
720 String::new()
721 } else {
722 format!(" WHERE {}", where_clauses.join(" AND "))
723 };
724 let sql = format!(
725 "SELECT * FROM {}{}{}{}{}",
726 quote_ident(entity),
727 where_sql,
728 order_clause,
729 limit_clause,
730 offset_clause,
731 );
732
733 let pg_params: Vec<&(dyn postgres::types::ToSql + Sync)> = params
734 .iter()
735 .map(|s| s as &(dyn postgres::types::ToSql + Sync))
736 .collect();
737
738 let rows = self
739 .client
740 .query(sql.as_str(), &pg_params)
741 .map_err(pg_err)?;
742 Ok(rows.iter().map(row_to_json).collect())
743 }
744 }
745
    /// One write operation to run inside `LivePostgresAdapter::transact`.
    /// All payloads are borrowed from the caller.
    pub enum TxOp<'a> {
        /// Insert a new row into `entity` built from the JSON object `data`.
        Insert {
            entity: &'a str,
            data: &'a serde_json::Value,
        },
        /// Update the row of `entity` identified by `id` with the fields in `data`.
        Update {
            entity: &'a str,
            id: &'a str,
            data: &'a serde_json::Value,
        },
        /// Delete the row of `entity` identified by `id`.
        Delete {
            entity: &'a str,
            id: &'a str,
        },
    }
762
    /// Outcome of a single `TxOp`, reported in the same order as the input ops.
    #[derive(Debug, Clone)]
    pub enum TxResult {
        /// The id generated for the inserted row.
        Inserted(String),
        /// Whether the update affected at least one row.
        Updated(bool),
        /// Whether the delete removed at least one row.
        Deleted(bool),
    }
770
771 impl LivePostgresAdapter {
772 pub fn transact(&mut self, ops: &[TxOp<'_>]) -> Result<Vec<TxResult>, StorageError> {
777 let mut tx = self.client.transaction().map_err(pg_err)?;
778 let mut results: Vec<TxResult> = Vec::with_capacity(ops.len());
779
780 for op in ops {
781 match op {
782 TxOp::Insert { entity, data } => {
783 let (sql, values) = build_insert_sql(entity, data)?;
784 let id = values[0].clone();
785 let params: Vec<&(dyn postgres::types::ToSql + Sync)> = values
786 .iter()
787 .map(|v| v as &(dyn postgres::types::ToSql + Sync))
788 .collect();
789 tx.execute(sql.as_str(), ¶ms).map_err(pg_err)?;
790 results.push(TxResult::Inserted(id));
791 }
792 TxOp::Update { entity, id, data } => {
793 let (sql, values) = build_update_sql(entity, id, data)?;
794 let params: Vec<&(dyn postgres::types::ToSql + Sync)> = values
795 .iter()
796 .map(|v| v as &(dyn postgres::types::ToSql + Sync))
797 .collect();
798 let n = tx.execute(sql.as_str(), ¶ms).map_err(pg_err)?;
799 results.push(TxResult::Updated(n > 0));
800 }
801 TxOp::Delete { entity, id } => {
802 let sql = format!("DELETE FROM {} WHERE id = $1", quote_ident(entity));
803 let n = tx.execute(sql.as_str(), &[id]).map_err(pg_err)?;
804 results.push(TxResult::Deleted(n > 0));
805 }
806 }
807 }
808
809 tx.commit().map_err(pg_err)?;
810 Ok(results)
811 }
812 }
813
814 fn value_to_pg(v: &serde_json::Value) -> String {
815 match v {
816 serde_json::Value::String(s) => s.clone(),
817 serde_json::Value::Number(n) => n.to_string(),
818 serde_json::Value::Bool(b) => b.to_string(),
819 serde_json::Value::Null => String::new(),
820 other => other.to_string(),
821 }
822 }
823
824 fn row_to_json(row: &postgres::Row) -> serde_json::Value {
825 use postgres::types::Type;
826 let mut obj = serde_json::Map::new();
827 for (i, col) in row.columns().iter().enumerate() {
828 let name = col.name().to_string();
829
830 let value: serde_json::Value = match *col.type_() {
841 Type::BOOL => try_get_or_null::<Option<bool>>(row, i)
842 .flatten()
843 .map(serde_json::Value::Bool)
844 .unwrap_or(serde_json::Value::Null),
845 Type::INT2 => try_get_or_null::<Option<i16>>(row, i)
846 .flatten()
847 .map(|v| serde_json::Value::Number(v.into()))
848 .unwrap_or(serde_json::Value::Null),
849 Type::INT4 => try_get_or_null::<Option<i32>>(row, i)
850 .flatten()
851 .map(|v| serde_json::Value::Number(v.into()))
852 .unwrap_or(serde_json::Value::Null),
853 Type::INT8 => try_get_or_null::<Option<i64>>(row, i)
854 .flatten()
855 .map(|v| serde_json::Value::Number(v.into()))
856 .unwrap_or(serde_json::Value::Null),
857 Type::FLOAT4 => try_get_or_null::<Option<f32>>(row, i)
858 .flatten()
859 .and_then(|v| serde_json::Number::from_f64(v as f64))
860 .map(serde_json::Value::Number)
861 .unwrap_or(serde_json::Value::Null),
862 Type::FLOAT8 => try_get_or_null::<Option<f64>>(row, i)
863 .flatten()
864 .and_then(serde_json::Number::from_f64)
865 .map(serde_json::Value::Number)
866 .unwrap_or(serde_json::Value::Null),
867 Type::JSON | Type::JSONB => try_get_or_null::<Option<serde_json::Value>>(row, i)
868 .flatten()
869 .unwrap_or(serde_json::Value::Null),
870 Type::BYTEA => try_get_or_null::<Option<Vec<u8>>>(row, i)
871 .flatten()
872 .map(|b| serde_json::Value::String(b64(&b)))
873 .unwrap_or(serde_json::Value::Null),
874 Type::TEXT | Type::VARCHAR | Type::BPCHAR | Type::NAME | Type::UNKNOWN => {
875 try_get_or_null::<Option<String>>(row, i)
876 .flatten()
877 .map(serde_json::Value::String)
878 .unwrap_or(serde_json::Value::Null)
879 }
880 _ => {
881 match row.try_get::<_, Option<String>>(i) {
886 Ok(Some(s)) => serde_json::Value::String(s),
887 Ok(None) => serde_json::Value::Null,
888 Err(_) => match row.try_get::<_, Option<Vec<u8>>>(i) {
889 Ok(Some(bytes)) => serde_json::Value::String(
890 String::from_utf8_lossy(&bytes).into_owned(),
891 ),
892 _ => serde_json::Value::Null,
893 },
894 }
895 }
896 };
897 obj.insert(name, value);
898 }
899 serde_json::Value::Object(obj)
900 }
901
902 fn try_get_or_null<'a, T>(row: &'a postgres::Row, i: usize) -> Option<T>
903 where
904 T: postgres::types::FromSql<'a>,
905 {
906 match row.try_get::<_, T>(i) {
907 Ok(v) => Some(v),
908 Err(e) => {
909 tracing::warn!(
910 "[postgres] decode failed for column {} ({}): {e}",
911 i,
912 row.columns()[i].name()
913 );
914 None
915 }
916 }
917 }
918
919 fn b64(bytes: &[u8]) -> String {
922 const TABLE: &[u8; 64] =
923 b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
924 let mut out = String::with_capacity((bytes.len() + 2) / 3 * 4);
925 let chunks = bytes.chunks(3);
926 for chunk in chunks {
927 let b = [
928 chunk.first().copied().unwrap_or(0),
929 chunk.get(1).copied().unwrap_or(0),
930 chunk.get(2).copied().unwrap_or(0),
931 ];
932 out.push(TABLE[(b[0] >> 2) as usize] as char);
933 out.push(TABLE[((b[0] & 0x03) << 4 | b[1] >> 4) as usize] as char);
934 if chunk.len() > 1 {
935 out.push(TABLE[((b[1] & 0x0F) << 2 | b[2] >> 6) as usize] as char);
936 } else {
937 out.push('=');
938 }
939 if chunk.len() > 2 {
940 out.push(TABLE[(b[2] & 0x3F) as usize] as char);
941 } else {
942 out.push('=');
943 }
944 }
945 out
946 }
947
948 fn pg_err(e: postgres::Error) -> StorageError {
949 StorageError {
950 code: "PG_QUERY_FAILED".into(),
951 message: format!("Postgres query failed: {e}"),
952 }
953 }
954}
955
956#[cfg(test)]
961mod tests {
962 use super::*;
963
964 fn test_manifest() -> AppManifest {
965 serde_json::from_str(include_str!(
966 "../../../examples/todo-app/pylon.manifest.json"
967 ))
968 .unwrap()
969 }
970
971 #[test]
972 fn pg_type_mapping() {
973 assert_eq!(pg_column_type("string"), "TEXT");
974 assert_eq!(pg_column_type("int"), "INTEGER");
975 assert_eq!(pg_column_type("float"), "DOUBLE PRECISION");
976 assert_eq!(pg_column_type("bool"), "BOOLEAN");
977 assert_eq!(pg_column_type("datetime"), "TIMESTAMPTZ");
978 assert_eq!(pg_column_type("richtext"), "TEXT");
979 assert_eq!(pg_column_type("id(User)"), "TEXT");
980 }
981
982 #[test]
983 fn quote_ident_simple() {
984 assert_eq!(quote_ident("User"), "\"User\"");
985 assert_eq!(quote_ident("email"), "\"email\"");
986 }
987
988 #[test]
989 fn quote_ident_escapes_embedded_double_quotes() {
990 assert_eq!(quote_ident("col\"name"), "\"col\"\"name\"");
991 assert_eq!(quote_ident("a\"b\"c"), "\"a\"\"b\"\"c\"");
992 }
993
994 #[test]
995 fn create_table_sql_basic() {
996 let fields = vec![
997 FieldSpec {
998 name: "email".into(),
999 field_type: "string".into(),
1000 optional: false,
1001 unique: true,
1002 },
1003 FieldSpec {
1004 name: "age".into(),
1005 field_type: "int".into(),
1006 optional: true,
1007 unique: false,
1008 },
1009 ];
1010 let sql = create_table_sql("User", &fields);
1011 assert_eq!(
1012 sql,
1013 "CREATE TABLE IF NOT EXISTS \"User\" (id TEXT PRIMARY KEY NOT NULL, \"email\" TEXT NOT NULL UNIQUE, \"age\" INTEGER)"
1014 );
1015 }
1016
1017 #[test]
1018 fn create_table_sql_escapes_identifiers() {
1019 let fields = vec![FieldSpec {
1020 name: "col\"x".into(),
1021 field_type: "string".into(),
1022 optional: false,
1023 unique: false,
1024 }];
1025 let sql = create_table_sql("my\"table", &fields);
1026 assert!(sql.contains("\"my\"\"table\""));
1027 assert!(sql.contains("\"col\"\"x\""));
1028 }
1029
1030 #[test]
1031 fn create_index_sql_unique() {
1032 let sql = create_index_sql("User", "by_email", &["email".into()], true);
1033 assert_eq!(
1034 sql,
1035 "CREATE UNIQUE INDEX IF NOT EXISTS \"User_by_email\" ON \"User\" (\"email\")"
1036 );
1037 }
1038
1039 #[test]
1040 fn create_index_sql_non_unique() {
1041 let sql = create_index_sql("Todo", "by_author", &["authorId".into()], false);
1042 assert_eq!(
1043 sql,
1044 "CREATE INDEX IF NOT EXISTS \"Todo_by_author\" ON \"Todo\" (\"authorId\")"
1045 );
1046 }
1047
1048 #[test]
1049 fn add_column_sql_basic() {
1050 let field = FieldSpec {
1051 name: "bio".into(),
1052 field_type: "string".into(),
1053 optional: true,
1054 unique: false,
1055 };
1056 let sql = add_column_sql("User", &field);
1057 assert_eq!(sql, "ALTER TABLE \"User\" ADD COLUMN \"bio\" TEXT");
1058 }
1059
1060 #[test]
1061 fn plan_from_manifest() {
1062 let adapter = PostgresAdapter;
1063 let manifest = test_manifest();
1064 let plan = adapter.plan_schema(&manifest).unwrap();
1065
1066 assert!(plan.operations.iter().any(|op| matches!(
1068 op,
1069 SchemaOperation::CreateEntity { name, .. } if name == "User"
1070 )));
1071 assert!(plan.operations.iter().any(|op| matches!(
1072 op,
1073 SchemaOperation::CreateEntity { name, .. } if name == "Todo"
1074 )));
1075 assert!(plan.operations.iter().any(|op| matches!(
1076 op,
1077 SchemaOperation::AddIndex { entity, name, .. } if entity == "Todo" && name == "by_author"
1078 )));
1079 }
1080
1081 #[test]
1082 fn plan_to_sql_produces_statements() {
1083 let adapter = PostgresAdapter;
1084 let manifest = test_manifest();
1085 let plan = adapter.plan_schema(&manifest).unwrap();
1086 let stmts = plan_to_sql(&plan).unwrap();
1087
1088 assert_eq!(stmts.len(), 3); assert!(stmts[0].starts_with("CREATE TABLE"));
1090 assert!(stmts[1].starts_with("CREATE TABLE"));
1091 assert!(stmts[2].starts_with("CREATE INDEX"));
1092 }
1093
1094 #[test]
1095 fn plan_to_sql_rejects_unsupported() {
1096 let plan = SchemaPlan {
1097 operations: vec![SchemaOperation::RemoveEntity {
1098 name: "User".into(),
1099 }],
1100 };
1101 let result = plan_to_sql(&plan);
1102 assert!(result.is_err());
1103 assert_eq!(result.unwrap_err().code, "PG_OP_UNSUPPORTED");
1104 }
1105
1106 #[test]
1107 fn apply_not_implemented() {
1108 let adapter = PostgresAdapter;
1109 let plan = SchemaPlan {
1110 operations: vec![SchemaOperation::Noop],
1111 };
1112 let result = adapter.apply_schema(&plan);
1113 assert!(result.is_err());
1114 assert_eq!(result.unwrap_err().code, "APPLY_NOT_IMPLEMENTED");
1115 }
1116
1117 #[test]
1118 fn sql_uses_quoted_identifiers() {
1119 let fields = vec![FieldSpec {
1120 name: "createdAt".into(),
1121 field_type: "datetime".into(),
1122 optional: false,
1123 unique: false,
1124 }];
1125 let sql = create_table_sql("User", &fields);
1126 assert!(sql.contains("\"User\""));
1128 assert!(sql.contains("\"createdAt\""));
1129 assert!(sql.contains("TIMESTAMPTZ"));
1130 }
1131
1132 #[test]
1135 fn introspect_sql_constants_are_valid() {
1136 assert!(INTROSPECT_TABLES_SQL.contains("information_schema.tables"));
1138 assert!(INTROSPECT_COLUMNS_SQL.contains("$1"));
1139 assert!(INTROSPECT_INDEXES_SQL.contains("$1"));
1140 assert!(INTROSPECT_TABLES_SQL.contains("_pylon_"));
1141 }
1142
1143 #[test]
1146 fn plan_from_empty_snapshot_creates_all() {
1147 let snapshot = crate::SchemaSnapshot { tables: vec![] };
1148 let manifest = test_manifest();
1149 let plan = plan_from_snapshot(&snapshot, &manifest);
1150
1151 assert!(plan.operations.iter().any(|op| matches!(
1152 op,
1153 SchemaOperation::CreateEntity { name, .. } if name == "User"
1154 )));
1155 assert!(plan.operations.iter().any(|op| matches!(
1156 op,
1157 SchemaOperation::CreateEntity { name, .. } if name == "Todo"
1158 )));
1159 assert!(plan.operations.iter().any(|op| matches!(
1160 op,
1161 SchemaOperation::AddIndex { entity, name, .. } if entity == "Todo" && name == "by_author"
1162 )));
1163 }
1164
1165 #[test]
1166 fn plan_from_full_snapshot_is_noop() {
1167 let snapshot = crate::SchemaSnapshot {
1168 tables: vec![
1169 crate::TableSnapshot {
1170 name: "User".into(),
1171 columns: vec![
1172 crate::ColumnSnapshot {
1173 name: "id".into(),
1174 column_type: "TEXT".into(),
1175 notnull: true,
1176 primary_key: true,
1177 },
1178 crate::ColumnSnapshot {
1179 name: "email".into(),
1180 column_type: "TEXT".into(),
1181 notnull: true,
1182 primary_key: false,
1183 },
1184 crate::ColumnSnapshot {
1185 name: "displayName".into(),
1186 column_type: "TEXT".into(),
1187 notnull: true,
1188 primary_key: false,
1189 },
1190 crate::ColumnSnapshot {
1191 name: "createdAt".into(),
1192 column_type: "TIMESTAMPTZ".into(),
1193 notnull: true,
1194 primary_key: false,
1195 },
1196 ],
1197 indexes: vec![],
1198 },
1199 crate::TableSnapshot {
1200 name: "Todo".into(),
1201 columns: vec![
1202 crate::ColumnSnapshot {
1203 name: "id".into(),
1204 column_type: "TEXT".into(),
1205 notnull: true,
1206 primary_key: true,
1207 },
1208 crate::ColumnSnapshot {
1209 name: "title".into(),
1210 column_type: "TEXT".into(),
1211 notnull: true,
1212 primary_key: false,
1213 },
1214 crate::ColumnSnapshot {
1215 name: "done".into(),
1216 column_type: "BOOLEAN".into(),
1217 notnull: true,
1218 primary_key: false,
1219 },
1220 crate::ColumnSnapshot {
1221 name: "authorId".into(),
1222 column_type: "TEXT".into(),
1223 notnull: true,
1224 primary_key: false,
1225 },
1226 crate::ColumnSnapshot {
1227 name: "createdAt".into(),
1228 column_type: "TIMESTAMPTZ".into(),
1229 notnull: true,
1230 primary_key: false,
1231 },
1232 ],
1233 indexes: vec![crate::IndexSnapshot {
1234 name: "Todo_by_author".into(),
1235 columns: vec!["authorId".into()],
1236 unique: false,
1237 }],
1238 },
1239 ],
1240 };
1241 let manifest = test_manifest();
1242 let plan = plan_from_snapshot(&snapshot, &manifest);
1243 assert!(plan.is_empty());
1244 }
1245
// Planning against a snapshot whose `User` table only has `id` and `email`
// must yield exactly two `AddField` operations for the test manifest.
#[test]
fn plan_detects_missing_column_in_snapshot() {
    // Shorthand for a column snapshot; every column in this fixture is NOT NULL.
    let column = |name: &'static str, column_type: &'static str, primary_key: bool| {
        crate::ColumnSnapshot {
            name: name.into(),
            column_type: column_type.into(),
            notnull: true,
            primary_key,
        }
    };

    // `User` is deliberately incomplete: just the primary key and `email`.
    let user_table = crate::TableSnapshot {
        name: "User".into(),
        columns: vec![column("id", "TEXT", true), column("email", "TEXT", false)],
        indexes: vec![],
    };

    // `Todo` carries five columns plus its non-unique author index.
    let todo_table = crate::TableSnapshot {
        name: "Todo".into(),
        columns: vec![
            column("id", "TEXT", true),
            column("title", "TEXT", false),
            column("done", "BOOLEAN", false),
            column("authorId", "TEXT", false),
            column("createdAt", "TIMESTAMPTZ", false),
        ],
        indexes: vec![crate::IndexSnapshot {
            name: "Todo_by_author".into(),
            columns: vec!["authorId".into()],
            unique: false,
        }],
    };

    let snapshot = crate::SchemaSnapshot {
        tables: vec![user_table, todo_table],
    };
    let manifest = test_manifest();
    let plan = plan_from_snapshot(&snapshot, &manifest);

    // Only the `AddField` operations matter here; any other operation kinds are ignored.
    let add_field_count = plan
        .operations
        .iter()
        .filter(|op| matches!(op, SchemaOperation::AddField { .. }))
        .count();
    assert_eq!(add_field_count, 2);
}
1321
// Checks the string rendering of each JSON value kind: strings are returned
// as-is, numbers and booleans use their literal form, null becomes the empty
// string, and arrays/objects are rendered as compact JSON.
#[test]
fn json_value_to_string_handles_all_types() {
    // (input value, expected rendering) pairs.
    let cases = [
        (serde_json::Value::String("hello".into()), "hello"),
        (serde_json::json!(42), "42"),
        (serde_json::json!(1.5), "1.5"),
        (serde_json::Value::Bool(true), "true"),
        (serde_json::Value::Bool(false), "false"),
        (serde_json::Value::Null, ""),
        (serde_json::json!([1, 2, 3]), "[1,2,3]"),
        (serde_json::json!({"a": 1}), "{\"a\":1}"),
    ];

    for (input, expected) in cases.iter() {
        assert_eq!(json_value_to_string(input), *expected, "input: {:?}", input);
    }
}
1348
// A generated id must be non-empty and consist solely of ASCII hex digits.
#[test]
fn generate_id_returns_hex_string() {
    let generated = generate_id();
    assert!(!generated.is_empty());
    // Byte-wise check: any non-ASCII byte is >= 0x80 and therefore never a hex digit.
    assert!(generated.bytes().all(|byte| byte.is_ascii_hexdigit()));
}
1356
// Two back-to-back calls must not hand out the same id.
#[test]
fn generate_id_is_unique_across_calls() {
    let (first, second) = (generate_id(), generate_id());
    assert_ne!(first, second);
}
1363
// Ids produced in a tight loop must already be in lexicographic order, share
// one fixed length, and never repeat.
#[test]
fn generate_id_is_lex_sortable() {
    let mut generated: Vec<String> = std::iter::repeat_with(generate_id).take(1000).collect();

    // Generation order must equal sorted order.
    let mut expected_order = generated.clone();
    expected_order.sort();
    assert_eq!(generated, expected_order, "generate_id must be lex-monotonic");

    // Every id has the same length as the first one.
    let width = generated[0].len();
    assert!(!generated.iter().any(|id| id.len() != width));

    // The list is sorted at this point, so duplicates would be adjacent and `dedup` removes them.
    generated.dedup();
    assert_eq!(generated.len(), 1000, "no collisions in a tight loop");
}
1382
// Inserting two fields into `User` yields an INSERT statement with three
// placeholders and three bound values (presumably the extra one is the
// generated id, which the first value is expected to hold).
#[test]
fn build_insert_sql_simple() {
    let payload = serde_json::json!({
        "email": "alice@example.com",
        "displayName": "Alice"
    });
    let (statement, params) = build_insert_sql("User", &payload).unwrap();

    assert!(statement.starts_with("INSERT INTO \"User\""));
    for fragment in ["id", "$1", "$2", "$3"].iter() {
        assert!(statement.contains(*fragment), "missing fragment: {}", fragment);
    }
    assert!(!params[0].is_empty());
    assert_eq!(params.len(), 3);
}
1400
// Both the table name and the mixed-case column name must appear double-quoted
// in the generated INSERT statement.
#[test]
fn build_insert_sql_quotes_column_names() {
    let payload = serde_json::json!({"createdAt": "2026-01-01"});
    let (statement, _params) = build_insert_sql("Todo", &payload).unwrap();

    for quoted in ["\"createdAt\"", "\"Todo\""].iter() {
        assert!(statement.contains(*quoted), "missing identifier: {}", quoted);
    }
}
1408
// A JSON string (rather than an object) must be rejected with `PG_INVALID_DATA`.
#[test]
fn build_insert_sql_rejects_non_object() {
    let scalar = serde_json::json!("not an object");
    let outcome = build_insert_sql("User", &scalar);

    assert!(outcome.is_err());
    let failure = outcome.unwrap_err();
    assert_eq!(failure.code, "PG_INVALID_DATA");
}
1416
// Updating two fields on `User` yields an UPDATE statement keyed on `id = $1`,
// with the row id as the first bound value and three values in total.
#[test]
fn build_update_sql_simple() {
    let changes = serde_json::json!({
        "displayName": "Bob",
        "email": "bob@example.com"
    });
    let (statement, params) = build_update_sql("User", "abc123", &changes).unwrap();

    assert!(statement.starts_with("UPDATE \"User\" SET"));
    for fragment in ["WHERE id = $1", "$2", "$3"].iter() {
        assert!(statement.contains(*fragment), "missing fragment: {}", fragment);
    }
    assert_eq!(params[0], "abc123");
    assert_eq!(params.len(), 3);
}
1432
// The single updated column must be double-quoted and bound to placeholder `$2`.
#[test]
fn build_update_sql_quotes_column_names() {
    let changes = serde_json::json!({"displayName": "Carol"});
    let (statement, _params) = build_update_sql("User", "id1", &changes).unwrap();

    let expected_assignment = "\"displayName\" = $2";
    assert!(statement.contains(expected_assignment));
}
1439
// A JSON number (rather than an object) must be rejected with `PG_INVALID_DATA`.
#[test]
fn build_update_sql_rejects_non_object() {
    let scalar = serde_json::json!(42);
    let outcome = build_update_sql("User", "id1", &scalar);

    assert!(outcome.is_err());
    let failure = outcome.unwrap_err();
    assert_eq!(failure.code, "PG_INVALID_DATA");
}
1447
// An empty object has nothing to update: expect `PG_INVALID_DATA` and a message
// mentioning that at least one field is required.
#[test]
fn build_update_sql_rejects_empty_object() {
    let no_changes = serde_json::json!({});
    let outcome = build_update_sql("User", "id1", &no_changes);

    let rejection = outcome.unwrap_err();
    assert_eq!(rejection.code, "PG_INVALID_DATA");
    assert!(rejection.message.contains("at least one field"));
}
1455}