1use crate::{FieldSpec, SchemaOperation, SchemaPlan, StorageAdapter, StorageError};
2use pylon_kernel::AppManifest;
3
4fn pg_column_type(field_type: &str) -> &'static str {
17 match field_type {
18 "string" => "TEXT",
19 "int" => "INTEGER",
20 "float" => "DOUBLE PRECISION",
21 "bool" => "BOOLEAN",
22 "datetime" => "TIMESTAMPTZ",
23 "richtext" => "TEXT",
24 _ if field_type.starts_with("id(") => "TEXT",
25 _ => "TEXT",
26 }
27}
28
29fn quote_ident(name: &str) -> String {
37 format!("\"{}\"", name.replace('"', "\"\""))
38}
39
40pub fn create_table_sql(entity_name: &str, fields: &[FieldSpec]) -> String {
46 let mut columns = vec!["id TEXT PRIMARY KEY NOT NULL".to_string()];
47
48 for field in fields {
49 let col_type = pg_column_type(&field.field_type);
50 let not_null = if field.optional { "" } else { " NOT NULL" };
51 let unique = if field.unique { " UNIQUE" } else { "" };
52 columns.push(format!(
53 "{} {}{}{}",
54 quote_ident(&field.name),
55 col_type,
56 not_null,
57 unique
58 ));
59 }
60
61 format!(
62 "CREATE TABLE IF NOT EXISTS {} ({})",
63 quote_ident(entity_name),
64 columns.join(", ")
65 )
66}
67
68pub fn add_column_sql(entity_name: &str, field: &FieldSpec) -> String {
72 let col_type = pg_column_type(&field.field_type);
73 let unique = if field.unique { " UNIQUE" } else { "" };
74 format!(
75 "ALTER TABLE {} ADD COLUMN {} {}{}",
76 quote_ident(entity_name),
77 quote_ident(&field.name),
78 col_type,
79 unique
80 )
81}
82
83pub fn create_index_sql(
85 entity_name: &str,
86 index_name: &str,
87 fields: &[String],
88 unique: bool,
89) -> String {
90 let unique_str = if unique { "UNIQUE " } else { "" };
91 let full_index_name = format!("{}_{}", entity_name, index_name);
92 let quoted_fields: Vec<String> = fields.iter().map(|f| quote_ident(f)).collect();
93 format!(
94 "CREATE {}INDEX IF NOT EXISTS {} ON {} ({})",
95 unique_str,
96 quote_ident(&full_index_name),
97 quote_ident(entity_name),
98 quoted_fields.join(", ")
99 )
100}
101
102pub struct PostgresAdapter;
109
110impl StorageAdapter for PostgresAdapter {
111 fn plan_schema(&self, target: &AppManifest) -> Result<SchemaPlan, StorageError> {
112 let mut operations = Vec::new();
114
115 for entity in &target.entities {
116 let fields: Vec<FieldSpec> = entity
117 .fields
118 .iter()
119 .map(|f| FieldSpec {
120 name: f.name.clone(),
121 field_type: f.field_type.clone(),
122 optional: f.optional,
123 unique: f.unique,
124 })
125 .collect();
126
127 operations.push(SchemaOperation::CreateEntity {
128 name: entity.name.clone(),
129 fields,
130 });
131
132 for index in &entity.indexes {
133 operations.push(SchemaOperation::AddIndex {
134 entity: entity.name.clone(),
135 name: index.name.clone(),
136 fields: index.fields.clone(),
137 unique: index.unique,
138 });
139 }
140 }
141
142 if operations.is_empty() {
143 operations.push(SchemaOperation::Noop);
144 }
145
146 Ok(SchemaPlan { operations })
147 }
148
149 }
151
152pub fn plan_to_sql(plan: &SchemaPlan) -> Result<Vec<String>, StorageError> {
155 let mut statements = Vec::new();
156
157 for op in &plan.operations {
158 match op {
159 SchemaOperation::CreateEntity { name, fields } => {
160 statements.push(create_table_sql(name, fields));
161 }
162 SchemaOperation::AddField { entity, field } => {
163 statements.push(add_column_sql(entity, field));
164 }
165 SchemaOperation::AddIndex {
166 entity,
167 name,
168 fields,
169 unique,
170 } => {
171 statements.push(create_index_sql(entity, name, fields, *unique));
172 }
173 SchemaOperation::Noop => {}
174 other => {
175 return Err(StorageError {
176 code: "PG_OP_UNSUPPORTED".into(),
177 message: format!("Operation not supported by Postgres adapter: {other:?}"),
178 });
179 }
180 }
181 }
182
183 Ok(statements)
184}
185
186pub const INTROSPECT_TABLES_SQL: &str = "\
195 SELECT table_name \
196 FROM information_schema.tables \
197 WHERE table_schema = 'public' \
198 AND table_type = 'BASE TABLE' \
199 AND table_name NOT LIKE '_pylon_%' \
200 ORDER BY table_name";
201
202pub const INTROSPECT_COLUMNS_SQL: &str = "\
205 SELECT column_name, data_type, is_nullable, \
206 (SELECT COUNT(*) FROM information_schema.table_constraints tc \
207 JOIN information_schema.key_column_usage kcu \
208 ON tc.constraint_name = kcu.constraint_name \
209 WHERE tc.table_name = c.table_name \
210 AND kcu.column_name = c.column_name \
211 AND tc.constraint_type = 'PRIMARY KEY') as is_pk \
212 FROM information_schema.columns c \
213 WHERE table_schema = 'public' AND table_name = $1 \
214 ORDER BY ordinal_position";
215
216pub const INTROSPECT_INDEXES_SQL: &str = "\
219 SELECT i.relname as index_name, \
220 ix.indisunique as is_unique, \
221 array_agg(a.attname ORDER BY array_position(ix.indkey, a.attnum)) as columns \
222 FROM pg_index ix \
223 JOIN pg_class t ON t.oid = ix.indrelid \
224 JOIN pg_class i ON i.oid = ix.indexrelid \
225 JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(ix.indkey) \
226 JOIN pg_namespace n ON n.oid = t.relnamespace \
227 WHERE n.nspname = 'public' \
228 AND t.relname = $1 \
229 AND NOT ix.indisprimary \
230 GROUP BY i.relname, ix.indisunique \
231 ORDER BY i.relname";
232
233pub fn plan_from_snapshot(snapshot: &crate::SchemaSnapshot, target: &AppManifest) -> SchemaPlan {
236 crate::plan_from_snapshot(snapshot, target)
237}
238
239pub fn generate_id() -> String {
252 use std::sync::atomic::{AtomicU32, Ordering};
253 use std::time::{SystemTime, UNIX_EPOCH};
254 static COUNTER: AtomicU32 = AtomicU32::new(0);
255 let ts = SystemTime::now()
256 .duration_since(UNIX_EPOCH)
257 .unwrap_or_default()
258 .as_nanos();
259 let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
260 format!("{ts:032x}{seq:08x}")
261}
262
263pub fn json_value_to_string(val: &serde_json::Value) -> String {
265 match val {
266 serde_json::Value::String(s) => s.clone(),
267 serde_json::Value::Number(n) => n.to_string(),
268 serde_json::Value::Bool(b) => b.to_string(),
269 serde_json::Value::Null => String::new(),
270 other => other.to_string(),
271 }
272}
273
274pub fn build_insert_sql(
277 entity: &str,
278 data: &serde_json::Value,
279) -> Result<(String, Vec<String>), StorageError> {
280 let id = generate_id();
281 let obj = data.as_object().ok_or_else(|| StorageError {
282 code: "PG_INVALID_DATA".into(),
283 message: "Insert data must be a JSON object".into(),
284 })?;
285
286 let mut col_names = vec!["id".to_string()];
287 let mut placeholders = vec!["$1".to_string()];
288 let mut values: Vec<String> = vec![id];
289
290 for (i, (key, val)) in obj.iter().enumerate() {
291 col_names.push(quote_ident(key));
292 placeholders.push(format!("${}", i + 2));
293 values.push(json_value_to_string(val));
294 }
295
296 let sql = format!(
297 "INSERT INTO {} ({}) VALUES ({})",
298 quote_ident(entity),
299 col_names.join(", "),
300 placeholders.join(", ")
301 );
302
303 Ok((sql, values))
304}
305
306pub fn build_update_sql(
309 entity: &str,
310 id: &str,
311 data: &serde_json::Value,
312) -> Result<(String, Vec<String>), StorageError> {
313 let obj = data.as_object().ok_or_else(|| StorageError {
314 code: "PG_INVALID_DATA".into(),
315 message: "Update data must be a JSON object".into(),
316 })?;
317
318 if obj.is_empty() {
319 return Err(StorageError {
320 code: "PG_INVALID_DATA".into(),
321 message: "Update data must contain at least one field".into(),
322 });
323 }
324
325 let mut set_clauses = Vec::new();
326 let mut values: Vec<String> = vec![id.to_string()];
327
328 for (i, (key, val)) in obj.iter().enumerate() {
329 set_clauses.push(format!("{} = ${}", quote_ident(key), i + 2));
330 values.push(json_value_to_string(val));
331 }
332
333 let sql = format!(
334 "UPDATE {} SET {} WHERE id = $1",
335 quote_ident(entity),
336 set_clauses.join(", ")
337 );
338
339 Ok((sql, values))
340}
341
342#[cfg(feature = "postgres-live")]
347pub mod live {
348 use super::*;
349 use crate::{
350 ColumnSnapshot, IndexSnapshot, SchemaSnapshot, StorageAdapter, StorageError, TableSnapshot,
351 };
352
353 pub struct LivePostgresAdapter {
355 client: postgres::Client,
356 }
357
358 impl LivePostgresAdapter {
359 pub fn connect(url: &str) -> Result<Self, StorageError> {
361 let client =
362 postgres::Client::connect(url, postgres::NoTls).map_err(|e| StorageError {
363 code: "PG_CONNECT_FAILED".into(),
364 message: format!("Failed to connect to Postgres: {e}"),
365 })?;
366 Ok(Self { client })
367 }
368
369 pub fn read_schema(&mut self) -> Result<SchemaSnapshot, StorageError> {
371 let table_rows = self
372 .client
373 .query(INTROSPECT_TABLES_SQL, &[])
374 .map_err(pg_err)?;
375
376 let mut tables = Vec::new();
377 for row in &table_rows {
378 let table_name: String = row.get(0);
379 let columns = self.read_columns(&table_name)?;
380 let indexes = self.read_indexes(&table_name)?;
381 tables.push(TableSnapshot {
382 name: table_name,
383 columns,
384 indexes,
385 });
386 }
387
388 Ok(SchemaSnapshot { tables })
389 }
390
391 fn read_columns(&mut self, table: &str) -> Result<Vec<ColumnSnapshot>, StorageError> {
392 let rows = self
393 .client
394 .query(INTROSPECT_COLUMNS_SQL, &[&table])
395 .map_err(pg_err)?;
396
397 let mut columns = Vec::new();
398 for row in &rows {
399 let name: String = row.get(0);
400 let data_type: String = row.get(1);
401 let is_nullable: String = row.get(2);
402 let is_pk: i64 = row.get(3);
403 columns.push(ColumnSnapshot {
404 name,
405 column_type: data_type,
406 notnull: is_nullable == "NO",
407 primary_key: is_pk > 0,
408 });
409 }
410 Ok(columns)
411 }
412
413 fn read_indexes(&mut self, table: &str) -> Result<Vec<IndexSnapshot>, StorageError> {
414 let rows = self
415 .client
416 .query(INTROSPECT_INDEXES_SQL, &[&table])
417 .map_err(pg_err)?;
418
419 let mut indexes = Vec::new();
420 for row in &rows {
421 let name: String = row.get(0);
422 let unique: bool = row.get(1);
423 let columns: Vec<String> = row.get(2);
424 indexes.push(IndexSnapshot {
425 name,
426 columns,
427 unique,
428 });
429 }
430 Ok(indexes)
431 }
432
433 pub fn plan_from_live(&mut self, target: &AppManifest) -> Result<SchemaPlan, StorageError> {
435 let snapshot = self.read_schema()?;
436 Ok(crate::plan_from_snapshot(&snapshot, target))
437 }
438 }
439
440 impl StorageAdapter for LivePostgresAdapter {
441 fn plan_schema(&self, _target: &AppManifest) -> Result<SchemaPlan, StorageError> {
442 Err(StorageError {
443 code: "PG_PLAN_NEEDS_MUTABLE".into(),
444 message: "Use plan_from_live() instead for live Postgres planning".into(),
445 })
446 }
447
448 fn apply_schema(&self, _plan: &SchemaPlan) -> Result<(), StorageError> {
449 Err(StorageError {
450 code: "PG_APPLY_USE_METHOD".into(),
451 message: "Use apply_plan() instead of the trait method for live Postgres".into(),
452 })
453 }
454 }
455
456 impl LivePostgresAdapter {
457 pub fn apply_plan(&mut self, plan: &SchemaPlan) -> Result<(), StorageError> {
459 let statements = plan_to_sql(plan)?;
460 for sql in &statements {
461 self.client.execute(sql.as_str(), &[]).map_err(pg_err)?;
462 }
463 Ok(())
464 }
465
466 pub fn insert(
468 &mut self,
469 entity: &str,
470 data: &serde_json::Value,
471 ) -> Result<String, StorageError> {
472 let (sql, values) = build_insert_sql(entity, data)?;
473 let id = values[0].clone();
474
475 let params: Vec<&(dyn postgres::types::ToSql + Sync)> = values
476 .iter()
477 .map(|v| v as &(dyn postgres::types::ToSql + Sync))
478 .collect();
479
480 self.client.execute(sql.as_str(), ¶ms).map_err(pg_err)?;
481 Ok(id)
482 }
483
484 pub fn get_by_id(
486 &mut self,
487 entity: &str,
488 id: &str,
489 ) -> Result<Option<serde_json::Value>, StorageError> {
490 let sql = format!("SELECT * FROM {} WHERE id = $1", quote_ident(entity));
491 let rows = self.client.query(sql.as_str(), &[&id]).map_err(pg_err)?;
492
493 match rows.first() {
494 Some(row) => Ok(Some(row_to_json(row))),
495 None => Ok(None),
496 }
497 }
498
499 pub fn list(&mut self, entity: &str) -> Result<Vec<serde_json::Value>, StorageError> {
501 let sql = format!("SELECT * FROM {}", quote_ident(entity));
502 let rows = self.client.query(sql.as_str(), &[]).map_err(pg_err)?;
503
504 Ok(rows.iter().map(row_to_json).collect())
505 }
506
507 pub fn list_after(
511 &mut self,
512 entity: &str,
513 after: Option<&str>,
514 limit: usize,
515 ) -> Result<Vec<serde_json::Value>, StorageError> {
516 let capped: i64 = limit.min(10_000) as i64;
519 let sql = match after {
520 Some(_) => format!(
521 "SELECT * FROM {} WHERE id > $1 ORDER BY id ASC LIMIT $2",
522 quote_ident(entity)
523 ),
524 None => format!(
525 "SELECT * FROM {} ORDER BY id ASC LIMIT $1",
526 quote_ident(entity)
527 ),
528 };
529 let rows = match after {
530 Some(cursor) => self
531 .client
532 .query(sql.as_str(), &[&cursor, &capped])
533 .map_err(pg_err)?,
534 None => self
535 .client
536 .query(sql.as_str(), &[&capped])
537 .map_err(pg_err)?,
538 };
539 Ok(rows.iter().map(row_to_json).collect())
540 }
541
542 pub fn update(
544 &mut self,
545 entity: &str,
546 id: &str,
547 data: &serde_json::Value,
548 ) -> Result<bool, StorageError> {
549 let (sql, values) = build_update_sql(entity, id, data)?;
550
551 let params: Vec<&(dyn postgres::types::ToSql + Sync)> = values
552 .iter()
553 .map(|v| v as &(dyn postgres::types::ToSql + Sync))
554 .collect();
555
556 let rows_affected = self.client.execute(sql.as_str(), ¶ms).map_err(pg_err)?;
557 Ok(rows_affected > 0)
558 }
559
560 pub fn delete(&mut self, entity: &str, id: &str) -> Result<bool, StorageError> {
562 let sql = format!("DELETE FROM {} WHERE id = $1", quote_ident(entity));
563 let rows_affected = self.client.execute(sql.as_str(), &[&id]).map_err(pg_err)?;
564 Ok(rows_affected > 0)
565 }
566
567 pub fn lookup_field(
571 &mut self,
572 entity: &str,
573 field: &str,
574 value: &str,
575 ) -> Result<Option<serde_json::Value>, StorageError> {
576 let sql = format!(
577 "SELECT * FROM {} WHERE {} = $1 LIMIT 1",
578 quote_ident(entity),
579 quote_ident(field),
580 );
581 let rows = self.client.query(sql.as_str(), &[&value]).map_err(pg_err)?;
582 Ok(rows.first().map(row_to_json))
583 }
584
585 pub fn query_filtered(
595 &mut self,
596 entity: &str,
597 filter: &serde_json::Value,
598 valid_columns: &[String],
599 ) -> Result<Vec<serde_json::Value>, StorageError> {
600 let empty = serde_json::Map::new();
601 let obj = filter.as_object().unwrap_or(&empty);
602
603 let validate = |col: &str| -> Result<(), StorageError> {
604 if col == "id" || valid_columns.iter().any(|c| c == col) {
605 Ok(())
606 } else {
607 Err(StorageError {
608 code: "UNKNOWN_COLUMN".into(),
609 message: format!("Unknown column \"{col}\" on entity \"{entity}\""),
610 })
611 }
612 };
613
614 let mut where_clauses: Vec<String> = Vec::new();
615 let mut order_clause = String::new();
616 let mut limit_clause = String::new();
617 let mut offset_clause = String::new();
618 let mut planned: Vec<(String, String, String)> = Vec::new();
621
622 for (key, val) in obj {
623 match key.as_str() {
624 "$order" => {
625 if let Some(ord) = val.as_object() {
626 let mut parts = Vec::new();
627 for (col, dir) in ord {
628 validate(col)?;
629 let d = match dir.as_str().unwrap_or("asc") {
630 "desc" | "DESC" => "DESC",
631 _ => "ASC",
632 };
633 parts.push(format!("{} {d}", quote_ident(col)));
634 }
635 if !parts.is_empty() {
636 order_clause = format!(" ORDER BY {}", parts.join(", "));
637 }
638 }
639 }
640 "$limit" => {
641 if let Some(n) = val.as_u64() {
642 limit_clause = format!(" LIMIT {}", n);
643 }
644 }
645 "$offset" => {
646 if let Some(n) = val.as_u64() {
647 offset_clause = format!(" OFFSET {}", n);
648 }
649 }
650 field => {
651 validate(field)?;
652 match val {
653 serde_json::Value::Object(ops) => {
654 for (op, v) in ops {
655 match op.as_str() {
656 "$gt" => {
657 planned.push((field.into(), ">".into(), value_to_pg(v)))
658 }
659 "$gte" => planned.push((
660 field.into(),
661 ">=".into(),
662 value_to_pg(v),
663 )),
664 "$lt" => {
665 planned.push((field.into(), "<".into(), value_to_pg(v)))
666 }
667 "$lte" => planned.push((
668 field.into(),
669 "<=".into(),
670 value_to_pg(v),
671 )),
672 "$like" => planned.push((
673 field.into(),
674 "LIKE".into(),
675 value_to_pg(v),
676 )),
677 "$in" => {
678 if let Some(arr) = v.as_array() {
679 let placeholders: Vec<String> = (0..arr.len())
680 .map(|i| format!("${}", planned.len() + 1 + i))
681 .collect();
682 where_clauses.push(format!(
683 "{} IN ({})",
684 quote_ident(field),
685 placeholders.join(", "),
686 ));
687 for x in arr {
688 planned.push((
689 format!("__inline_{}", planned.len()),
690 "__INLINE__".into(),
691 value_to_pg(x),
692 ));
693 }
694 }
695 }
696 _ => {}
697 }
698 }
699 }
700 _ => planned.push((field.into(), "=".into(), value_to_pg(val))),
701 }
702 }
703 }
704 }
705
706 let mut params: Vec<String> = Vec::with_capacity(planned.len());
708 for (field, op, v) in &planned {
709 if op == "__INLINE__" {
710 params.push(v.clone());
712 } else {
713 let placeholder = format!("${}", params.len() + 1);
714 where_clauses.push(format!("{} {} {}", quote_ident(field), op, placeholder));
715 params.push(v.clone());
716 }
717 }
718
719 let where_sql = if where_clauses.is_empty() {
720 String::new()
721 } else {
722 format!(" WHERE {}", where_clauses.join(" AND "))
723 };
724 let sql = format!(
725 "SELECT * FROM {}{}{}{}{}",
726 quote_ident(entity),
727 where_sql,
728 order_clause,
729 limit_clause,
730 offset_clause,
731 );
732
733 let pg_params: Vec<&(dyn postgres::types::ToSql + Sync)> = params
734 .iter()
735 .map(|s| s as &(dyn postgres::types::ToSql + Sync))
736 .collect();
737
738 let rows = self
739 .client
740 .query(sql.as_str(), &pg_params)
741 .map_err(pg_err)?;
742 Ok(rows.iter().map(row_to_json).collect())
743 }
744 }
745
746 pub enum TxOp<'a> {
748 Insert {
749 entity: &'a str,
750 data: &'a serde_json::Value,
751 },
752 Update {
753 entity: &'a str,
754 id: &'a str,
755 data: &'a serde_json::Value,
756 },
757 Delete {
758 entity: &'a str,
759 id: &'a str,
760 },
761 }
762
763 #[derive(Debug, Clone)]
765 pub enum TxResult {
766 Inserted(String),
767 Updated(bool),
768 Deleted(bool),
769 }
770
771 impl LivePostgresAdapter {
772 pub fn transact(&mut self, ops: &[TxOp<'_>]) -> Result<Vec<TxResult>, StorageError> {
777 let mut tx = self.client.transaction().map_err(pg_err)?;
778 let mut results: Vec<TxResult> = Vec::with_capacity(ops.len());
779
780 for op in ops {
781 match op {
782 TxOp::Insert { entity, data } => {
783 let (sql, values) = build_insert_sql(entity, data)?;
784 let id = values[0].clone();
785 let params: Vec<&(dyn postgres::types::ToSql + Sync)> = values
786 .iter()
787 .map(|v| v as &(dyn postgres::types::ToSql + Sync))
788 .collect();
789 tx.execute(sql.as_str(), ¶ms).map_err(pg_err)?;
790 results.push(TxResult::Inserted(id));
791 }
792 TxOp::Update { entity, id, data } => {
793 let (sql, values) = build_update_sql(entity, id, data)?;
794 let params: Vec<&(dyn postgres::types::ToSql + Sync)> = values
795 .iter()
796 .map(|v| v as &(dyn postgres::types::ToSql + Sync))
797 .collect();
798 let n = tx.execute(sql.as_str(), ¶ms).map_err(pg_err)?;
799 results.push(TxResult::Updated(n > 0));
800 }
801 TxOp::Delete { entity, id } => {
802 let sql = format!("DELETE FROM {} WHERE id = $1", quote_ident(entity));
803 let n = tx.execute(sql.as_str(), &[id]).map_err(pg_err)?;
804 results.push(TxResult::Deleted(n > 0));
805 }
806 }
807 }
808
809 tx.commit().map_err(pg_err)?;
810 Ok(results)
811 }
812 }
813
814 fn value_to_pg(v: &serde_json::Value) -> String {
815 match v {
816 serde_json::Value::String(s) => s.clone(),
817 serde_json::Value::Number(n) => n.to_string(),
818 serde_json::Value::Bool(b) => b.to_string(),
819 serde_json::Value::Null => String::new(),
820 other => other.to_string(),
821 }
822 }
823
824 fn row_to_json(row: &postgres::Row) -> serde_json::Value {
825 use postgres::types::Type;
826 let mut obj = serde_json::Map::new();
827 for (i, col) in row.columns().iter().enumerate() {
828 let name = col.name().to_string();
829
830 let value: serde_json::Value = match *col.type_() {
841 Type::BOOL => try_get_or_null::<Option<bool>>(row, i)
842 .flatten()
843 .map(serde_json::Value::Bool)
844 .unwrap_or(serde_json::Value::Null),
845 Type::INT2 => try_get_or_null::<Option<i16>>(row, i)
846 .flatten()
847 .map(|v| serde_json::Value::Number(v.into()))
848 .unwrap_or(serde_json::Value::Null),
849 Type::INT4 => try_get_or_null::<Option<i32>>(row, i)
850 .flatten()
851 .map(|v| serde_json::Value::Number(v.into()))
852 .unwrap_or(serde_json::Value::Null),
853 Type::INT8 => try_get_or_null::<Option<i64>>(row, i)
854 .flatten()
855 .map(|v| serde_json::Value::Number(v.into()))
856 .unwrap_or(serde_json::Value::Null),
857 Type::FLOAT4 => try_get_or_null::<Option<f32>>(row, i)
858 .flatten()
859 .and_then(|v| serde_json::Number::from_f64(v as f64))
860 .map(serde_json::Value::Number)
861 .unwrap_or(serde_json::Value::Null),
862 Type::FLOAT8 => try_get_or_null::<Option<f64>>(row, i)
863 .flatten()
864 .and_then(serde_json::Number::from_f64)
865 .map(serde_json::Value::Number)
866 .unwrap_or(serde_json::Value::Null),
867 Type::JSON | Type::JSONB => try_get_or_null::<Option<serde_json::Value>>(row, i)
868 .flatten()
869 .unwrap_or(serde_json::Value::Null),
870 Type::BYTEA => try_get_or_null::<Option<Vec<u8>>>(row, i)
871 .flatten()
872 .map(|b| serde_json::Value::String(b64(&b)))
873 .unwrap_or(serde_json::Value::Null),
874 Type::TEXT | Type::VARCHAR | Type::BPCHAR | Type::NAME | Type::UNKNOWN => {
875 try_get_or_null::<Option<String>>(row, i)
876 .flatten()
877 .map(serde_json::Value::String)
878 .unwrap_or(serde_json::Value::Null)
879 }
880 _ => {
881 match row.try_get::<_, Option<String>>(i) {
886 Ok(Some(s)) => serde_json::Value::String(s),
887 Ok(None) => serde_json::Value::Null,
888 Err(_) => match row.try_get::<_, Option<Vec<u8>>>(i) {
889 Ok(Some(bytes)) => serde_json::Value::String(
890 String::from_utf8_lossy(&bytes).into_owned(),
891 ),
892 _ => serde_json::Value::Null,
893 },
894 }
895 }
896 };
897 obj.insert(name, value);
898 }
899 serde_json::Value::Object(obj)
900 }
901
902 fn try_get_or_null<'a, T>(row: &'a postgres::Row, i: usize) -> Option<T>
903 where
904 T: postgres::types::FromSql<'a>,
905 {
906 match row.try_get::<_, T>(i) {
907 Ok(v) => Some(v),
908 Err(e) => {
909 tracing::warn!(
910 "[postgres] decode failed for column {} ({}): {e}",
911 i,
912 row.columns()[i].name()
913 );
914 None
915 }
916 }
917 }
918
919 fn b64(bytes: &[u8]) -> String {
922 const TABLE: &[u8; 64] =
923 b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
924 let mut out = String::with_capacity((bytes.len() + 2) / 3 * 4);
925 let chunks = bytes.chunks(3);
926 for chunk in chunks {
927 let b = [
928 chunk.first().copied().unwrap_or(0),
929 chunk.get(1).copied().unwrap_or(0),
930 chunk.get(2).copied().unwrap_or(0),
931 ];
932 out.push(TABLE[(b[0] >> 2) as usize] as char);
933 out.push(TABLE[((b[0] & 0x03) << 4 | b[1] >> 4) as usize] as char);
934 if chunk.len() > 1 {
935 out.push(TABLE[((b[1] & 0x0F) << 2 | b[2] >> 6) as usize] as char);
936 } else {
937 out.push('=');
938 }
939 if chunk.len() > 2 {
940 out.push(TABLE[(b[2] & 0x3F) as usize] as char);
941 } else {
942 out.push('=');
943 }
944 }
945 out
946 }
947
948 fn pg_err(e: postgres::Error) -> StorageError {
949 StorageError {
950 code: "PG_QUERY_FAILED".into(),
951 message: format!("Postgres query failed: {e}"),
952 }
953 }
954}
955
956#[cfg(test)]
961mod tests {
962 use super::*;
963
964 fn test_manifest() -> AppManifest {
968 use pylon_kernel::{ManifestEntity, ManifestField, ManifestIndex};
969 let f = |name: &str, ty: &str, opt: bool, uniq: bool| ManifestField {
970 name: name.into(),
971 field_type: ty.into(),
972 optional: opt,
973 unique: uniq,
974 crdt: None,
975 };
976 AppManifest {
977 manifest_version: 1,
978 name: "test".into(),
979 version: "0.0.0".into(),
980 entities: vec![
981 ManifestEntity {
982 name: "User".into(),
983 fields: vec![
984 f("email", "string", false, true),
985 f("displayName", "string", false, false),
986 f("createdAt", "datetime", false, false),
987 ],
988 indexes: vec![],
989 relations: vec![],
990 search: None,
991 crdt: true,
992 },
993 ManifestEntity {
994 name: "Todo".into(),
995 fields: vec![
996 f("title", "string", false, false),
997 f("done", "bool", false, false),
998 f("userId", "id(User)", false, false),
999 f("createdAt", "datetime", false, false),
1000 ],
1001 indexes: vec![ManifestIndex {
1002 name: "by_user".into(),
1003 fields: vec!["userId".into()],
1004 unique: false,
1005 }],
1006 relations: vec![],
1007 search: None,
1008 crdt: true,
1009 },
1010 ],
1011 queries: vec![],
1012 actions: vec![],
1013 policies: vec![],
1014 routes: vec![],
1015 }
1016 }
1017
1018 #[test]
1019 fn pg_type_mapping() {
1020 assert_eq!(pg_column_type("string"), "TEXT");
1021 assert_eq!(pg_column_type("int"), "INTEGER");
1022 assert_eq!(pg_column_type("float"), "DOUBLE PRECISION");
1023 assert_eq!(pg_column_type("bool"), "BOOLEAN");
1024 assert_eq!(pg_column_type("datetime"), "TIMESTAMPTZ");
1025 assert_eq!(pg_column_type("richtext"), "TEXT");
1026 assert_eq!(pg_column_type("id(User)"), "TEXT");
1027 }
1028
1029 #[test]
1030 fn quote_ident_simple() {
1031 assert_eq!(quote_ident("User"), "\"User\"");
1032 assert_eq!(quote_ident("email"), "\"email\"");
1033 }
1034
1035 #[test]
1036 fn quote_ident_escapes_embedded_double_quotes() {
1037 assert_eq!(quote_ident("col\"name"), "\"col\"\"name\"");
1038 assert_eq!(quote_ident("a\"b\"c"), "\"a\"\"b\"\"c\"");
1039 }
1040
1041 #[test]
1042 fn create_table_sql_basic() {
1043 let fields = vec![
1044 FieldSpec {
1045 name: "email".into(),
1046 field_type: "string".into(),
1047 optional: false,
1048 unique: true,
1049 },
1050 FieldSpec {
1051 name: "age".into(),
1052 field_type: "int".into(),
1053 optional: true,
1054 unique: false,
1055 },
1056 ];
1057 let sql = create_table_sql("User", &fields);
1058 assert_eq!(
1059 sql,
1060 "CREATE TABLE IF NOT EXISTS \"User\" (id TEXT PRIMARY KEY NOT NULL, \"email\" TEXT NOT NULL UNIQUE, \"age\" INTEGER)"
1061 );
1062 }
1063
1064 #[test]
1065 fn create_table_sql_escapes_identifiers() {
1066 let fields = vec![FieldSpec {
1067 name: "col\"x".into(),
1068 field_type: "string".into(),
1069 optional: false,
1070 unique: false,
1071 }];
1072 let sql = create_table_sql("my\"table", &fields);
1073 assert!(sql.contains("\"my\"\"table\""));
1074 assert!(sql.contains("\"col\"\"x\""));
1075 }
1076
1077 #[test]
1078 fn create_index_sql_unique() {
1079 let sql = create_index_sql("User", "by_email", &["email".into()], true);
1080 assert_eq!(
1081 sql,
1082 "CREATE UNIQUE INDEX IF NOT EXISTS \"User_by_email\" ON \"User\" (\"email\")"
1083 );
1084 }
1085
1086 #[test]
1087 fn create_index_sql_non_unique() {
1088 let sql = create_index_sql("Todo", "by_user", &["userId".into()], false);
1089 assert_eq!(
1090 sql,
1091 "CREATE INDEX IF NOT EXISTS \"Todo_by_user\" ON \"Todo\" (\"userId\")"
1092 );
1093 }
1094
1095 #[test]
1096 fn add_column_sql_basic() {
1097 let field = FieldSpec {
1098 name: "bio".into(),
1099 field_type: "string".into(),
1100 optional: true,
1101 unique: false,
1102 };
1103 let sql = add_column_sql("User", &field);
1104 assert_eq!(sql, "ALTER TABLE \"User\" ADD COLUMN \"bio\" TEXT");
1105 }
1106
1107 #[test]
1108 fn plan_from_manifest() {
1109 let adapter = PostgresAdapter;
1110 let manifest = test_manifest();
1111 let plan = adapter.plan_schema(&manifest).unwrap();
1112
1113 assert!(plan.operations.iter().any(|op| matches!(
1115 op,
1116 SchemaOperation::CreateEntity { name, .. } if name == "User"
1117 )));
1118 assert!(plan.operations.iter().any(|op| matches!(
1119 op,
1120 SchemaOperation::CreateEntity { name, .. } if name == "Todo"
1121 )));
1122 assert!(plan.operations.iter().any(|op| matches!(
1123 op,
1124 SchemaOperation::AddIndex { entity, name, .. } if entity == "Todo" && name == "by_user"
1125 )));
1126 }
1127
1128 #[test]
1129 fn plan_to_sql_produces_statements() {
1130 let adapter = PostgresAdapter;
1131 let manifest = test_manifest();
1132 let plan = adapter.plan_schema(&manifest).unwrap();
1133 let stmts = plan_to_sql(&plan).unwrap();
1134
1135 let create_tables = stmts
1140 .iter()
1141 .filter(|s| s.starts_with("CREATE TABLE"))
1142 .count();
1143 let create_indexes = stmts
1144 .iter()
1145 .filter(|s| s.starts_with("CREATE INDEX") || s.starts_with("CREATE UNIQUE INDEX"))
1146 .count();
1147 assert_eq!(create_tables, 2);
1148 assert!(create_indexes >= 1);
1149 assert!(stmts[0].starts_with("CREATE TABLE"));
1150 assert!(stmts[1].starts_with("CREATE TABLE"));
1151 }
1152
1153 #[test]
1154 fn plan_to_sql_rejects_unsupported() {
1155 let plan = SchemaPlan {
1156 operations: vec![SchemaOperation::RemoveEntity {
1157 name: "User".into(),
1158 }],
1159 };
1160 let result = plan_to_sql(&plan);
1161 assert!(result.is_err());
1162 assert_eq!(result.unwrap_err().code, "PG_OP_UNSUPPORTED");
1163 }
1164
1165 #[test]
1166 fn apply_not_implemented() {
1167 let adapter = PostgresAdapter;
1168 let plan = SchemaPlan {
1169 operations: vec![SchemaOperation::Noop],
1170 };
1171 let result = adapter.apply_schema(&plan);
1172 assert!(result.is_err());
1173 assert_eq!(result.unwrap_err().code, "APPLY_NOT_IMPLEMENTED");
1174 }
1175
1176 #[test]
1177 fn sql_uses_quoted_identifiers() {
1178 let fields = vec![FieldSpec {
1179 name: "createdAt".into(),
1180 field_type: "datetime".into(),
1181 optional: false,
1182 unique: false,
1183 }];
1184 let sql = create_table_sql("User", &fields);
1185 assert!(sql.contains("\"User\""));
1187 assert!(sql.contains("\"createdAt\""));
1188 assert!(sql.contains("TIMESTAMPTZ"));
1189 }
1190
1191 #[test]
1194 fn introspect_sql_constants_are_valid() {
1195 assert!(INTROSPECT_TABLES_SQL.contains("information_schema.tables"));
1197 assert!(INTROSPECT_COLUMNS_SQL.contains("$1"));
1198 assert!(INTROSPECT_INDEXES_SQL.contains("$1"));
1199 assert!(INTROSPECT_TABLES_SQL.contains("_pylon_"));
1200 }
1201
1202 #[test]
1205 fn plan_from_empty_snapshot_creates_all() {
1206 let snapshot = crate::SchemaSnapshot { tables: vec![] };
1207 let manifest = test_manifest();
1208 let plan = plan_from_snapshot(&snapshot, &manifest);
1209
1210 assert!(plan.operations.iter().any(|op| matches!(
1211 op,
1212 SchemaOperation::CreateEntity { name, .. } if name == "User"
1213 )));
1214 assert!(plan.operations.iter().any(|op| matches!(
1215 op,
1216 SchemaOperation::CreateEntity { name, .. } if name == "Todo"
1217 )));
1218 assert!(plan.operations.iter().any(|op| matches!(
1219 op,
1220 SchemaOperation::AddIndex { entity, name, .. } if entity == "Todo" && name == "by_user"
1221 )));
1222 }
1223
1224 #[test]
1225 fn plan_from_full_snapshot_is_noop() {
1226 let snapshot = crate::SchemaSnapshot {
1227 tables: vec![
1228 crate::TableSnapshot {
1229 name: "User".into(),
1230 columns: vec![
1231 crate::ColumnSnapshot {
1232 name: "id".into(),
1233 column_type: "TEXT".into(),
1234 notnull: true,
1235 primary_key: true,
1236 },
1237 crate::ColumnSnapshot {
1238 name: "email".into(),
1239 column_type: "TEXT".into(),
1240 notnull: true,
1241 primary_key: false,
1242 },
1243 crate::ColumnSnapshot {
1244 name: "displayName".into(),
1245 column_type: "TEXT".into(),
1246 notnull: true,
1247 primary_key: false,
1248 },
1249 crate::ColumnSnapshot {
1250 name: "createdAt".into(),
1251 column_type: "TIMESTAMPTZ".into(),
1252 notnull: true,
1253 primary_key: false,
1254 },
1255 ],
1256 indexes: vec![],
1257 },
1258 crate::TableSnapshot {
1259 name: "Todo".into(),
1260 columns: vec![
1261 crate::ColumnSnapshot {
1262 name: "id".into(),
1263 column_type: "TEXT".into(),
1264 notnull: true,
1265 primary_key: true,
1266 },
1267 crate::ColumnSnapshot {
1268 name: "title".into(),
1269 column_type: "TEXT".into(),
1270 notnull: true,
1271 primary_key: false,
1272 },
1273 crate::ColumnSnapshot {
1274 name: "done".into(),
1275 column_type: "BOOLEAN".into(),
1276 notnull: true,
1277 primary_key: false,
1278 },
1279 crate::ColumnSnapshot {
1280 name: "userId".into(),
1281 column_type: "TEXT".into(),
1282 notnull: true,
1283 primary_key: false,
1284 },
1285 crate::ColumnSnapshot {
1286 name: "createdAt".into(),
1287 column_type: "TIMESTAMPTZ".into(),
1288 notnull: true,
1289 primary_key: false,
1290 },
1291 ],
1292 indexes: vec![crate::IndexSnapshot {
1293 name: "Todo_by_user".into(),
1294 columns: vec!["userId".into()],
1295 unique: false,
1296 }],
1297 },
1298 ],
1299 };
1300 let manifest = test_manifest();
1301 let plan = plan_from_snapshot(&snapshot, &manifest);
1302 assert!(plan.is_empty());
1303 }
1304
1305 #[test]
1306 fn plan_detects_missing_column_in_snapshot() {
1307 let snapshot = crate::SchemaSnapshot {
1308 tables: vec![
1309 crate::TableSnapshot {
1310 name: "User".into(),
1311 columns: vec![
1312 crate::ColumnSnapshot {
1313 name: "id".into(),
1314 column_type: "TEXT".into(),
1315 notnull: true,
1316 primary_key: true,
1317 },
1318 crate::ColumnSnapshot {
1319 name: "email".into(),
1320 column_type: "TEXT".into(),
1321 notnull: true,
1322 primary_key: false,
1323 },
1324 ],
1326 indexes: vec![],
1327 },
1328 crate::TableSnapshot {
1329 name: "Todo".into(),
1330 columns: vec![
1331 crate::ColumnSnapshot {
1332 name: "id".into(),
1333 column_type: "TEXT".into(),
1334 notnull: true,
1335 primary_key: true,
1336 },
1337 crate::ColumnSnapshot {
1338 name: "title".into(),
1339 column_type: "TEXT".into(),
1340 notnull: true,
1341 primary_key: false,
1342 },
1343 crate::ColumnSnapshot {
1344 name: "done".into(),
1345 column_type: "BOOLEAN".into(),
1346 notnull: true,
1347 primary_key: false,
1348 },
1349 crate::ColumnSnapshot {
1350 name: "userId".into(),
1351 column_type: "TEXT".into(),
1352 notnull: true,
1353 primary_key: false,
1354 },
1355 crate::ColumnSnapshot {
1356 name: "createdAt".into(),
1357 column_type: "TIMESTAMPTZ".into(),
1358 notnull: true,
1359 primary_key: false,
1360 },
1361 ],
1362 indexes: vec![crate::IndexSnapshot {
1363 name: "Todo_by_user".into(),
1364 columns: vec!["userId".into()],
1365 unique: false,
1366 }],
1367 },
1368 ],
1369 };
1370 let manifest = test_manifest();
1371 let plan = plan_from_snapshot(&snapshot, &manifest);
1372
1373 let add_fields: Vec<_> = plan
1374 .operations
1375 .iter()
1376 .filter(|op| matches!(op, SchemaOperation::AddField { .. }))
1377 .collect();
1378 assert_eq!(add_fields.len(), 2); }
1380
1381 #[test]
1384 fn json_value_to_string_handles_all_types() {
1385 assert_eq!(
1386 json_value_to_string(&serde_json::Value::String("hello".into())),
1387 "hello"
1388 );
1389 assert_eq!(json_value_to_string(&serde_json::json!(42)), "42");
1390 assert_eq!(json_value_to_string(&serde_json::json!(1.5)), "1.5");
1391 assert_eq!(json_value_to_string(&serde_json::Value::Bool(true)), "true");
1392 assert_eq!(
1393 json_value_to_string(&serde_json::Value::Bool(false)),
1394 "false"
1395 );
1396 assert_eq!(json_value_to_string(&serde_json::Value::Null), "");
1397 assert_eq!(
1399 json_value_to_string(&serde_json::json!([1, 2, 3])),
1400 "[1,2,3]"
1401 );
1402 assert_eq!(
1403 json_value_to_string(&serde_json::json!({"a": 1})),
1404 "{\"a\":1}"
1405 );
1406 }
1407
1408 #[test]
1409 fn generate_id_returns_hex_string() {
1410 let id = generate_id();
1411 assert!(!id.is_empty());
1412 assert!(id.chars().all(|c| c.is_ascii_hexdigit()));
1414 }
1415
1416 #[test]
1417 fn generate_id_is_unique_across_calls() {
1418 let id1 = generate_id();
1419 let id2 = generate_id();
1420 assert_ne!(id1, id2);
1421 }
1422
1423 #[test]
1424 fn generate_id_is_lex_sortable() {
1425 let mut ids: Vec<String> = (0..1000).map(|_| generate_id()).collect();
1428 let sorted = {
1429 let mut s = ids.clone();
1430 s.sort();
1431 s
1432 };
1433 assert_eq!(ids, sorted, "generate_id must be lex-monotonic");
1434 let len0 = ids[0].len();
1437 assert!(ids.iter().all(|id| id.len() == len0));
1438 ids.dedup();
1439 assert_eq!(ids.len(), 1000, "no collisions in a tight loop");
1440 }
1441
1442 #[test]
1443 fn build_insert_sql_simple() {
1444 let data = serde_json::json!({
1445 "email": "alice@example.com",
1446 "displayName": "Alice"
1447 });
1448 let (sql, values) = build_insert_sql("User", &data).unwrap();
1449
1450 assert!(sql.starts_with("INSERT INTO \"User\""));
1451 assert!(sql.contains("id"));
1452 assert!(sql.contains("$1"));
1453 assert!(sql.contains("$2"));
1454 assert!(sql.contains("$3"));
1455 assert!(!values[0].is_empty());
1457 assert_eq!(values.len(), 3); }
1459
1460 #[test]
1461 fn build_insert_sql_quotes_column_names() {
1462 let data = serde_json::json!({"createdAt": "2026-01-01"});
1463 let (sql, _) = build_insert_sql("Todo", &data).unwrap();
1464 assert!(sql.contains("\"createdAt\""));
1465 assert!(sql.contains("\"Todo\""));
1466 }
1467
1468 #[test]
1469 fn build_insert_sql_rejects_non_object() {
1470 let data = serde_json::json!("not an object");
1471 let result = build_insert_sql("User", &data);
1472 assert!(result.is_err());
1473 assert_eq!(result.unwrap_err().code, "PG_INVALID_DATA");
1474 }
1475
1476 #[test]
1477 fn build_update_sql_simple() {
1478 let data = serde_json::json!({
1479 "displayName": "Bob",
1480 "email": "bob@example.com"
1481 });
1482 let (sql, values) = build_update_sql("User", "abc123", &data).unwrap();
1483
1484 assert!(sql.starts_with("UPDATE \"User\" SET"));
1485 assert!(sql.contains("WHERE id = $1"));
1486 assert!(sql.contains("$2"));
1487 assert!(sql.contains("$3"));
1488 assert_eq!(values[0], "abc123");
1489 assert_eq!(values.len(), 3); }
1491
1492 #[test]
1493 fn build_update_sql_quotes_column_names() {
1494 let data = serde_json::json!({"displayName": "Carol"});
1495 let (sql, _) = build_update_sql("User", "id1", &data).unwrap();
1496 assert!(sql.contains("\"displayName\" = $2"));
1497 }
1498
1499 #[test]
1500 fn build_update_sql_rejects_non_object() {
1501 let data = serde_json::json!(42);
1502 let result = build_update_sql("User", "id1", &data);
1503 assert!(result.is_err());
1504 assert_eq!(result.unwrap_err().code, "PG_INVALID_DATA");
1505 }
1506
1507 #[test]
1508 fn build_update_sql_rejects_empty_object() {
1509 let data = serde_json::json!({});
1510 let err = build_update_sql("User", "id1", &data).unwrap_err();
1511 assert_eq!(err.code, "PG_INVALID_DATA");
1512 assert!(err.message.contains("at least one field"));
1513 }
1514}