1use crate::{FieldSpec, SchemaOperation, SchemaPlan, StorageAdapter, StorageError};
2use pylon_kernel::AppManifest;
3
/// Maps a manifest field type name to the Postgres column type used in DDL.
///
/// `int`, `float`, `bool` and `datetime` get dedicated column types. Every
/// other name — `string`, `richtext`, `id(...)` references, and anything
/// unrecognised — is stored as `TEXT`.
fn pg_column_type(field_type: &str) -> &'static str {
    match field_type {
        "int" => "INTEGER",
        "float" => "DOUBLE PRECISION",
        "bool" => "BOOLEAN",
        "datetime" => "TIMESTAMPTZ",
        // `string`, `richtext`, `id(...)` and unknown names all land here.
        _ => "TEXT",
    }
}
28
/// Wraps `name` in double quotes for use as a SQL identifier.
///
/// Any `"` inside `name` is doubled so the result is always a single quoted
/// identifier.
pub(crate) fn quote_ident(name: &str) -> String {
    let mut quoted = String::with_capacity(name.len() + 2);
    quoted.push('"');
    for ch in name.chars() {
        if ch == '"' {
            // An embedded quote is written as two quotes.
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
39
/// Public entry point to the crate-private [`quote_ident`].
///
/// Only compiled when the `postgres-live` feature is enabled.
#[cfg(feature = "postgres-live")]
pub fn quote_ident_pub(name: &str) -> String {
    quote_ident(name)
}
48
/// Public entry point to `live::row_to_json`: converts one result row into a
/// JSON value.
///
/// Only compiled when the `postgres-live` feature is enabled.
#[cfg(feature = "postgres-live")]
pub fn row_to_json_pub(row: &postgres::Row) -> serde_json::Value {
    live::row_to_json(row)
}
56
/// Public entry point to `LivePostgresAdapter::build_query_filtered_sql`.
///
/// Returns the generated SQL text together with its bind parameters, in
/// placeholder order, or the `StorageError` reported by the builder.
///
/// Only compiled when the `postgres-live` feature is enabled.
#[cfg(feature = "postgres-live")]
pub fn build_query_filtered_sql_pub(
    entity: &str,
    filter: &serde_json::Value,
    valid_columns: &[String],
) -> Result<(String, Vec<JsonParam>), StorageError> {
    live::LivePostgresAdapter::build_query_filtered_sql(entity, filter, valid_columns)
}
70
/// Public entry point to `LivePostgresAdapter::build_aggregate_sql`.
///
/// On success the tuple holds the SQL text, its bind parameters, and the
/// list of result column names that `aggregate_rows_to_json_pub` expects.
///
/// Only compiled when the `postgres-live` feature is enabled.
#[cfg(feature = "postgres-live")]
pub fn build_aggregate_sql_pub(
    entity: &str,
    spec: &serde_json::Value,
    valid_columns: &[String],
) -> Result<(String, Vec<JsonParam>, Vec<String>), StorageError> {
    live::LivePostgresAdapter::build_aggregate_sql(entity, spec, valid_columns)
}
82
/// Public entry point to `live::aggregate_rows_to_json`: converts aggregate
/// result rows into JSON using the supplied column names.
///
/// Only compiled when the `postgres-live` feature is enabled.
#[cfg(feature = "postgres-live")]
pub fn aggregate_rows_to_json_pub(
    rows: &[postgres::Row],
    column_names: &[String],
) -> serde_json::Value {
    live::aggregate_rows_to_json(rows, column_names)
}
92
93pub fn create_table_sql(entity_name: &str, fields: &[FieldSpec]) -> String {
99 let mut columns = vec!["id TEXT PRIMARY KEY NOT NULL".to_string()];
100
101 for field in fields {
102 let col_type = pg_column_type(&field.field_type);
103 let not_null = if field.optional { "" } else { " NOT NULL" };
104 let unique = if field.unique { " UNIQUE" } else { "" };
105 columns.push(format!(
106 "{} {}{}{}",
107 quote_ident(&field.name),
108 col_type,
109 not_null,
110 unique
111 ));
112 }
113
114 format!(
115 "CREATE TABLE IF NOT EXISTS {} ({})",
116 quote_ident(entity_name),
117 columns.join(", ")
118 )
119}
120
121pub fn add_column_sql(entity_name: &str, field: &FieldSpec) -> String {
125 let col_type = pg_column_type(&field.field_type);
126 let unique = if field.unique { " UNIQUE" } else { "" };
127 format!(
128 "ALTER TABLE {} ADD COLUMN {} {}{}",
129 quote_ident(entity_name),
130 quote_ident(&field.name),
131 col_type,
132 unique
133 )
134}
135
136pub fn create_index_sql(
138 entity_name: &str,
139 index_name: &str,
140 fields: &[String],
141 unique: bool,
142) -> String {
143 let unique_str = if unique { "UNIQUE " } else { "" };
144 let full_index_name = format!("{}_{}", entity_name, index_name);
145 let quoted_fields: Vec<String> = fields.iter().map(|f| quote_ident(f)).collect();
146 format!(
147 "CREATE {}INDEX IF NOT EXISTS {} ON {} ({})",
148 unique_str,
149 quote_ident(&full_index_name),
150 quote_ident(entity_name),
151 quoted_fields.join(", ")
152 )
153}
154
155pub struct PostgresAdapter;
162
163impl StorageAdapter for PostgresAdapter {
164 fn plan_schema(&self, target: &AppManifest) -> Result<SchemaPlan, StorageError> {
165 let mut operations = Vec::new();
167
168 for entity in &target.entities {
169 let fields: Vec<FieldSpec> = entity
170 .fields
171 .iter()
172 .map(|f| FieldSpec {
173 name: f.name.clone(),
174 field_type: f.field_type.clone(),
175 optional: f.optional,
176 unique: f.unique,
177 })
178 .collect();
179
180 operations.push(SchemaOperation::CreateEntity {
181 name: entity.name.clone(),
182 fields,
183 });
184
185 for index in &entity.indexes {
186 operations.push(SchemaOperation::AddIndex {
187 entity: entity.name.clone(),
188 name: index.name.clone(),
189 fields: index.fields.clone(),
190 unique: index.unique,
191 });
192 }
193 }
194
195 if operations.is_empty() {
196 operations.push(SchemaOperation::Noop);
197 }
198
199 Ok(SchemaPlan { operations })
200 }
201
202 }
204
205pub fn plan_to_sql(plan: &SchemaPlan) -> Result<Vec<String>, StorageError> {
208 let mut statements = Vec::new();
209
210 for op in &plan.operations {
211 match op {
212 SchemaOperation::CreateEntity { name, fields } => {
213 statements.push(create_table_sql(name, fields));
214 }
215 SchemaOperation::AddField { entity, field } => {
216 statements.push(add_column_sql(entity, field));
217 }
218 SchemaOperation::AlterField {
219 entity,
220 previous,
221 target,
222 } => {
223 if previous.optional && !target.optional {
232 statements.push(format!(
233 "ALTER TABLE {} ALTER COLUMN {} SET NOT NULL",
234 quote_ident(entity),
235 quote_ident(&target.name)
236 ));
237 } else if !previous.optional && target.optional {
238 statements.push(format!(
239 "ALTER TABLE {} ALTER COLUMN {} DROP NOT NULL",
240 quote_ident(entity),
241 quote_ident(&target.name)
242 ));
243 }
244 }
250 SchemaOperation::AddIndex {
251 entity,
252 name,
253 fields,
254 unique,
255 } => {
256 statements.push(create_index_sql(entity, name, fields, *unique));
257 }
258 SchemaOperation::CreateSearchIndex { entity, config } => {
259 #[cfg(feature = "postgres-live")]
260 {
261 statements.extend(crate::pg_search::create_search_index_sql(entity, config));
262 }
263 #[cfg(not(feature = "postgres-live"))]
264 {
265 let _ = (entity, config);
266 return Err(StorageError {
267 code: "PG_SEARCH_FEATURE_OFF".into(),
268 message: "CreateSearchIndex requires the `postgres-live` feature".into(),
269 });
270 }
271 }
272 SchemaOperation::RemoveSearchIndex { entity } => {
273 statements.push(format!(
286 "DROP TABLE IF EXISTS {} CASCADE",
287 quote_ident(&format!("_fts_{entity}"))
288 ));
289 statements.push(format!(
290 "DROP INDEX IF EXISTS {}",
291 quote_ident(&format!("{entity}_fts_gin"))
292 ));
293 }
294 SchemaOperation::Noop => {}
295 other => {
296 return Err(StorageError {
297 code: "PG_OP_UNSUPPORTED".into(),
298 message: format!("Operation not supported by Postgres adapter: {other:?}"),
299 });
300 }
301 }
302 }
303
304 Ok(statements)
305}
306
/// Lists the base tables of the `public` schema, ordered by name, skipping
/// tables whose name starts with the literal prefix `_pylon_`.
///
/// `_` is a single-character wildcard in `LIKE`, so the prefix is matched
/// with an explicit `ESCAPE` character; otherwise unrelated names such as
/// `xpylonyfoo` would be filtered out too.
pub const INTROSPECT_TABLES_SQL: &str = "\
    SELECT table_name \
    FROM information_schema.tables \
    WHERE table_schema = 'public' \
    AND table_type = 'BASE TABLE' \
    AND table_name NOT LIKE '!_pylon!_%' ESCAPE '!' \
    ORDER BY table_name";
322
/// Lists the columns of one `public` table (`$1` is the table name) in
/// ordinal order.
///
/// Each row carries `column_name`, `data_type`, `is_nullable` and `is_pk`,
/// the number of primary-key constraints covering the column.
///
/// The primary-key subquery is correlated on schema as well as table and
/// constraint name. Constraint names are not unique across schemas, so
/// matching on name alone could pick up a same-named table elsewhere.
pub const INTROSPECT_COLUMNS_SQL: &str = "\
    SELECT column_name, data_type, is_nullable, \
    (SELECT COUNT(*) FROM information_schema.table_constraints tc \
    JOIN information_schema.key_column_usage kcu \
    ON tc.constraint_schema = kcu.constraint_schema \
    AND tc.constraint_name = kcu.constraint_name \
    AND tc.table_schema = kcu.table_schema \
    AND tc.table_name = kcu.table_name \
    WHERE tc.table_schema = c.table_schema \
    AND tc.table_name = c.table_name \
    AND kcu.column_name = c.column_name \
    AND tc.constraint_type = 'PRIMARY KEY') as is_pk \
    FROM information_schema.columns c \
    WHERE table_schema = 'public' AND table_name = $1 \
    ORDER BY ordinal_position";
336
/// Lists the non-primary indexes of one `public` table (`$1` is the table
/// name), ordered by index name.
///
/// Each row carries `index_name`, `is_unique` and `columns`, the indexed
/// column names aggregated in index-key order.
pub const INTROSPECT_INDEXES_SQL: &str = concat!(
    "SELECT i.relname as index_name, ",
    "ix.indisunique as is_unique, ",
    "array_agg(a.attname ORDER BY array_position(ix.indkey, a.attnum)) as columns ",
    "FROM pg_index ix ",
    "JOIN pg_class t ON t.oid = ix.indrelid ",
    "JOIN pg_class i ON i.oid = ix.indexrelid ",
    "JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(ix.indkey) ",
    "JOIN pg_namespace n ON n.oid = t.relnamespace ",
    "WHERE n.nspname = 'public' ",
    "AND t.relname = $1 ",
    "AND NOT ix.indisprimary ",
    "GROUP BY i.relname, ix.indisunique ",
    "ORDER BY i.relname",
);
353
354pub fn plan_from_snapshot(snapshot: &crate::SchemaSnapshot, target: &AppManifest) -> SchemaPlan {
357 crate::plan_from_snapshot(snapshot, target)
358}
359
/// Generates a 40-character lowercase hex row id.
///
/// The first 32 characters are the zero-padded nanoseconds since the Unix
/// epoch (0 if the system clock is before the epoch). The last 8 are a
/// process-wide counter that increments on every call and wraps on overflow.
pub fn generate_id() -> String {
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static COUNTER: AtomicU32 = AtomicU32::new(0);

    let nanos_since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => 0,
    };
    let sequence = COUNTER.fetch_add(1, Ordering::Relaxed);

    format!("{nanos_since_epoch:032x}{sequence:08x}")
}
383
384pub fn json_value_to_string(val: &serde_json::Value) -> String {
393 match val {
394 serde_json::Value::String(s) => s.clone(),
395 serde_json::Value::Number(n) => n.to_string(),
396 serde_json::Value::Bool(b) => b.to_string(),
397 serde_json::Value::Null => String::new(),
398 other => other.to_string(),
399 }
400}
401
402#[derive(Debug, Clone, PartialEq)]
418pub enum JsonParam {
419 Null,
420 Text(String),
421 Int(i64),
422 Float(f64),
423 Bool(bool),
424}
425
426impl JsonParam {
427 pub fn from_json(val: &serde_json::Value) -> Self {
431 match val {
432 serde_json::Value::Null => JsonParam::Null,
433 serde_json::Value::String(s) => JsonParam::Text(s.clone()),
434 serde_json::Value::Bool(b) => JsonParam::Bool(*b),
435 serde_json::Value::Number(n) => {
436 if let Some(i) = n.as_i64() {
437 JsonParam::Int(i)
438 } else if let Some(f) = n.as_f64() {
439 JsonParam::Float(f)
440 } else {
441 JsonParam::Text(n.to_string())
442 }
443 }
444 other => JsonParam::Text(other.to_string()),
445 }
446 }
447}
448
/// Encodes a [`JsonParam`] for whatever column type Postgres reports for the
/// placeholder.
///
/// `accepts` returns `true` for every type, so `to_sql` itself decides how to
/// encode: matching value/type pairs use the native encoding, text is parsed
/// as RFC 3339 for `TIMESTAMPTZ`/`TIMESTAMP`/`DATE`, and every other
/// combination is sent as the value's string form.
///
/// Narrowing conversions are range-checked. An integer that does not fit the
/// target `SMALLINT`/`INTEGER` column, or a float that is NaN, infinite or
/// out of range for the target integer column, is reported as an error
/// rather than being silently wrapped or saturated.
#[cfg(feature = "postgres-live")]
impl postgres::types::ToSql for JsonParam {
    fn to_sql(
        &self,
        ty: &postgres::types::Type,
        out: &mut bytes::BytesMut,
    ) -> Result<postgres::types::IsNull, Box<dyn std::error::Error + Sync + Send>> {
        use postgres::types::Type;
        // Needed on editions where `TryFrom` is not in the prelude.
        use std::convert::TryFrom;

        if matches!(self, JsonParam::Null) {
            return Ok(postgres::types::IsNull::Yes);
        }

        match (self, ty) {
            (JsonParam::Bool(b), &Type::BOOL) => b.to_sql(ty, out),

            (JsonParam::Int(n), &Type::INT2) => i16::try_from(*n)
                .map_err(|_| format!("integer {n} is out of range for SMALLINT"))?
                .to_sql(ty, out),
            (JsonParam::Int(n), &Type::INT4) => i32::try_from(*n)
                .map_err(|_| format!("integer {n} is out of range for INTEGER"))?
                .to_sql(ty, out),
            (JsonParam::Int(n), &Type::INT8) => n.to_sql(ty, out),
            (JsonParam::Int(n), &Type::FLOAT4) => (*n as f32).to_sql(ty, out),
            (JsonParam::Int(n), &Type::FLOAT8) => (*n as f64).to_sql(ty, out),

            (JsonParam::Float(f), &Type::FLOAT4) => (*f as f32).to_sql(ty, out),
            (JsonParam::Float(f), &Type::FLOAT8) => f.to_sql(ty, out),
            (JsonParam::Float(f), &Type::INT4) => {
                let min = f64::from(i32::MIN);
                let whole = truncate_float_in_range(*f, min, -min, "INTEGER")?;
                // In range and integral after the check, so the cast is exact.
                (whole as i32).to_sql(ty, out)
            }
            (JsonParam::Float(f), &Type::INT8) => {
                // -2^63 is exactly representable; its negation is the exclusive bound.
                let min = i64::MIN as f64;
                let whole = truncate_float_in_range(*f, min, -min, "BIGINT")?;
                (whole as i64).to_sql(ty, out)
            }

            (JsonParam::Text(s), &Type::TEXT)
            | (JsonParam::Text(s), &Type::VARCHAR)
            | (JsonParam::Text(s), &Type::BPCHAR)
            | (JsonParam::Text(s), &Type::NAME) => s.to_sql(ty, out),
            (JsonParam::Text(s), &Type::TIMESTAMPTZ) => {
                let dt = chrono::DateTime::parse_from_rfc3339(s)
                    .map_err(|e| format!("invalid TIMESTAMPTZ string {s:?}: {e}"))?
                    .with_timezone(&chrono::Utc);
                dt.to_sql(ty, out)
            }
            (JsonParam::Text(s), &Type::TIMESTAMP) => {
                let dt = chrono::DateTime::parse_from_rfc3339(s)
                    .map_err(|e| format!("invalid TIMESTAMP string {s:?}: {e}"))?
                    .with_timezone(&chrono::Utc)
                    .naive_utc();
                dt.to_sql(ty, out)
            }
            (JsonParam::Text(s), &Type::DATE) => {
                let dt = chrono::DateTime::parse_from_rfc3339(s)
                    .map_err(|e| format!("invalid DATE string {s:?}: {e}"))?
                    .with_timezone(&chrono::Utc)
                    .date_naive();
                dt.to_sql(ty, out)
            }

            // Any other pairing is sent as the value's string form.
            (other, _) => {
                let s = match other {
                    JsonParam::Bool(b) => b.to_string(),
                    JsonParam::Int(n) => n.to_string(),
                    JsonParam::Float(f) => f.to_string(),
                    JsonParam::Text(s) => s.clone(),
                    // `Null` returned early above.
                    JsonParam::Null => unreachable!(),
                };
                s.to_sql(ty, out)
            }
        }
    }

    fn accepts(_ty: &postgres::types::Type) -> bool {
        true
    }

    postgres::types::to_sql_checked!();
}

/// Truncates `value` toward zero and checks the result lies in
/// `[min, max_exclusive)`.
///
/// NaN and infinities fail the range check and are reported as errors.
#[cfg(feature = "postgres-live")]
fn truncate_float_in_range(
    value: f64,
    min: f64,
    max_exclusive: f64,
    pg_type: &str,
) -> Result<f64, String> {
    let truncated = value.trunc();
    if truncated >= min && truncated < max_exclusive {
        Ok(truncated)
    } else {
        Err(format!("float {value} is out of range for {pg_type}"))
    }
}
552
553pub fn build_insert_sql(
561 entity: &str,
562 data: &serde_json::Value,
563) -> Result<(String, Vec<JsonParam>), StorageError> {
564 let obj = data.as_object().ok_or_else(|| StorageError {
565 code: "PG_INVALID_DATA".into(),
566 message: "Insert data must be a JSON object".into(),
567 })?;
568
569 let id = match obj.get("id") {
576 None | Some(serde_json::Value::Null) => generate_id(),
577 Some(serde_json::Value::String(s)) => s.clone(),
578 Some(other) => {
579 return Err(StorageError {
580 code: "PG_INVALID_ID".into(),
581 message: format!(
582 "Insert data carried a non-string `id` value: {other}. Pylon row ids \
583 are always strings (40-char hex). Drop the `id` field to let the \
584 server generate one, or supply a string."
585 ),
586 });
587 }
588 };
589
590 let mut col_names = vec!["id".to_string()];
591 let mut placeholders = vec!["$1".to_string()];
592 let mut values: Vec<JsonParam> = vec![JsonParam::Text(id)];
593
594 let mut i = 0usize;
595 for (key, val) in obj {
596 if key == "id" {
597 continue;
601 }
602 col_names.push(quote_ident(key));
603 placeholders.push(format!("${}", i + 2));
604 values.push(JsonParam::from_json(val));
605 i += 1;
606 }
607
608 let sql = format!(
609 "INSERT INTO {} ({}) VALUES ({})",
610 quote_ident(entity),
611 col_names.join(", "),
612 placeholders.join(", ")
613 );
614
615 Ok((sql, values))
616}
617
618pub fn build_update_sql(
621 entity: &str,
622 id: &str,
623 data: &serde_json::Value,
624) -> Result<(String, Vec<JsonParam>), StorageError> {
625 let obj = data.as_object().ok_or_else(|| StorageError {
626 code: "PG_INVALID_DATA".into(),
627 message: "Update data must be a JSON object".into(),
628 })?;
629
630 if obj.is_empty() {
631 return Err(StorageError {
632 code: "PG_INVALID_DATA".into(),
633 message: "Update data must contain at least one field".into(),
634 });
635 }
636
637 let mut set_clauses = Vec::new();
638 let mut values: Vec<JsonParam> = vec![JsonParam::Text(id.to_string())];
639
640 let mut i = 0usize;
641 for (key, val) in obj {
642 if key == "id" {
643 return Err(StorageError {
650 code: "PG_INVALID_UPDATE".into(),
651 message:
652 "Updating the `id` column is not allowed — Pylon row ids are immutable; \
653 drop the field from the patch."
654 .into(),
655 });
656 }
657 set_clauses.push(format!("{} = ${}", quote_ident(key), i + 2));
658 values.push(JsonParam::from_json(val));
659 i += 1;
660 }
661
662 if set_clauses.is_empty() {
663 return Err(StorageError {
664 code: "PG_INVALID_DATA".into(),
665 message: "Update data must contain at least one non-id field".into(),
666 });
667 }
668
669 let sql = format!(
670 "UPDATE {} SET {} WHERE id = $1",
671 quote_ident(entity),
672 set_clauses.join(", ")
673 );
674
675 Ok((sql, values))
676}
677
/// Borrows each [`JsonParam`] as the trait object the `postgres` client
/// expects for bind parameters, preserving order.
#[cfg(feature = "postgres-live")]
fn as_pg_params(values: &[JsonParam]) -> Vec<&(dyn postgres::types::ToSql + Sync)> {
    let mut params: Vec<&(dyn postgres::types::ToSql + Sync)> = Vec::with_capacity(values.len());
    for value in values {
        params.push(value);
    }
    params
}
688
689#[cfg(feature = "postgres-live")]
694pub mod live {
695 use super::*;
696 use crate::{
697 ColumnSnapshot, IndexSnapshot, SchemaSnapshot, StorageAdapter, StorageError, TableSnapshot,
698 };
699
700 pub(super) struct PgUrlSsl {
705 pub use_tls: bool,
706 }
707
708 pub(super) fn parse_pg_url_ssl(url: &str) -> (String, PgUrlSsl) {
724 let (base, query) = match url.find('?') {
725 Some(idx) => (&url[..idx], &url[idx + 1..]),
726 None => return (url.to_string(), PgUrlSsl { use_tls: false }),
727 };
728
729 let mut use_tls = false;
730 let mut kept: Vec<String> = Vec::new();
731 for pair in query.split('&') {
732 if pair.is_empty() {
733 continue;
734 }
735 let (k, v) = match pair.find('=') {
736 Some(i) => (&pair[..i], &pair[i + 1..]),
737 None => (pair, ""),
738 };
739 match k {
740 "sslmode" => match v.to_ascii_lowercase().as_str() {
741 "disable" | "allow" => {
742 kept.push("sslmode=disable".to_string());
745 }
746 "prefer" => {
747 kept.push("sslmode=prefer".to_string());
749 }
750 "require" | "verify-ca" | "verify-full" | "" => {
751 use_tls = true;
756 kept.push("sslmode=require".to_string());
757 }
758 other => {
759 tracing::warn!(
760 "[pg] unknown sslmode='{other}' — defaulting to require + TLS"
761 );
762 use_tls = true;
763 kept.push("sslmode=require".to_string());
764 }
765 },
766 "sslrootcert" => {
767 if v != "system" && !v.is_empty() {
772 tracing::warn!(
773 "[pg] sslrootcert={v} ignored — native-tls uses system roots"
774 );
775 }
776 use_tls = true;
777 }
778 _ => {
779 kept.push(pair.to_string());
783 }
784 }
785 }
786
787 let cleaned = if kept.is_empty() {
788 base.to_string()
789 } else {
790 format!("{}?{}", base, kept.join("&"))
791 };
792 (cleaned, PgUrlSsl { use_tls })
793 }
794
/// Unit tests for the URL rewriting done by `parse_pg_url_ssl`.
#[cfg(test)]
mod url_tests {
    use super::parse_pg_url_ssl;

    /// `verify-full` plus `sslrootcert` collapses to `sslmode=require` with TLS on.
    #[test]
    fn strips_libpq_only_sslmode_verify_full() {
        let input = "postgres://u:p@h:5432/db?sslmode=verify-full&sslrootcert=system";
        let (url, tls) = parse_pg_url_ssl(input);
        assert!(tls.use_tls);
        assert_eq!(url, "postgres://u:p@h:5432/db?sslmode=require");
    }

    /// `sslmode=disable` is kept and leaves TLS off.
    #[test]
    fn passes_through_disable() {
        let input = "postgres://h/db?sslmode=disable";
        let (url, tls) = parse_pg_url_ssl(input);
        assert!(!tls.use_tls);
        assert_eq!(url, "postgres://h/db?sslmode=disable");
    }

    /// A URL without a query string is returned unchanged with TLS off.
    #[test]
    fn no_query_string_no_tls() {
        let input = "postgres://h/db";
        let (url, tls) = parse_pg_url_ssl(input);
        assert!(!tls.use_tls);
        assert_eq!(url, "postgres://h/db");
    }

    /// Parameters unrelated to TLS survive the rewrite.
    #[test]
    fn unknown_params_pass_through() {
        let input = "postgres://h/db?application_name=pylon&connect_timeout=5";
        let (url, _) = parse_pg_url_ssl(input);
        assert!(url.contains("application_name=pylon"));
        assert!(url.contains("connect_timeout=5"));
    }

    /// `sslrootcert` on its own turns TLS on and is removed from the URL.
    #[test]
    fn sslrootcert_alone_enables_tls() {
        let input = "postgres://h/db?sslrootcert=system";
        let (url, tls) = parse_pg_url_ssl(input);
        assert!(tls.use_tls);
        assert_eq!(url, "postgres://h/db");
    }
}
845
846 pub struct LivePostgresAdapter {
848 client: postgres::Client,
849 }
850
851 impl LivePostgresAdapter {
852 pub(crate) fn client_mut(&mut self) -> &mut postgres::Client {
860 &mut self.client
861 }
862
863 pub fn connect(url: &str) -> Result<Self, StorageError> {
881 let (cleaned, ssl) = parse_pg_url_ssl(url);
882 let result = if ssl.use_tls {
883 let connector =
884 native_tls::TlsConnector::new().map_err(|e| StorageError {
885 code: "PG_TLS_INIT_FAILED".into(),
886 message: format!("Failed to initialize TLS: {e}"),
887 })?;
888 let tls = postgres_native_tls::MakeTlsConnector::new(connector);
889 postgres::Client::connect(&cleaned, tls)
890 } else {
891 postgres::Client::connect(&cleaned, postgres::NoTls)
892 };
893 let client = result.map_err(|e| StorageError {
894 code: "PG_CONNECT_FAILED".into(),
895 message: format!("Failed to connect to Postgres: {e}"),
896 })?;
897 Ok(Self { client })
898 }
899
900 pub fn read_schema(&mut self) -> Result<SchemaSnapshot, StorageError> {
902 let table_rows = self
903 .client
904 .query(INTROSPECT_TABLES_SQL, &[])
905 .map_err(pg_err)?;
906
907 let mut tables = Vec::new();
908 for row in &table_rows {
909 let table_name: String = row.get(0);
910 let columns = self.read_columns(&table_name)?;
911 let indexes = self.read_indexes(&table_name)?;
912 tables.push(TableSnapshot {
913 name: table_name,
914 columns,
915 indexes,
916 });
917 }
918
919 Ok(SchemaSnapshot { tables })
920 }
921
922 fn read_columns(&mut self, table: &str) -> Result<Vec<ColumnSnapshot>, StorageError> {
923 let rows = self
924 .client
925 .query(INTROSPECT_COLUMNS_SQL, &[&table])
926 .map_err(pg_err)?;
927
928 let mut columns = Vec::new();
929 for row in &rows {
930 let name: String = row.get(0);
931 let data_type: String = row.get(1);
932 let is_nullable: String = row.get(2);
933 let is_pk: i64 = row.get(3);
934 columns.push(ColumnSnapshot {
935 name,
936 column_type: data_type,
937 notnull: is_nullable == "NO",
938 primary_key: is_pk > 0,
939 });
940 }
941 Ok(columns)
942 }
943
944 fn read_indexes(&mut self, table: &str) -> Result<Vec<IndexSnapshot>, StorageError> {
945 let rows = self
946 .client
947 .query(INTROSPECT_INDEXES_SQL, &[&table])
948 .map_err(pg_err)?;
949
950 let mut indexes = Vec::new();
951 for row in &rows {
952 let name: String = row.get(0);
953 let unique: bool = row.get(1);
954 let columns: Vec<String> = row.get(2);
955 indexes.push(IndexSnapshot {
956 name,
957 columns,
958 unique,
959 });
960 }
961 Ok(indexes)
962 }
963
964 pub fn plan_from_live(&mut self, target: &AppManifest) -> Result<SchemaPlan, StorageError> {
966 let snapshot = self.read_schema()?;
967 Ok(crate::plan_from_snapshot(&snapshot, target))
968 }
969 }
970
971 impl StorageAdapter for LivePostgresAdapter {
972 fn plan_schema(&self, _target: &AppManifest) -> Result<SchemaPlan, StorageError> {
973 Err(StorageError {
974 code: "PG_PLAN_NEEDS_MUTABLE".into(),
975 message: "Use plan_from_live() instead for live Postgres planning".into(),
976 })
977 }
978
979 fn apply_schema(&self, _plan: &SchemaPlan) -> Result<(), StorageError> {
980 Err(StorageError {
981 code: "PG_APPLY_USE_METHOD".into(),
982 message: "Use apply_plan() instead of the trait method for live Postgres".into(),
983 })
984 }
985 }
986
987 impl LivePostgresAdapter {
988 pub fn apply_plan(&mut self, plan: &SchemaPlan) -> Result<(), StorageError> {
990 let statements = plan_to_sql(plan)?;
991 for sql in &statements {
992 self.client.execute(sql.as_str(), &[]).map_err(pg_err)?;
993 }
994 Ok(())
995 }
996
997 pub fn exec_raw(&mut self, sql: &str) -> Result<u64, StorageError> {
1003 self.client.execute(sql, &[]).map_err(pg_err)
1004 }
1005
1006 pub fn insert(
1008 &mut self,
1009 entity: &str,
1010 data: &serde_json::Value,
1011 ) -> Result<String, StorageError> {
1012 let (sql, values) = build_insert_sql(entity, data)?;
1013 let id = match &values[0] {
1016 JsonParam::Text(s) => s.clone(),
1017 _ => {
1018 return Err(StorageError {
1019 code: "PG_INTERNAL".into(),
1020 message: "build_insert_sql produced non-text id param".into(),
1021 });
1022 }
1023 };
1024 let params = as_pg_params(&values);
1025 self.client.execute(sql.as_str(), ¶ms).map_err(pg_err)?;
1026 Ok(id)
1027 }
1028
1029 pub fn get_by_id(
1031 &mut self,
1032 entity: &str,
1033 id: &str,
1034 ) -> Result<Option<serde_json::Value>, StorageError> {
1035 let sql = format!("SELECT * FROM {} WHERE id = $1", quote_ident(entity));
1036 let rows = self.client.query(sql.as_str(), &[&id]).map_err(pg_err)?;
1037
1038 match rows.first() {
1039 Some(row) => Ok(Some(row_to_json(row))),
1040 None => Ok(None),
1041 }
1042 }
1043
1044 pub fn list(&mut self, entity: &str) -> Result<Vec<serde_json::Value>, StorageError> {
1046 let sql = format!("SELECT * FROM {}", quote_ident(entity));
1047 let rows = self.client.query(sql.as_str(), &[]).map_err(pg_err)?;
1048
1049 Ok(rows.iter().map(row_to_json).collect())
1050 }
1051
1052 pub fn list_after(
1056 &mut self,
1057 entity: &str,
1058 after: Option<&str>,
1059 limit: usize,
1060 ) -> Result<Vec<serde_json::Value>, StorageError> {
1061 let capped: i64 = limit.min(10_000) as i64;
1064 let sql = match after {
1065 Some(_) => format!(
1066 "SELECT * FROM {} WHERE id > $1 ORDER BY id ASC LIMIT $2",
1067 quote_ident(entity)
1068 ),
1069 None => format!(
1070 "SELECT * FROM {} ORDER BY id ASC LIMIT $1",
1071 quote_ident(entity)
1072 ),
1073 };
1074 let rows = match after {
1075 Some(cursor) => self
1076 .client
1077 .query(sql.as_str(), &[&cursor, &capped])
1078 .map_err(pg_err)?,
1079 None => self
1080 .client
1081 .query(sql.as_str(), &[&capped])
1082 .map_err(pg_err)?,
1083 };
1084 Ok(rows.iter().map(row_to_json).collect())
1085 }
1086
1087 pub fn update(
1089 &mut self,
1090 entity: &str,
1091 id: &str,
1092 data: &serde_json::Value,
1093 ) -> Result<bool, StorageError> {
1094 let (sql, values) = build_update_sql(entity, id, data)?;
1095 let params = as_pg_params(&values);
1096 let rows_affected = self.client.execute(sql.as_str(), ¶ms).map_err(pg_err)?;
1097 Ok(rows_affected > 0)
1098 }
1099
1100 pub fn delete(&mut self, entity: &str, id: &str) -> Result<bool, StorageError> {
1102 let sql = format!("DELETE FROM {} WHERE id = $1", quote_ident(entity));
1103 let rows_affected = self.client.execute(sql.as_str(), &[&id]).map_err(pg_err)?;
1104 Ok(rows_affected > 0)
1105 }
1106
1107 pub fn lookup_field(
1111 &mut self,
1112 entity: &str,
1113 field: &str,
1114 value: &str,
1115 ) -> Result<Option<serde_json::Value>, StorageError> {
1116 let sql = format!(
1117 "SELECT * FROM {} WHERE {} = $1 LIMIT 1",
1118 quote_ident(entity),
1119 quote_ident(field),
1120 );
1121 let rows = self.client.query(sql.as_str(), &[&value]).map_err(pg_err)?;
1122 Ok(rows.first().map(row_to_json))
1123 }
1124
1125 pub fn query_filtered(
1148 &mut self,
1149 entity: &str,
1150 filter: &serde_json::Value,
1151 valid_columns: &[String],
1152 ) -> Result<Vec<serde_json::Value>, StorageError> {
1153 let (sql, params) = Self::build_query_filtered_sql(entity, filter, valid_columns)?;
1154 let pg_params = as_pg_params(¶ms);
1155 let rows = self
1156 .client
1157 .query(sql.as_str(), &pg_params)
1158 .map_err(pg_err)?;
1159 Ok(rows.iter().map(row_to_json).collect())
1160 }
1161
1162 pub(crate) fn build_query_filtered_sql(
1169 entity: &str,
1170 filter: &serde_json::Value,
1171 valid_columns: &[String],
1172 ) -> Result<(String, Vec<JsonParam>), StorageError> {
1173 let empty = serde_json::Map::new();
1174 let obj = filter.as_object().unwrap_or(&empty);
1175
1176 let validate = |col: &str| -> Result<(), StorageError> {
1177 if col == "id" || valid_columns.iter().any(|c| c == col) {
1178 Ok(())
1179 } else {
1180 Err(StorageError {
1181 code: "UNKNOWN_COLUMN".into(),
1182 message: format!("Unknown column \"{col}\" on entity \"{entity}\""),
1183 })
1184 }
1185 };
1186
1187 let mut where_clauses: Vec<String> = Vec::new();
1188 let mut order_clause = String::new();
1189 let mut limit_clause = String::new();
1190 let mut offset_clause = String::new();
1191 let mut planned: Vec<(String, String, JsonParam)> = Vec::new();
1195
1196 for (key, val) in obj {
1197 match key.as_str() {
1198 "$search" => {
1199 let raw = match val {
1207 serde_json::Value::String(s) => s.clone(),
1208 other => other.to_string(),
1209 };
1210 let placeholder_n = planned.len() + 1;
1219 where_clauses.push(format!(
1220 "{}.id IN (SELECT entity_id FROM \"_fts_{entity}\" \
1221 WHERE tsv @@ plainto_tsquery('english', ${placeholder_n}))",
1222 quote_ident(entity),
1223 ));
1224 planned.push((
1228 format!("__search_{}", planned.len()),
1229 "__INLINE__".into(),
1230 JsonParam::Text(raw),
1231 ));
1232 }
1233 "$order" => {
1234 if let Some(ord) = val.as_object() {
1235 let mut parts = Vec::new();
1236 for (col, dir) in ord {
1237 validate(col)?;
1238 let d = match dir.as_str().unwrap_or("asc") {
1239 "desc" | "DESC" => "DESC",
1240 _ => "ASC",
1241 };
1242 parts.push(format!("{} {d}", quote_ident(col)));
1243 }
1244 if !parts.is_empty() {
1245 order_clause = format!(" ORDER BY {}", parts.join(", "));
1246 }
1247 }
1248 }
1249 "$limit" => {
1250 if let Some(n) = val.as_u64() {
1251 limit_clause = format!(" LIMIT {}", n);
1252 }
1253 }
1254 "$offset" => {
1255 if let Some(n) = val.as_u64() {
1256 offset_clause = format!(" OFFSET {}", n);
1257 }
1258 }
1259 field => {
1260 validate(field)?;
1261 match val {
1262 serde_json::Value::Object(ops) => {
1263 for (op, v) in ops {
1264 match op.as_str() {
1265 "$not" => planned.push((
1266 field.into(),
1267 "!=".into(),
1268 value_to_pg(v),
1269 )),
1270 "$gt" => {
1271 planned.push((field.into(), ">".into(), value_to_pg(v)))
1272 }
1273 "$gte" => planned.push((
1274 field.into(),
1275 ">=".into(),
1276 value_to_pg(v),
1277 )),
1278 "$lt" => {
1279 planned.push((field.into(), "<".into(), value_to_pg(v)))
1280 }
1281 "$lte" => planned.push((
1282 field.into(),
1283 "<=".into(),
1284 value_to_pg(v),
1285 )),
1286 "$like" => {
1287 let raw = match v {
1298 serde_json::Value::String(s) => s.clone(),
1299 other => other.to_string(),
1300 };
1301 planned.push((
1302 field.into(),
1303 "LIKE".into(),
1304 JsonParam::Text(format!("%{raw}%")),
1305 ));
1306 }
1307 "$in" => {
1308 if let Some(arr) = v.as_array() {
1309 if arr.is_empty() {
1310 where_clauses.push("FALSE".into());
1321 } else {
1322 let placeholders: Vec<String> = (0..arr.len())
1323 .map(|i| {
1324 format!("${}", planned.len() + 1 + i)
1325 })
1326 .collect();
1327 where_clauses.push(format!(
1328 "{} IN ({})",
1329 quote_ident(field),
1330 placeholders.join(", "),
1331 ));
1332 for x in arr {
1333 planned.push((
1334 format!("__inline_{}", planned.len()),
1335 "__INLINE__".into(),
1336 value_to_pg(x),
1337 ));
1338 }
1339 }
1340 }
1341 }
1342 _ => {}
1343 }
1344 }
1345 }
1346 _ => planned.push((field.into(), "=".into(), value_to_pg(val))),
1347 }
1348 }
1349 }
1350 }
1351
1352 let mut params: Vec<JsonParam> = Vec::with_capacity(planned.len());
1354 for (field, op, v) in &planned {
1355 if op == "__INLINE__" {
1356 params.push(v.clone());
1358 } else {
1359 let placeholder = format!("${}", params.len() + 1);
1360 where_clauses.push(format!("{} {} {}", quote_ident(field), op, placeholder));
1361 params.push(v.clone());
1362 }
1363 }
1364
1365 let where_sql = if where_clauses.is_empty() {
1366 String::new()
1367 } else {
1368 format!(" WHERE {}", where_clauses.join(" AND "))
1369 };
1370 let final_order = if order_clause.is_empty() {
1375 format!(" ORDER BY {}", quote_ident("id"))
1376 } else {
1377 order_clause
1378 };
1379 let sql = format!(
1380 "SELECT * FROM {}{}{}{}{}",
1381 quote_ident(entity),
1382 where_sql,
1383 final_order,
1384 limit_clause,
1385 offset_clause,
1386 );
1387
1388 Ok((sql, params))
1389 }
1390
1391 pub fn aggregate(
1410 &mut self,
1411 entity: &str,
1412 spec: &serde_json::Value,
1413 valid_columns: &[String],
1414 ) -> Result<serde_json::Value, StorageError> {
1415 let (sql, params, column_names) =
1416 Self::build_aggregate_sql(entity, spec, valid_columns)?;
1417 let pg_params = as_pg_params(¶ms);
1418 let rows = self
1419 .client
1420 .query(sql.as_str(), &pg_params)
1421 .map_err(pg_err)?;
1422 Ok(aggregate_rows_to_json(&rows, &column_names))
1423 }
1424
1425 pub(crate) fn build_aggregate_sql(
1432 entity: &str,
1433 spec: &serde_json::Value,
1434 valid_columns: &[String],
1435 ) -> Result<(String, Vec<JsonParam>, Vec<String>), StorageError> {
1436 let obj = spec.as_object().ok_or_else(|| StorageError {
1437 code: "INVALID_QUERY".into(),
1438 message: "aggregate spec must be a JSON object".into(),
1439 })?;
1440
1441 let validate = |col: &str| -> Result<(), StorageError> {
1442 if col == "id" || valid_columns.iter().any(|c| c == col) {
1443 Ok(())
1444 } else {
1445 Err(StorageError {
1446 code: "UNKNOWN_COLUMN".into(),
1447 message: format!("Unknown column \"{col}\" on entity \"{entity}\""),
1448 })
1449 }
1450 };
1451
1452 let mut select_parts: Vec<String> = Vec::new();
1453 let mut result_fields: Vec<String> = Vec::new();
1454
1455 if let Some(count) = obj.get("count") {
1456 match count {
1457 serde_json::Value::String(s) if s == "*" => {
1458 select_parts.push("COUNT(*) AS count".into());
1459 result_fields.push("count".into());
1460 }
1461 serde_json::Value::String(field) => {
1462 validate(field)?;
1463 let alias = format!("count_{field}");
1464 select_parts.push(format!(
1465 "COUNT({}) AS {}",
1466 quote_ident(field),
1467 quote_ident(&alias),
1468 ));
1469 result_fields.push(alias);
1470 }
1471 _ => {}
1472 }
1473 }
1474
1475 for (fn_name, prefix) in [
1476 ("sum", "sum_"),
1477 ("avg", "avg_"),
1478 ("min", "min_"),
1479 ("max", "max_"),
1480 ] {
1481 if let Some(fields) = obj.get(fn_name).and_then(|v| v.as_array()) {
1482 for field in fields {
1483 if let Some(f) = field.as_str() {
1484 validate(f)?;
1485 let alias = format!("{prefix}{f}");
1486 let sql_fn = fn_name.to_uppercase();
1487 select_parts.push(format!(
1488 "{}({}) AS {}",
1489 sql_fn,
1490 quote_ident(f),
1491 quote_ident(&alias),
1492 ));
1493 result_fields.push(alias);
1494 }
1495 }
1496 }
1497 }
1498
1499 if let Some(fields) = obj.get("countDistinct").and_then(|v| v.as_array()) {
1500 for field in fields {
1501 if let Some(f) = field.as_str() {
1502 validate(f)?;
1503 let alias = format!("count_distinct_{f}");
1504 select_parts.push(format!(
1505 "COUNT(DISTINCT {}) AS {}",
1506 quote_ident(f),
1507 quote_ident(&alias),
1508 ));
1509 result_fields.push(alias);
1510 }
1511 }
1512 }
1513
1514 let mut group_by: Vec<String> = Vec::new();
1519 let mut group_select: Vec<String> = Vec::new();
1520 let mut group_field_names: Vec<String> = Vec::new();
1521 if let Some(groups) = obj.get("groupBy").and_then(|v| v.as_array()) {
1522 for g in groups {
1523 if let Some(f) = g.as_str() {
1524 validate(f)?;
1525 let q = quote_ident(f);
1526 group_by.push(q.clone());
1527 group_select.push(q);
1528 group_field_names.push(f.to_string());
1529 } else if let Some(spec) = g.as_object() {
1530 let field =
1531 spec.get("field").and_then(|v| v.as_str()).ok_or_else(|| {
1532 StorageError {
1533 code: "INVALID_QUERY".into(),
1534 message: "groupBy object spec requires `field`".into(),
1535 }
1536 })?;
1537 validate(field)?;
1538 let bucket = spec.get("bucket").and_then(|v| v.as_str()).unwrap_or("day");
1539 let trunc_unit = match bucket {
1540 "hour" | "day" | "week" | "month" | "year" => bucket,
1541 _ => {
1542 return Err(StorageError {
1543 code: "INVALID_QUERY".into(),
1544 message: format!(
1545 "bucket must be one of hour/day/week/month/year, got {bucket}"
1546 ),
1547 });
1548 }
1549 };
1550 let alias = format!("{field}_{bucket}");
1551 let expr = format!("date_trunc('{}', {})", trunc_unit, quote_ident(field),);
1552 group_by.push(expr.clone());
1553 group_select.push(format!("{} AS {}", expr, quote_ident(&alias)));
1554 group_field_names.push(alias);
1555 }
1556 }
1557 }
1558
1559 let mut full_select = group_select.clone();
1560 full_select.extend(select_parts.iter().cloned());
1561 if full_select.is_empty() {
1562 return Err(StorageError {
1563 code: "INVALID_QUERY".into(),
1564 message: "aggregate spec must include count/sum/avg/min/max/groupBy".into(),
1565 });
1566 }
1567
1568 let mut where_clauses: Vec<String> = Vec::new();
1569 let mut params: Vec<JsonParam> = Vec::new();
1570 if let Some(w) = obj.get("where").and_then(|v| v.as_object()) {
1571 for (k, v) in w {
1572 validate(k)?;
1573 let placeholder = format!("${}", params.len() + 1);
1574 where_clauses.push(format!("{} = {}", quote_ident(k), placeholder));
1575 params.push(value_to_pg(v));
1576 }
1577 }
1578 let where_sql = if where_clauses.is_empty() {
1579 String::new()
1580 } else {
1581 format!(" WHERE {}", where_clauses.join(" AND "))
1582 };
1583 let group_sql = if group_by.is_empty() {
1584 String::new()
1585 } else {
1586 format!(" GROUP BY {}", group_by.join(", "))
1587 };
1588
1589 let sql = format!(
1590 "SELECT {} FROM {}{}{}",
1591 full_select.join(", "),
1592 quote_ident(entity),
1593 where_sql,
1594 group_sql,
1595 );
1596
1597 let column_names: Vec<String> = group_field_names
1598 .iter()
1599 .chain(result_fields.iter())
1600 .cloned()
1601 .collect();
1602
1603 Ok((sql, params, column_names))
1604 }
1605 }
1606
1607 pub fn aggregate_rows_to_json(
1612 rows: &[postgres::Row],
1613 column_names: &[String],
1614 ) -> serde_json::Value {
1615 let mut out: Vec<serde_json::Value> = Vec::with_capacity(rows.len());
1616 for row in rows {
1617 let row_json = row_to_json(row);
1618 if let serde_json::Value::Object(map) = &row_json {
1619 let mut filtered = serde_json::Map::new();
1620 for name in column_names {
1621 if let Some(v) = map.get(name) {
1622 filtered.insert(name.clone(), v.clone());
1623 }
1624 }
1625 out.push(serde_json::Value::Object(filtered));
1626 } else {
1627 out.push(row_json);
1628 }
1629 }
1630 serde_json::json!({ "rows": out })
1631 }
1632
1633 pub enum TxOp<'a> {
1635 Insert {
1636 entity: &'a str,
1637 data: &'a serde_json::Value,
1638 },
1639 Update {
1640 entity: &'a str,
1641 id: &'a str,
1642 data: &'a serde_json::Value,
1643 },
1644 Delete {
1645 entity: &'a str,
1646 id: &'a str,
1647 },
1648 }
1649
/// Outcome of a single `TxOp` executed by `LivePostgresAdapter::transact`.
#[derive(Debug, Clone)]
pub enum TxResult {
    /// Id of the inserted row, taken from the first bind parameter of the insert.
    Inserted(String),
    /// `true` when the update statement affected at least one row.
    Updated(bool),
    /// `true` when the delete statement affected at least one row.
    Deleted(bool),
}
1657
1658 impl LivePostgresAdapter {
1659 pub fn transact(&mut self, ops: &[TxOp<'_>]) -> Result<Vec<TxResult>, StorageError> {
1664 let mut tx = self.client.transaction().map_err(pg_err)?;
1665 let mut results: Vec<TxResult> = Vec::with_capacity(ops.len());
1666
1667 for op in ops {
1668 match op {
1669 TxOp::Insert { entity, data } => {
1670 let (sql, values) = build_insert_sql(entity, data)?;
1671 let id = match &values[0] {
1672 JsonParam::Text(s) => s.clone(),
1673 _ => {
1674 return Err(StorageError {
1675 code: "PG_INTERNAL".into(),
1676 message: "build_insert_sql produced non-text id param".into(),
1677 });
1678 }
1679 };
1680 let params = as_pg_params(&values);
1681 tx.execute(sql.as_str(), ¶ms).map_err(pg_err)?;
1682 results.push(TxResult::Inserted(id));
1683 }
1684 TxOp::Update { entity, id, data } => {
1685 let (sql, values) = build_update_sql(entity, id, data)?;
1686 let params = as_pg_params(&values);
1687 let n = tx.execute(sql.as_str(), ¶ms).map_err(pg_err)?;
1688 results.push(TxResult::Updated(n > 0));
1689 }
1690 TxOp::Delete { entity, id } => {
1691 let sql = format!("DELETE FROM {} WHERE id = $1", quote_ident(entity));
1692 let n = tx.execute(sql.as_str(), &[id]).map_err(pg_err)?;
1693 results.push(TxResult::Deleted(n > 0));
1694 }
1695 }
1696 }
1697
1698 tx.commit().map_err(pg_err)?;
1699 Ok(results)
1700 }
1701 }
1702
/// Converts a JSON value into a [`JsonParam`] bind parameter.
///
/// Thin wrapper that delegates to `JsonParam::from_json`.
fn value_to_pg(v: &serde_json::Value) -> JsonParam {
    JsonParam::from_json(v)
}
1711
1712 pub(super) fn row_to_json(row: &postgres::Row) -> serde_json::Value {
1713 use postgres::types::Type;
1714 let mut obj = serde_json::Map::new();
1715 for (i, col) in row.columns().iter().enumerate() {
1716 let name = col.name().to_string();
1717
1718 let value: serde_json::Value = match *col.type_() {
1729 Type::BOOL => try_get_or_null::<Option<bool>>(row, i)
1730 .flatten()
1731 .map(serde_json::Value::Bool)
1732 .unwrap_or(serde_json::Value::Null),
1733 Type::INT2 => try_get_or_null::<Option<i16>>(row, i)
1734 .flatten()
1735 .map(|v| serde_json::Value::Number(v.into()))
1736 .unwrap_or(serde_json::Value::Null),
1737 Type::INT4 => try_get_or_null::<Option<i32>>(row, i)
1738 .flatten()
1739 .map(|v| serde_json::Value::Number(v.into()))
1740 .unwrap_or(serde_json::Value::Null),
1741 Type::INT8 => try_get_or_null::<Option<i64>>(row, i)
1742 .flatten()
1743 .map(|v| serde_json::Value::Number(v.into()))
1744 .unwrap_or(serde_json::Value::Null),
1745 Type::FLOAT4 => try_get_or_null::<Option<f32>>(row, i)
1746 .flatten()
1747 .and_then(|v| serde_json::Number::from_f64(v as f64))
1748 .map(serde_json::Value::Number)
1749 .unwrap_or(serde_json::Value::Null),
1750 Type::FLOAT8 => try_get_or_null::<Option<f64>>(row, i)
1751 .flatten()
1752 .and_then(serde_json::Number::from_f64)
1753 .map(serde_json::Value::Number)
1754 .unwrap_or(serde_json::Value::Null),
1755 Type::JSON | Type::JSONB => try_get_or_null::<Option<serde_json::Value>>(row, i)
1756 .flatten()
1757 .unwrap_or(serde_json::Value::Null),
1758 Type::BYTEA => try_get_or_null::<Option<Vec<u8>>>(row, i)
1759 .flatten()
1760 .map(|b| serde_json::Value::String(b64(&b)))
1761 .unwrap_or(serde_json::Value::Null),
1762 Type::TEXT | Type::VARCHAR | Type::BPCHAR | Type::NAME | Type::UNKNOWN => {
1763 try_get_or_null::<Option<String>>(row, i)
1764 .flatten()
1765 .map(serde_json::Value::String)
1766 .unwrap_or(serde_json::Value::Null)
1767 }
1768 Type::TIMESTAMPTZ => {
1769 try_get_or_null::<Option<chrono::DateTime<chrono::Utc>>>(row, i)
1776 .flatten()
1777 .map(|dt| {
1778 serde_json::Value::String(dt.format("%Y-%m-%dT%H:%M:%SZ").to_string())
1779 })
1780 .unwrap_or(serde_json::Value::Null)
1781 }
1782 Type::TIMESTAMP => try_get_or_null::<Option<chrono::NaiveDateTime>>(row, i)
1783 .flatten()
1784 .map(|dt| {
1785 serde_json::Value::String(dt.format("%Y-%m-%dT%H:%M:%SZ").to_string())
1786 })
1787 .unwrap_or(serde_json::Value::Null),
1788 Type::DATE => try_get_or_null::<Option<chrono::NaiveDate>>(row, i)
1789 .flatten()
1790 .map(|d| serde_json::Value::String(d.format("%Y-%m-%d").to_string()))
1791 .unwrap_or(serde_json::Value::Null),
1792 _ => {
1793 match row.try_get::<_, Option<String>>(i) {
1798 Ok(Some(s)) => serde_json::Value::String(s),
1799 Ok(None) => serde_json::Value::Null,
1800 Err(_) => match row.try_get::<_, Option<Vec<u8>>>(i) {
1801 Ok(Some(bytes)) => serde_json::Value::String(
1802 String::from_utf8_lossy(&bytes).into_owned(),
1803 ),
1804 _ => serde_json::Value::Null,
1805 },
1806 }
1807 }
1808 };
1809 obj.insert(name, value);
1810 }
1811 serde_json::Value::Object(obj)
1812 }
1813
1814 fn try_get_or_null<'a, T>(row: &'a postgres::Row, i: usize) -> Option<T>
1815 where
1816 T: postgres::types::FromSql<'a>,
1817 {
1818 match row.try_get::<_, T>(i) {
1819 Ok(v) => Some(v),
1820 Err(e) => {
1821 tracing::warn!(
1822 "[postgres] decode failed for column {} ({}): {e}",
1823 i,
1824 row.columns()[i].name()
1825 );
1826 None
1827 }
1828 }
1829 }
1830
/// Encodes `bytes` as base64 using the `A-Z a-z 0-9 + /` alphabet with `=` padding.
///
/// Input is consumed three bytes at a time; each group yields four output
/// characters, and a short final group is padded with `=`.
fn b64(bytes: &[u8]) -> String {
    const ALPHABET: &[u8; 64] =
        b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

    // Maps the low six bits of `bits` to its alphabet character.
    let sextet = |bits: u32| ALPHABET[(bits & 0x3F) as usize] as char;

    let mut encoded = String::with_capacity((bytes.len() + 2) / 3 * 4);
    for group in bytes.chunks(3) {
        // Pack up to three bytes into the top of a 24-bit word; absent bytes stay zero.
        let mut word = 0u32;
        for (pos, &byte) in group.iter().enumerate() {
            word |= u32::from(byte) << (16 - 8 * pos);
        }

        encoded.push(sextet(word >> 18));
        encoded.push(sextet(word >> 12));
        encoded.push(if group.len() > 1 {
            sextet(word >> 6)
        } else {
            '='
        });
        encoded.push(if group.len() > 2 { sextet(word) } else { '=' });
    }
    encoded
}
1859
1860 fn pg_err(e: postgres::Error) -> StorageError {
1861 use std::error::Error;
1867 let mut detail = format!("{e}");
1868 let mut src: Option<&dyn Error> = e.source();
1869 while let Some(s) = src {
1870 detail.push_str(": ");
1871 detail.push_str(&format!("{s}"));
1872 src = s.source();
1873 }
1874 StorageError {
1875 code: "PG_QUERY_FAILED".into(),
1876 message: format!("Postgres query failed: {detail}"),
1877 }
1878 }
1879}
1880
1881#[cfg(test)]
1886mod tests {
1887 use super::*;
1888
1889 fn test_manifest() -> AppManifest {
1893 use pylon_kernel::{ManifestEntity, ManifestField, ManifestIndex};
1894 let f = |name: &str, ty: &str, opt: bool, uniq: bool| ManifestField {
1895 name: name.into(),
1896 field_type: ty.into(),
1897 optional: opt,
1898 unique: uniq,
1899 crdt: None,
1900 };
1901 AppManifest {
1902 manifest_version: 1,
1903 name: "test".into(),
1904 version: "0.0.0".into(),
1905 entities: vec![
1906 ManifestEntity {
1907 name: "User".into(),
1908 fields: vec![
1909 f("email", "string", false, true),
1910 f("displayName", "string", false, false),
1911 f("createdAt", "datetime", false, false),
1912 ],
1913 indexes: vec![],
1914 relations: vec![],
1915 search: None,
1916 crdt: true,
1917 },
1918 ManifestEntity {
1919 name: "Todo".into(),
1920 fields: vec![
1921 f("title", "string", false, false),
1922 f("done", "bool", false, false),
1923 f("userId", "id(User)", false, false),
1924 f("createdAt", "datetime", false, false),
1925 ],
1926 indexes: vec![ManifestIndex {
1927 name: "by_user".into(),
1928 fields: vec!["userId".into()],
1929 unique: false,
1930 }],
1931 relations: vec![],
1932 search: None,
1933 crdt: true,
1934 },
1935 ],
1936 queries: vec![],
1937 actions: vec![],
1938 policies: vec![],
1939 routes: vec![],
1940 auth: Default::default(),
1941 }
1942 }
1943
/// Each manifest field type maps to the expected Postgres column type.
#[test]
fn pg_type_mapping() {
    let expectations = [
        ("string", "TEXT"),
        ("int", "INTEGER"),
        ("float", "DOUBLE PRECISION"),
        ("bool", "BOOLEAN"),
        ("datetime", "TIMESTAMPTZ"),
        ("richtext", "TEXT"),
        ("id(User)", "TEXT"),
    ];
    for (field_type, column_type) in expectations {
        assert_eq!(pg_column_type(field_type), column_type);
    }
}
1954
/// Plain identifiers are wrapped in double quotes and otherwise unchanged.
#[test]
fn quote_ident_simple() {
    for (raw, quoted) in [("User", "\"User\""), ("email", "\"email\"")] {
        assert_eq!(quote_ident(raw), quoted);
    }
}
1960
/// Every embedded double quote is doubled inside the quoted identifier.
#[test]
fn quote_ident_escapes_embedded_double_quotes() {
    let cases = [
        ("col\"name", "\"col\"\"name\""),
        ("a\"b\"c", "\"a\"\"b\"\"c\""),
    ];
    for (raw, quoted) in cases {
        assert_eq!(quote_ident(raw), quoted);
    }
}
1966
/// A required unique column and an optional column render the exact DDL string.
#[test]
fn create_table_sql_basic() {
    let spec = |name: &str, field_type: &str, optional: bool, unique: bool| FieldSpec {
        name: name.into(),
        field_type: field_type.into(),
        optional,
        unique,
    };
    let columns = vec![
        spec("email", "string", false, true),
        spec("age", "int", true, false),
    ];

    let expected = "CREATE TABLE IF NOT EXISTS \"User\" (id TEXT PRIMARY KEY NOT NULL, \"email\" TEXT NOT NULL UNIQUE, \"age\" INTEGER)";
    assert_eq!(create_table_sql("User", &columns), expected);
}
1989
/// Double quotes in table and column names are escaped in the generated DDL.
#[test]
fn create_table_sql_escapes_identifiers() {
    let tricky_column = FieldSpec {
        name: "col\"x".into(),
        field_type: "string".into(),
        optional: false,
        unique: false,
    };
    let ddl = create_table_sql("my\"table", &[tricky_column]);

    for fragment in ["\"my\"\"table\"", "\"col\"\"x\""] {
        assert!(ddl.contains(fragment));
    }
}
2002
/// A unique index renders `CREATE UNIQUE INDEX` with the `<entity>_<index>` name.
#[test]
fn create_index_sql_unique() {
    let expected =
        "CREATE UNIQUE INDEX IF NOT EXISTS \"User_by_email\" ON \"User\" (\"email\")";
    let statement = create_index_sql("User", "by_email", &["email".into()], true);
    assert_eq!(statement, expected);
}
2011
/// A non-unique index renders plain `CREATE INDEX` with the `<entity>_<index>` name.
#[test]
fn create_index_sql_non_unique() {
    let expected = "CREATE INDEX IF NOT EXISTS \"Todo_by_user\" ON \"Todo\" (\"userId\")";
    let statement = create_index_sql("Todo", "by_user", &["userId".into()], false);
    assert_eq!(statement, expected);
}
2020
/// An optional, non-unique field renders a bare `ADD COLUMN` with no constraints.
#[test]
fn add_column_sql_basic() {
    let bio = FieldSpec {
        name: "bio".into(),
        field_type: "string".into(),
        optional: true,
        unique: false,
    };
    let expected = "ALTER TABLE \"User\" ADD COLUMN \"bio\" TEXT";
    assert_eq!(add_column_sql("User", &bio), expected);
}
2032
/// Planning from the test manifest creates both entities and the `Todo.by_user` index.
#[test]
fn plan_from_manifest() {
    let adapter = PostgresAdapter;
    let manifest = test_manifest();
    let plan = adapter.plan_schema(&manifest).unwrap();
    let ops = &plan.operations;

    let creates_entity = |wanted: &str| {
        ops.iter().any(|op| {
            matches!(op, SchemaOperation::CreateEntity { name, .. } if name == wanted)
        })
    };
    let adds_index = |on_entity: &str, index: &str| {
        ops.iter().any(|op| {
            matches!(
                op,
                SchemaOperation::AddIndex { entity, name, .. }
                    if entity == on_entity && name == index
            )
        })
    };

    assert!(creates_entity("User"));
    assert!(creates_entity("Todo"));
    assert!(adds_index("Todo", "by_user"));
}
2053
/// The manifest plan renders two `CREATE TABLE` statements, which come first,
/// and at least one index statement.
#[test]
fn plan_to_sql_produces_statements() {
    let adapter = PostgresAdapter;
    let manifest = test_manifest();
    let plan = adapter.plan_schema(&manifest).unwrap();
    let stmts = plan_to_sql(&plan).unwrap();

    // Counts the statements that start with any of the given prefixes.
    let count_starting_with = |prefixes: &[&str]| {
        stmts
            .iter()
            .filter(|stmt| prefixes.iter().any(|prefix| stmt.starts_with(*prefix)))
            .count()
    };

    assert_eq!(count_starting_with(&["CREATE TABLE"]), 2);
    assert!(count_starting_with(&["CREATE INDEX", "CREATE UNIQUE INDEX"]) >= 1);
    // The first two statements must be the table creations.
    assert!(stmts[0].starts_with("CREATE TABLE"));
    assert!(stmts[1].starts_with("CREATE TABLE"));
}
2078
/// `RemoveEntity` cannot be rendered to SQL and is reported as `PG_OP_UNSUPPORTED`.
#[test]
fn plan_to_sql_rejects_unsupported() {
    let drop_user = SchemaOperation::RemoveEntity {
        name: "User".into(),
    };
    let plan = SchemaPlan {
        operations: vec![drop_user],
    };

    // `unwrap_err` fails the test if rendering unexpectedly succeeds.
    let err = plan_to_sql(&plan).unwrap_err();
    assert_eq!(err.code, "PG_OP_UNSUPPORTED");
}
2090
/// The offline adapter refuses to apply a plan, even one containing only a no-op.
#[test]
fn apply_not_implemented() {
    let adapter = PostgresAdapter;
    let noop_plan = SchemaPlan {
        operations: vec![SchemaOperation::Noop],
    };

    // `unwrap_err` fails the test if applying unexpectedly succeeds.
    let err = adapter.apply_schema(&noop_plan).unwrap_err();
    assert_eq!(err.code, "APPLY_NOT_IMPLEMENTED");
}
2101
/// Mixed-case names keep their case because they are emitted in double quotes.
#[test]
fn sql_uses_quoted_identifiers() {
    let created_at = FieldSpec {
        name: "createdAt".into(),
        field_type: "datetime".into(),
        optional: false,
        unique: false,
    };
    let ddl = create_table_sql("User", &[created_at]);

    for fragment in ["\"User\"", "\"createdAt\"", "TIMESTAMPTZ"] {
        assert!(ddl.contains(fragment));
    }
}
2116
/// Smoke-checks the introspection queries for the fragments they must contain.
#[test]
fn introspect_sql_constants_are_valid() {
    let required_fragments = [
        (INTROSPECT_TABLES_SQL, "information_schema.tables"),
        (INTROSPECT_COLUMNS_SQL, "$1"),
        (INTROSPECT_INDEXES_SQL, "$1"),
        (INTROSPECT_TABLES_SQL, "_pylon_"),
    ];
    for (query, fragment) in required_fragments {
        assert!(query.contains(fragment));
    }
}
2127
/// Against an empty database snapshot the plan creates every entity and index.
#[test]
fn plan_from_empty_snapshot_creates_all() {
    let empty_db = crate::SchemaSnapshot { tables: vec![] };
    let manifest = test_manifest();
    let plan = plan_from_snapshot(&empty_db, &manifest);
    let ops = &plan.operations;

    let creates_entity = |wanted: &str| {
        ops.iter().any(|op| {
            matches!(op, SchemaOperation::CreateEntity { name, .. } if name == wanted)
        })
    };
    let adds_index = |on_entity: &str, index: &str| {
        ops.iter().any(|op| {
            matches!(
                op,
                SchemaOperation::AddIndex { entity, name, .. }
                    if entity == on_entity && name == index
            )
        })
    };

    assert!(creates_entity("User"));
    assert!(creates_entity("Todo"));
    assert!(adds_index("Todo", "by_user"));
}
2149
/// A snapshot that already has every table, column, and index yields an empty plan.
#[test]
fn plan_from_full_snapshot_is_noop() {
    // Every column in this fixture is NOT NULL; only type and primary-key flag vary.
    let column = |name: &str, column_type: &str, primary_key: bool| crate::ColumnSnapshot {
        name: name.into(),
        column_type: column_type.into(),
        notnull: true,
        primary_key,
    };

    let user_table = crate::TableSnapshot {
        name: "User".into(),
        columns: vec![
            column("id", "TEXT", true),
            column("email", "TEXT", false),
            column("displayName", "TEXT", false),
            column("createdAt", "TIMESTAMPTZ", false),
        ],
        indexes: vec![],
    };
    let todo_table = crate::TableSnapshot {
        name: "Todo".into(),
        columns: vec![
            column("id", "TEXT", true),
            column("title", "TEXT", false),
            column("done", "BOOLEAN", false),
            column("userId", "TEXT", false),
            column("createdAt", "TIMESTAMPTZ", false),
        ],
        indexes: vec![crate::IndexSnapshot {
            name: "Todo_by_user".into(),
            columns: vec!["userId".into()],
            unique: false,
        }],
    };
    let snapshot = crate::SchemaSnapshot {
        tables: vec![user_table, todo_table],
    };

    let manifest = test_manifest();
    let plan = plan_from_snapshot(&snapshot, &manifest);
    assert!(plan.is_empty());
}
2230
/// A `User` table that lacks two manifest fields produces exactly two `AddField` operations.
#[test]
fn plan_detects_missing_column_in_snapshot() {
    // Every column in this fixture is NOT NULL; only type and primary-key flag vary.
    let column = |name: &str, column_type: &str, primary_key: bool| crate::ColumnSnapshot {
        name: name.into(),
        column_type: column_type.into(),
        notnull: true,
        primary_key,
    };

    // `User` has only `id` and `email`; the manifest also declares
    // `displayName` and `createdAt`.
    let user_table = crate::TableSnapshot {
        name: "User".into(),
        columns: vec![column("id", "TEXT", true), column("email", "TEXT", false)],
        indexes: vec![],
    };
    let todo_table = crate::TableSnapshot {
        name: "Todo".into(),
        columns: vec![
            column("id", "TEXT", true),
            column("title", "TEXT", false),
            column("done", "BOOLEAN", false),
            column("userId", "TEXT", false),
            column("createdAt", "TIMESTAMPTZ", false),
        ],
        indexes: vec![crate::IndexSnapshot {
            name: "Todo_by_user".into(),
            columns: vec!["userId".into()],
            unique: false,
        }],
    };
    let snapshot = crate::SchemaSnapshot {
        tables: vec![user_table, todo_table],
    };

    let manifest = test_manifest();
    let plan = plan_from_snapshot(&snapshot, &manifest);

    let added_fields = plan
        .operations
        .iter()
        .filter(|op| matches!(op, SchemaOperation::AddField { .. }))
        .count();
    assert_eq!(added_fields, 2);
}
2306
/// Strings pass through unquoted, scalars use their display form, `null`
/// becomes the empty string, and arrays/objects are rendered as compact JSON.
#[test]
fn json_value_to_string_handles_all_types() {
    let cases = [
        (serde_json::Value::String("hello".into()), "hello"),
        (serde_json::json!(42), "42"),
        (serde_json::json!(1.5), "1.5"),
        (serde_json::Value::Bool(true), "true"),
        (serde_json::Value::Bool(false), "false"),
        (serde_json::Value::Null, ""),
        (serde_json::json!([1, 2, 3]), "[1,2,3]"),
        (serde_json::json!({"a": 1}), "{\"a\":1}"),
    ];
    for (value, rendered) in cases {
        assert_eq!(json_value_to_string(&value), rendered);
    }
}
2333
/// Generated ids are non-empty and consist only of ASCII hex digits.
#[test]
fn generate_id_returns_hex_string() {
    let id = generate_id();
    assert!(!id.is_empty());
    // Checking bytes is equivalent here: any non-ASCII character fails the hex-digit test.
    assert!(id.bytes().all(|byte| byte.is_ascii_hexdigit()));
}
2341
/// Two back-to-back calls never return the same id.
#[test]
fn generate_id_is_unique_across_calls() {
    let first = generate_id();
    let second = generate_id();
    assert_ne!(first, second);
}
2348
/// Ids generated in a tight loop are already in lexicographic order, share
/// one length, and never repeat.
#[test]
fn generate_id_is_lex_sortable() {
    let mut ids: Vec<String> = (0..1000).map(|_| generate_id()).collect();

    // Non-decreasing neighbours is the same condition as "equals its sorted copy".
    assert!(
        ids.windows(2).all(|pair| pair[0] <= pair[1]),
        "generate_id must be lex-monotonic"
    );

    // All ids share one length.
    let width = ids[0].len();
    assert!(ids.iter().all(|id| id.len() == width));

    // The list is ordered, so any duplicate would be adjacent and removed here.
    ids.dedup();
    assert_eq!(ids.len(), 1000, "no collisions in a tight loop");
}
2367
/// A two-field insert targets the quoted table, binds three placeholders,
/// and puts a non-empty text id first.
#[test]
fn build_insert_sql_simple() {
    let payload = serde_json::json!({
        "email": "alice@example.com",
        "displayName": "Alice"
    });
    let (sql, values) = build_insert_sql("User", &payload).unwrap();

    assert!(sql.starts_with("INSERT INTO \"User\""));
    for fragment in ["id", "$1", "$2", "$3"] {
        assert!(sql.contains(fragment));
    }

    match &values[0] {
        JsonParam::Text(id) => assert!(!id.is_empty()),
        other => panic!("expected Text id param, got {other:?}"),
    }
    // One id parameter plus the two payload fields.
    assert_eq!(values.len(), 3);
}
2388
/// Each JSON scalar keeps its own `JsonParam` variant instead of being stringified.
#[test]
fn build_insert_sql_preserves_json_types() {
    let payload = serde_json::json!({
        "n": 42,
        "f": 1.5,
        "b": true,
        "s": "hi",
        "z": null,
    });
    let (_sql, values) = build_insert_sql("T", &payload).unwrap();

    // Skip the leading id parameter. The remaining parameters are asserted in
    // key order b, f, n, s, z.
    // NOTE(review): this presumably relies on serde_json's default sorted
    // map; confirm `preserve_order` is not enabled.
    let typed = &values[1..];
    assert!(matches!(typed[0], JsonParam::Bool(true)));
    assert!(matches!(typed[1], JsonParam::Float(_)));
    assert!(matches!(typed[2], JsonParam::Int(42)));
    assert!(matches!(typed[3], JsonParam::Text(_)));
    assert!(matches!(typed[4], JsonParam::Null));
}
2407
/// Both the table name and mixed-case column names are double-quoted in the insert.
#[test]
fn build_insert_sql_quotes_column_names() {
    let payload = serde_json::json!({"createdAt": "2026-01-01"});
    let (statement, _) = build_insert_sql("Todo", &payload).unwrap();

    for quoted in ["\"createdAt\"", "\"Todo\""] {
        assert!(statement.contains(quoted));
    }
}
2415
/// A payload that is not a JSON object is rejected with `PG_INVALID_DATA`.
#[test]
fn build_insert_sql_rejects_non_object() {
    let payload = serde_json::json!("not an object");
    // `unwrap_err` fails the test if the builder unexpectedly succeeds.
    let err = build_insert_sql("User", &payload).unwrap_err();
    assert_eq!(err.code, "PG_INVALID_DATA");
}
2423
/// A two-field update binds the row id as `$1` and the fields as `$2` and `$3`.
#[test]
fn build_update_sql_simple() {
    let changes = serde_json::json!({
        "displayName": "Bob",
        "email": "bob@example.com"
    });
    let (sql, values) = build_update_sql("User", "abc123", &changes).unwrap();

    assert!(sql.starts_with("UPDATE \"User\" SET"));
    for fragment in ["WHERE id = $1", "$2", "$3"] {
        assert!(sql.contains(fragment));
    }

    match &values[0] {
        JsonParam::Text(id) => assert_eq!(id, "abc123"),
        other => panic!("expected Text id param, got {other:?}"),
    }
    // The id parameter plus the two changed fields.
    assert_eq!(values.len(), 3);
}
2442
/// The first updated column is double-quoted and bound to `$2`, since `$1` is the row id.
#[test]
fn build_update_sql_quotes_column_names() {
    let changes = serde_json::json!({"displayName": "Carol"});
    let (statement, _) = build_update_sql("User", "id1", &changes).unwrap();
    assert!(statement.contains("\"displayName\" = $2"));
}
2449
/// A payload that is not a JSON object is rejected with `PG_INVALID_DATA`.
#[test]
fn build_update_sql_rejects_non_object() {
    let payload = serde_json::json!(42);
    // `unwrap_err` fails the test if the builder unexpectedly succeeds.
    let err = build_update_sql("User", "id1", &payload).unwrap_err();
    assert_eq!(err.code, "PG_INVALID_DATA");
}
2457
/// An update with no fields is rejected with a message asking for at least one field.
#[test]
fn build_update_sql_rejects_empty_object() {
    let no_changes = serde_json::json!({});
    let rejection = build_update_sql("User", "id1", &no_changes).unwrap_err();

    assert_eq!(rejection.code, "PG_INVALID_DATA");
    assert!(rejection.message.contains("at least one field"));
}
2465}