1use crate::{FieldSpec, SchemaOperation, SchemaPlan, StorageAdapter, StorageError};
2use pylon_kernel::AppManifest;
3
4fn pg_column_type(field_type: &str) -> &'static str {
17 match field_type {
18 "string" => "TEXT",
19 "int" => "INTEGER",
20 "float" => "DOUBLE PRECISION",
21 "bool" => "BOOLEAN",
22 "datetime" => "TIMESTAMPTZ",
23 "richtext" => "TEXT",
24 _ if field_type.starts_with("id(") => "TEXT",
25 _ => "TEXT",
26 }
27}
28
29pub(crate) fn quote_ident(name: &str) -> String {
37 format!("\"{}\"", name.replace('"', "\"\""))
38}
39
40#[cfg(feature = "postgres-live")]
45pub fn quote_ident_pub(name: &str) -> String {
46 quote_ident(name)
47}
48
49#[cfg(feature = "postgres-live")]
53pub fn row_to_json_pub(row: &postgres::Row) -> serde_json::Value {
54 live::row_to_json(row)
55}
56
57#[cfg(feature = "postgres-live")]
63pub fn build_query_filtered_sql_pub(
64 entity: &str,
65 filter: &serde_json::Value,
66 valid_columns: &[String],
67) -> Result<(String, Vec<JsonParam>), StorageError> {
68 live::LivePostgresAdapter::build_query_filtered_sql(entity, filter, valid_columns)
69}
70
71#[cfg(feature = "postgres-live")]
75pub fn build_aggregate_sql_pub(
76 entity: &str,
77 spec: &serde_json::Value,
78 valid_columns: &[String],
79) -> Result<(String, Vec<JsonParam>, Vec<String>), StorageError> {
80 live::LivePostgresAdapter::build_aggregate_sql(entity, spec, valid_columns)
81}
82
83#[cfg(feature = "postgres-live")]
86pub fn aggregate_rows_to_json_pub(
87 rows: &[postgres::Row],
88 column_names: &[String],
89) -> serde_json::Value {
90 live::aggregate_rows_to_json(rows, column_names)
91}
92
93pub fn create_table_sql(entity_name: &str, fields: &[FieldSpec]) -> String {
99 let mut columns = vec!["id TEXT PRIMARY KEY NOT NULL".to_string()];
100
101 for field in fields {
102 let col_type = pg_column_type(&field.field_type);
103 let not_null = if field.optional { "" } else { " NOT NULL" };
104 let unique = if field.unique { " UNIQUE" } else { "" };
105 columns.push(format!(
106 "{} {}{}{}",
107 quote_ident(&field.name),
108 col_type,
109 not_null,
110 unique
111 ));
112 }
113
114 format!(
115 "CREATE TABLE IF NOT EXISTS {} ({})",
116 quote_ident(entity_name),
117 columns.join(", ")
118 )
119}
120
121pub fn add_column_sql(entity_name: &str, field: &FieldSpec) -> String {
125 let col_type = pg_column_type(&field.field_type);
126 let unique = if field.unique { " UNIQUE" } else { "" };
127 format!(
128 "ALTER TABLE {} ADD COLUMN {} {}{}",
129 quote_ident(entity_name),
130 quote_ident(&field.name),
131 col_type,
132 unique
133 )
134}
135
136pub fn create_index_sql(
138 entity_name: &str,
139 index_name: &str,
140 fields: &[String],
141 unique: bool,
142) -> String {
143 let unique_str = if unique { "UNIQUE " } else { "" };
144 let full_index_name = format!("{}_{}", entity_name, index_name);
145 let quoted_fields: Vec<String> = fields.iter().map(|f| quote_ident(f)).collect();
146 format!(
147 "CREATE {}INDEX IF NOT EXISTS {} ON {} ({})",
148 unique_str,
149 quote_ident(&full_index_name),
150 quote_ident(entity_name),
151 quoted_fields.join(", ")
152 )
153}
154
155pub struct PostgresAdapter;
162
163impl StorageAdapter for PostgresAdapter {
164 fn plan_schema(&self, target: &AppManifest) -> Result<SchemaPlan, StorageError> {
165 let mut operations = Vec::new();
167
168 for entity in &target.entities {
169 let fields: Vec<FieldSpec> = entity
170 .fields
171 .iter()
172 .map(|f| FieldSpec {
173 name: f.name.clone(),
174 field_type: f.field_type.clone(),
175 optional: f.optional,
176 unique: f.unique,
177 })
178 .collect();
179
180 operations.push(SchemaOperation::CreateEntity {
181 name: entity.name.clone(),
182 fields,
183 });
184
185 for index in &entity.indexes {
186 operations.push(SchemaOperation::AddIndex {
187 entity: entity.name.clone(),
188 name: index.name.clone(),
189 fields: index.fields.clone(),
190 unique: index.unique,
191 });
192 }
193 }
194
195 if operations.is_empty() {
196 operations.push(SchemaOperation::Noop);
197 }
198
199 Ok(SchemaPlan { operations })
200 }
201
202 }
204
205pub fn plan_to_sql(plan: &SchemaPlan) -> Result<Vec<String>, StorageError> {
208 let mut statements = Vec::new();
209
210 for op in &plan.operations {
211 match op {
212 SchemaOperation::CreateEntity { name, fields } => {
213 statements.push(create_table_sql(name, fields));
214 }
215 SchemaOperation::AddField { entity, field } => {
216 statements.push(add_column_sql(entity, field));
217 }
218 SchemaOperation::AlterField {
219 entity,
220 previous,
221 target,
222 } => {
223 if previous.optional && !target.optional {
232 statements.push(format!(
233 "ALTER TABLE {} ALTER COLUMN {} SET NOT NULL",
234 quote_ident(entity),
235 quote_ident(&target.name)
236 ));
237 } else if !previous.optional && target.optional {
238 statements.push(format!(
239 "ALTER TABLE {} ALTER COLUMN {} DROP NOT NULL",
240 quote_ident(entity),
241 quote_ident(&target.name)
242 ));
243 }
244 }
250 SchemaOperation::AddIndex {
251 entity,
252 name,
253 fields,
254 unique,
255 } => {
256 statements.push(create_index_sql(entity, name, fields, *unique));
257 }
258 SchemaOperation::CreateSearchIndex { entity, config } => {
259 #[cfg(feature = "postgres-live")]
260 {
261 statements.extend(crate::pg_search::create_search_index_sql(entity, config));
262 }
263 #[cfg(not(feature = "postgres-live"))]
264 {
265 let _ = (entity, config);
266 return Err(StorageError {
267 code: "PG_SEARCH_FEATURE_OFF".into(),
268 message: "CreateSearchIndex requires the `postgres-live` feature".into(),
269 });
270 }
271 }
272 SchemaOperation::RemoveSearchIndex { entity } => {
273 statements.push(format!(
286 "DROP TABLE IF EXISTS {} CASCADE",
287 quote_ident(&format!("_fts_{entity}"))
288 ));
289 statements.push(format!(
290 "DROP INDEX IF EXISTS {}",
291 quote_ident(&format!("{entity}_fts_gin"))
292 ));
293 }
294 SchemaOperation::Noop => {}
295 other => {
296 return Err(StorageError {
297 code: "PG_OP_UNSUPPORTED".into(),
298 message: format!("Operation not supported by Postgres adapter: {other:?}"),
299 });
300 }
301 }
302 }
303
304 Ok(statements)
305}
306
307pub const INTROSPECT_TABLES_SQL: &str = "\
316 SELECT table_name \
317 FROM information_schema.tables \
318 WHERE table_schema = 'public' \
319 AND table_type = 'BASE TABLE' \
320 AND table_name NOT LIKE '_pylon_%' \
321 ORDER BY table_name";
322
323pub const INTROSPECT_COLUMNS_SQL: &str = "\
326 SELECT column_name, data_type, is_nullable, \
327 (SELECT COUNT(*) FROM information_schema.table_constraints tc \
328 JOIN information_schema.key_column_usage kcu \
329 ON tc.constraint_name = kcu.constraint_name \
330 WHERE tc.table_name = c.table_name \
331 AND kcu.column_name = c.column_name \
332 AND tc.constraint_type = 'PRIMARY KEY') as is_pk \
333 FROM information_schema.columns c \
334 WHERE table_schema = 'public' AND table_name = $1 \
335 ORDER BY ordinal_position";
336
337pub const INTROSPECT_INDEXES_SQL: &str = "\
340 SELECT i.relname as index_name, \
341 ix.indisunique as is_unique, \
342 array_agg(a.attname ORDER BY array_position(ix.indkey, a.attnum)) as columns \
343 FROM pg_index ix \
344 JOIN pg_class t ON t.oid = ix.indrelid \
345 JOIN pg_class i ON i.oid = ix.indexrelid \
346 JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(ix.indkey) \
347 JOIN pg_namespace n ON n.oid = t.relnamespace \
348 WHERE n.nspname = 'public' \
349 AND t.relname = $1 \
350 AND NOT ix.indisprimary \
351 GROUP BY i.relname, ix.indisunique \
352 ORDER BY i.relname";
353
354pub fn plan_from_snapshot(snapshot: &crate::SchemaSnapshot, target: &AppManifest) -> SchemaPlan {
357 crate::plan_from_snapshot(snapshot, target)
358}
359
360pub fn generate_id() -> String {
373 use std::sync::atomic::{AtomicU32, Ordering};
374 use std::time::{SystemTime, UNIX_EPOCH};
375 static COUNTER: AtomicU32 = AtomicU32::new(0);
376 let ts = SystemTime::now()
377 .duration_since(UNIX_EPOCH)
378 .unwrap_or_default()
379 .as_nanos();
380 let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
381 format!("{ts:032x}{seq:08x}")
382}
383
384pub fn json_value_to_string(val: &serde_json::Value) -> String {
393 match val {
394 serde_json::Value::String(s) => s.clone(),
395 serde_json::Value::Number(n) => n.to_string(),
396 serde_json::Value::Bool(b) => b.to_string(),
397 serde_json::Value::Null => String::new(),
398 other => other.to_string(),
399 }
400}
401
402#[derive(Debug, Clone, PartialEq)]
418pub enum JsonParam {
419 Null,
420 Text(String),
421 Int(i64),
422 Float(f64),
423 Bool(bool),
424}
425
426impl JsonParam {
427 pub fn from_json(val: &serde_json::Value) -> Self {
431 match val {
432 serde_json::Value::Null => JsonParam::Null,
433 serde_json::Value::String(s) => JsonParam::Text(s.clone()),
434 serde_json::Value::Bool(b) => JsonParam::Bool(*b),
435 serde_json::Value::Number(n) => {
436 if let Some(i) = n.as_i64() {
437 JsonParam::Int(i)
438 } else if let Some(f) = n.as_f64() {
439 JsonParam::Float(f)
440 } else {
441 JsonParam::Text(n.to_string())
442 }
443 }
444 other => JsonParam::Text(other.to_string()),
445 }
446 }
447}
448
449#[cfg(feature = "postgres-live")]
450impl postgres::types::ToSql for JsonParam {
451 fn to_sql(
452 &self,
453 ty: &postgres::types::Type,
454 out: &mut bytes::BytesMut,
455 ) -> Result<postgres::types::IsNull, Box<dyn std::error::Error + Sync + Send>> {
456 use postgres::types::Type;
457
458 if matches!(self, JsonParam::Null) {
462 return Ok(postgres::types::IsNull::Yes);
463 }
464
465 match (self, ty) {
472 (JsonParam::Bool(b), &Type::BOOL) => b.to_sql(ty, out),
473
474 (JsonParam::Int(n), &Type::INT2) => (*n as i16).to_sql(ty, out),
475 (JsonParam::Int(n), &Type::INT4) => (*n as i32).to_sql(ty, out),
476 (JsonParam::Int(n), &Type::INT8) => n.to_sql(ty, out),
477 (JsonParam::Int(n), &Type::FLOAT4) => (*n as f32).to_sql(ty, out),
478 (JsonParam::Int(n), &Type::FLOAT8) => (*n as f64).to_sql(ty, out),
479
480 (JsonParam::Float(f), &Type::FLOAT4) => (*f as f32).to_sql(ty, out),
481 (JsonParam::Float(f), &Type::FLOAT8) => f.to_sql(ty, out),
482 (JsonParam::Float(f), &Type::INT4) => (*f as i32).to_sql(ty, out),
483 (JsonParam::Float(f), &Type::INT8) => (*f as i64).to_sql(ty, out),
484
485 (JsonParam::Text(s), &Type::TEXT)
486 | (JsonParam::Text(s), &Type::VARCHAR)
487 | (JsonParam::Text(s), &Type::BPCHAR)
488 | (JsonParam::Text(s), &Type::NAME) => s.to_sql(ty, out),
489 (JsonParam::Text(s), &Type::TIMESTAMPTZ) => {
490 let dt = chrono::DateTime::parse_from_rfc3339(s)
504 .map_err(|e| format!("invalid TIMESTAMPTZ string {s:?}: {e}"))?
505 .with_timezone(&chrono::Utc);
506 dt.to_sql(ty, out)
507 }
508 (JsonParam::Text(s), &Type::TIMESTAMP) => {
509 let dt = chrono::DateTime::parse_from_rfc3339(s)
513 .map_err(|e| format!("invalid TIMESTAMP string {s:?}: {e}"))?
514 .with_timezone(&chrono::Utc)
515 .naive_utc();
516 dt.to_sql(ty, out)
517 }
518 (JsonParam::Text(s), &Type::DATE) => {
519 let dt = chrono::DateTime::parse_from_rfc3339(s)
520 .map_err(|e| format!("invalid DATE string {s:?}: {e}"))?
521 .with_timezone(&chrono::Utc)
522 .date_naive();
523 dt.to_sql(ty, out)
524 }
525
526 (other, _) => {
531 let s = match other {
532 JsonParam::Bool(b) => b.to_string(),
533 JsonParam::Int(n) => n.to_string(),
534 JsonParam::Float(f) => f.to_string(),
535 JsonParam::Text(s) => s.clone(),
536 JsonParam::Null => unreachable!(),
537 };
538 s.to_sql(ty, out)
539 }
540 }
541 }
542
543 fn accepts(_ty: &postgres::types::Type) -> bool {
544 true
548 }
549
550 postgres::types::to_sql_checked!();
551}
552
553pub fn build_insert_sql(
561 entity: &str,
562 data: &serde_json::Value,
563) -> Result<(String, Vec<JsonParam>), StorageError> {
564 let obj = data.as_object().ok_or_else(|| StorageError {
565 code: "PG_INVALID_DATA".into(),
566 message: "Insert data must be a JSON object".into(),
567 })?;
568
569 let id = match obj.get("id") {
576 None | Some(serde_json::Value::Null) => generate_id(),
577 Some(serde_json::Value::String(s)) => s.clone(),
578 Some(other) => {
579 return Err(StorageError {
580 code: "PG_INVALID_ID".into(),
581 message: format!(
582 "Insert data carried a non-string `id` value: {other}. Pylon row ids \
583 are always strings (40-char hex). Drop the `id` field to let the \
584 server generate one, or supply a string."
585 ),
586 });
587 }
588 };
589
590 let mut col_names = vec!["id".to_string()];
591 let mut placeholders = vec!["$1".to_string()];
592 let mut values: Vec<JsonParam> = vec![JsonParam::Text(id)];
593
594 let mut i = 0usize;
595 for (key, val) in obj {
596 if key == "id" {
597 continue;
601 }
602 col_names.push(quote_ident(key));
603 placeholders.push(format!("${}", i + 2));
604 values.push(JsonParam::from_json(val));
605 i += 1;
606 }
607
608 let sql = format!(
609 "INSERT INTO {} ({}) VALUES ({})",
610 quote_ident(entity),
611 col_names.join(", "),
612 placeholders.join(", ")
613 );
614
615 Ok((sql, values))
616}
617
618pub fn build_update_sql(
621 entity: &str,
622 id: &str,
623 data: &serde_json::Value,
624) -> Result<(String, Vec<JsonParam>), StorageError> {
625 let obj = data.as_object().ok_or_else(|| StorageError {
626 code: "PG_INVALID_DATA".into(),
627 message: "Update data must be a JSON object".into(),
628 })?;
629
630 if obj.is_empty() {
631 return Err(StorageError {
632 code: "PG_INVALID_DATA".into(),
633 message: "Update data must contain at least one field".into(),
634 });
635 }
636
637 let mut set_clauses = Vec::new();
638 let mut values: Vec<JsonParam> = vec![JsonParam::Text(id.to_string())];
639
640 let mut i = 0usize;
641 for (key, val) in obj {
642 if key == "id" {
643 return Err(StorageError {
650 code: "PG_INVALID_UPDATE".into(),
651 message: "Updating the `id` column is not allowed — Pylon row ids are immutable; \
652 drop the field from the patch."
653 .into(),
654 });
655 }
656 set_clauses.push(format!("{} = ${}", quote_ident(key), i + 2));
657 values.push(JsonParam::from_json(val));
658 i += 1;
659 }
660
661 if set_clauses.is_empty() {
662 return Err(StorageError {
663 code: "PG_INVALID_DATA".into(),
664 message: "Update data must contain at least one non-id field".into(),
665 });
666 }
667
668 let sql = format!(
669 "UPDATE {} SET {} WHERE id = $1",
670 quote_ident(entity),
671 set_clauses.join(", ")
672 );
673
674 Ok((sql, values))
675}
676
677#[cfg(feature = "postgres-live")]
681fn as_pg_params(values: &[JsonParam]) -> Vec<&(dyn postgres::types::ToSql + Sync)> {
682 values
683 .iter()
684 .map(|v| v as &(dyn postgres::types::ToSql + Sync))
685 .collect()
686}
687
688#[cfg(feature = "postgres-live")]
693pub mod live {
694 use super::*;
695 use crate::{
696 ColumnSnapshot, IndexSnapshot, SchemaSnapshot, StorageAdapter, StorageError, TableSnapshot,
697 };
698
699 pub fn connect_pg(url: &str) -> Result<postgres::Client, String> {
707 let (cleaned, ssl) = parse_pg_url_ssl(url);
708 if ssl.use_tls {
709 let mut roots = rustls::RootCertStore::empty();
710 let native_certs = rustls_native_certs::load_native_certs();
711 for cert in native_certs.certs {
712 let _ = roots.add(cert);
713 }
714 if !native_certs.errors.is_empty() {
715 tracing::warn!(
716 "[pg] rustls native cert load reported {} non-fatal errors",
717 native_certs.errors.len()
718 );
719 }
720 let config = rustls::ClientConfig::builder()
721 .with_root_certificates(roots)
722 .with_no_client_auth();
723 let tls = tokio_postgres_rustls::MakeRustlsConnect::new(config);
724 postgres::Client::connect(&cleaned, tls).map_err(|e| format!("PG connect: {e}"))
725 } else {
726 postgres::Client::connect(&cleaned, postgres::NoTls)
727 .map_err(|e| format!("PG connect: {e}"))
728 }
729 }
730
731 pub struct PgUrlSsl {
736 pub use_tls: bool,
737 }
738
739 pub fn parse_pg_url_ssl(url: &str) -> (String, PgUrlSsl) {
755 let (base, query) = match url.find('?') {
756 Some(idx) => (&url[..idx], &url[idx + 1..]),
757 None => return (url.to_string(), PgUrlSsl { use_tls: false }),
758 };
759
760 let mut use_tls = false;
761 let mut kept: Vec<String> = Vec::new();
762 for pair in query.split('&') {
763 if pair.is_empty() {
764 continue;
765 }
766 let (k, v) = match pair.find('=') {
767 Some(i) => (&pair[..i], &pair[i + 1..]),
768 None => (pair, ""),
769 };
770 match k {
771 "sslmode" => match v.to_ascii_lowercase().as_str() {
772 "disable" | "allow" => {
773 kept.push("sslmode=disable".to_string());
776 }
777 "prefer" => {
778 kept.push("sslmode=prefer".to_string());
780 }
781 "require" | "verify-ca" | "verify-full" | "" => {
782 use_tls = true;
787 kept.push("sslmode=require".to_string());
788 }
789 other => {
790 tracing::warn!(
791 "[pg] unknown sslmode='{other}' — defaulting to require + TLS"
792 );
793 use_tls = true;
794 kept.push("sslmode=require".to_string());
795 }
796 },
797 "sslrootcert" => {
798 if v != "system" && !v.is_empty() {
803 tracing::warn!(
804 "[pg] sslrootcert={v} ignored — native-tls uses system roots"
805 );
806 }
807 use_tls = true;
808 }
809 _ => {
810 kept.push(pair.to_string());
814 }
815 }
816 }
817
818 let cleaned = if kept.is_empty() {
819 base.to_string()
820 } else {
821 format!("{}?{}", base, kept.join("&"))
822 };
823 (cleaned, PgUrlSsl { use_tls })
824 }
825
826 #[cfg(test)]
827 mod url_tests {
828 use super::parse_pg_url_ssl;
829
830 #[test]
831 fn strips_libpq_only_sslmode_verify_full() {
832 let (cleaned, ssl) =
833 parse_pg_url_ssl("postgres://u:p@h:5432/db?sslmode=verify-full&sslrootcert=system");
834 assert!(ssl.use_tls);
835 assert_eq!(cleaned, "postgres://u:p@h:5432/db?sslmode=require");
837 }
838
839 #[test]
840 fn passes_through_disable() {
841 let (cleaned, ssl) = parse_pg_url_ssl("postgres://h/db?sslmode=disable");
842 assert!(!ssl.use_tls);
843 assert_eq!(cleaned, "postgres://h/db?sslmode=disable");
844 }
845
846 #[test]
847 fn no_query_string_no_tls() {
848 let (cleaned, ssl) = parse_pg_url_ssl("postgres://h/db");
849 assert!(!ssl.use_tls);
850 assert_eq!(cleaned, "postgres://h/db");
851 }
852
853 #[test]
854 fn unknown_params_pass_through() {
855 let (cleaned, _) =
856 parse_pg_url_ssl("postgres://h/db?application_name=pylon&connect_timeout=5");
857 assert!(cleaned.contains("application_name=pylon"));
858 assert!(cleaned.contains("connect_timeout=5"));
859 }
860
861 #[test]
862 fn sslrootcert_alone_enables_tls() {
863 let (cleaned, ssl) = parse_pg_url_ssl("postgres://h/db?sslrootcert=system");
866 assert!(ssl.use_tls);
867 assert_eq!(cleaned, "postgres://h/db");
870 }
871 }
872
873 pub struct LivePostgresAdapter {
875 client: postgres::Client,
876 }
877
878 impl LivePostgresAdapter {
879 pub(crate) fn client_mut(&mut self) -> &mut postgres::Client {
887 &mut self.client
888 }
889
890 pub fn connect(url: &str) -> Result<Self, StorageError> {
912 let client = connect_pg(url).map_err(|e| StorageError {
913 code: "PG_CONNECT_FAILED".into(),
914 message: format!("Failed to connect to Postgres: {e}"),
915 })?;
916 Ok(Self { client })
917 }
918
919 pub fn read_schema(&mut self) -> Result<SchemaSnapshot, StorageError> {
921 let table_rows = self
922 .client
923 .query(INTROSPECT_TABLES_SQL, &[])
924 .map_err(pg_err)?;
925
926 let mut tables = Vec::new();
927 for row in &table_rows {
928 let table_name: String = row.get(0);
929 let columns = self.read_columns(&table_name)?;
930 let indexes = self.read_indexes(&table_name)?;
931 tables.push(TableSnapshot {
932 name: table_name,
933 columns,
934 indexes,
935 });
936 }
937
938 Ok(SchemaSnapshot { tables })
939 }
940
941 fn read_columns(&mut self, table: &str) -> Result<Vec<ColumnSnapshot>, StorageError> {
942 let rows = self
943 .client
944 .query(INTROSPECT_COLUMNS_SQL, &[&table])
945 .map_err(pg_err)?;
946
947 let mut columns = Vec::new();
948 for row in &rows {
949 let name: String = row.get(0);
950 let data_type: String = row.get(1);
951 let is_nullable: String = row.get(2);
952 let is_pk: i64 = row.get(3);
953 columns.push(ColumnSnapshot {
954 name,
955 column_type: data_type,
956 notnull: is_nullable == "NO",
957 primary_key: is_pk > 0,
958 });
959 }
960 Ok(columns)
961 }
962
963 fn read_indexes(&mut self, table: &str) -> Result<Vec<IndexSnapshot>, StorageError> {
964 let rows = self
965 .client
966 .query(INTROSPECT_INDEXES_SQL, &[&table])
967 .map_err(pg_err)?;
968
969 let mut indexes = Vec::new();
970 for row in &rows {
971 let name: String = row.get(0);
972 let unique: bool = row.get(1);
973 let columns: Vec<String> = row.get(2);
974 indexes.push(IndexSnapshot {
975 name,
976 columns,
977 unique,
978 });
979 }
980 Ok(indexes)
981 }
982
983 pub fn plan_from_live(&mut self, target: &AppManifest) -> Result<SchemaPlan, StorageError> {
985 let snapshot = self.read_schema()?;
986 Ok(crate::plan_from_snapshot(&snapshot, target))
987 }
988 }
989
990 impl StorageAdapter for LivePostgresAdapter {
991 fn plan_schema(&self, _target: &AppManifest) -> Result<SchemaPlan, StorageError> {
992 Err(StorageError {
993 code: "PG_PLAN_NEEDS_MUTABLE".into(),
994 message: "Use plan_from_live() instead for live Postgres planning".into(),
995 })
996 }
997
998 fn apply_schema(&self, _plan: &SchemaPlan) -> Result<(), StorageError> {
999 Err(StorageError {
1000 code: "PG_APPLY_USE_METHOD".into(),
1001 message: "Use apply_plan() instead of the trait method for live Postgres".into(),
1002 })
1003 }
1004 }
1005
1006 impl LivePostgresAdapter {
1007 pub fn apply_plan(&mut self, plan: &SchemaPlan) -> Result<(), StorageError> {
1009 let statements = plan_to_sql(plan)?;
1010 for sql in &statements {
1011 self.client.execute(sql.as_str(), &[]).map_err(pg_err)?;
1012 }
1013 Ok(())
1014 }
1015
1016 pub fn exec_raw(&mut self, sql: &str) -> Result<u64, StorageError> {
1022 self.client.execute(sql, &[]).map_err(pg_err)
1023 }
1024
1025 pub fn insert(
1027 &mut self,
1028 entity: &str,
1029 data: &serde_json::Value,
1030 ) -> Result<String, StorageError> {
1031 let (sql, values) = build_insert_sql(entity, data)?;
1032 let id = match &values[0] {
1035 JsonParam::Text(s) => s.clone(),
1036 _ => {
1037 return Err(StorageError {
1038 code: "PG_INTERNAL".into(),
1039 message: "build_insert_sql produced non-text id param".into(),
1040 });
1041 }
1042 };
1043 let params = as_pg_params(&values);
1044 self.client.execute(sql.as_str(), ¶ms).map_err(pg_err)?;
1045 Ok(id)
1046 }
1047
1048 pub fn get_by_id(
1050 &mut self,
1051 entity: &str,
1052 id: &str,
1053 ) -> Result<Option<serde_json::Value>, StorageError> {
1054 let sql = format!("SELECT * FROM {} WHERE id = $1", quote_ident(entity));
1055 let rows = self.client.query(sql.as_str(), &[&id]).map_err(pg_err)?;
1056
1057 match rows.first() {
1058 Some(row) => Ok(Some(row_to_json(row))),
1059 None => Ok(None),
1060 }
1061 }
1062
1063 pub fn list(&mut self, entity: &str) -> Result<Vec<serde_json::Value>, StorageError> {
1065 let sql = format!("SELECT * FROM {}", quote_ident(entity));
1066 let rows = self.client.query(sql.as_str(), &[]).map_err(pg_err)?;
1067
1068 Ok(rows.iter().map(row_to_json).collect())
1069 }
1070
1071 pub fn list_after(
1075 &mut self,
1076 entity: &str,
1077 after: Option<&str>,
1078 limit: usize,
1079 ) -> Result<Vec<serde_json::Value>, StorageError> {
1080 let capped: i64 = limit.min(10_000) as i64;
1083 let sql = match after {
1084 Some(_) => format!(
1085 "SELECT * FROM {} WHERE id > $1 ORDER BY id ASC LIMIT $2",
1086 quote_ident(entity)
1087 ),
1088 None => format!(
1089 "SELECT * FROM {} ORDER BY id ASC LIMIT $1",
1090 quote_ident(entity)
1091 ),
1092 };
1093 let rows = match after {
1094 Some(cursor) => self
1095 .client
1096 .query(sql.as_str(), &[&cursor, &capped])
1097 .map_err(pg_err)?,
1098 None => self
1099 .client
1100 .query(sql.as_str(), &[&capped])
1101 .map_err(pg_err)?,
1102 };
1103 Ok(rows.iter().map(row_to_json).collect())
1104 }
1105
1106 pub fn update(
1108 &mut self,
1109 entity: &str,
1110 id: &str,
1111 data: &serde_json::Value,
1112 ) -> Result<bool, StorageError> {
1113 let (sql, values) = build_update_sql(entity, id, data)?;
1114 let params = as_pg_params(&values);
1115 let rows_affected = self.client.execute(sql.as_str(), ¶ms).map_err(pg_err)?;
1116 Ok(rows_affected > 0)
1117 }
1118
1119 pub fn delete(&mut self, entity: &str, id: &str) -> Result<bool, StorageError> {
1121 let sql = format!("DELETE FROM {} WHERE id = $1", quote_ident(entity));
1122 let rows_affected = self.client.execute(sql.as_str(), &[&id]).map_err(pg_err)?;
1123 Ok(rows_affected > 0)
1124 }
1125
1126 pub fn lookup_field(
1130 &mut self,
1131 entity: &str,
1132 field: &str,
1133 value: &str,
1134 ) -> Result<Option<serde_json::Value>, StorageError> {
1135 let sql = format!(
1136 "SELECT * FROM {} WHERE {} = $1 LIMIT 1",
1137 quote_ident(entity),
1138 quote_ident(field),
1139 );
1140 let rows = self.client.query(sql.as_str(), &[&value]).map_err(pg_err)?;
1141 Ok(rows.first().map(row_to_json))
1142 }
1143
1144 pub fn query_filtered(
1167 &mut self,
1168 entity: &str,
1169 filter: &serde_json::Value,
1170 valid_columns: &[String],
1171 ) -> Result<Vec<serde_json::Value>, StorageError> {
1172 let (sql, params) = Self::build_query_filtered_sql(entity, filter, valid_columns)?;
1173 let pg_params = as_pg_params(¶ms);
1174 let rows = self
1175 .client
1176 .query(sql.as_str(), &pg_params)
1177 .map_err(pg_err)?;
1178 Ok(rows.iter().map(row_to_json).collect())
1179 }
1180
1181 pub(crate) fn build_query_filtered_sql(
1188 entity: &str,
1189 filter: &serde_json::Value,
1190 valid_columns: &[String],
1191 ) -> Result<(String, Vec<JsonParam>), StorageError> {
1192 let empty = serde_json::Map::new();
1193 let obj = filter.as_object().unwrap_or(&empty);
1194
1195 let validate = |col: &str| -> Result<(), StorageError> {
1196 if col == "id" || valid_columns.iter().any(|c| c == col) {
1197 Ok(())
1198 } else {
1199 Err(StorageError {
1200 code: "UNKNOWN_COLUMN".into(),
1201 message: format!("Unknown column \"{col}\" on entity \"{entity}\""),
1202 })
1203 }
1204 };
1205
1206 let mut where_clauses: Vec<String> = Vec::new();
1207 let mut order_clause = String::new();
1208 let mut limit_clause = String::new();
1209 let mut offset_clause = String::new();
1210 let mut planned: Vec<(String, String, JsonParam)> = Vec::new();
1214
1215 for (key, val) in obj {
1216 match key.as_str() {
1217 "$search" => {
1218 let raw = match val {
1226 serde_json::Value::String(s) => s.clone(),
1227 other => other.to_string(),
1228 };
1229 let placeholder_n = planned.len() + 1;
1238 where_clauses.push(format!(
1239 "{}.id IN (SELECT entity_id FROM \"_fts_{entity}\" \
1240 WHERE tsv @@ plainto_tsquery('english', ${placeholder_n}))",
1241 quote_ident(entity),
1242 ));
1243 planned.push((
1247 format!("__search_{}", planned.len()),
1248 "__INLINE__".into(),
1249 JsonParam::Text(raw),
1250 ));
1251 }
1252 "$order" => {
1253 if let Some(ord) = val.as_object() {
1254 let mut parts = Vec::new();
1255 for (col, dir) in ord {
1256 validate(col)?;
1257 let d = match dir.as_str().unwrap_or("asc") {
1258 "desc" | "DESC" => "DESC",
1259 _ => "ASC",
1260 };
1261 parts.push(format!("{} {d}", quote_ident(col)));
1262 }
1263 if !parts.is_empty() {
1264 order_clause = format!(" ORDER BY {}", parts.join(", "));
1265 }
1266 }
1267 }
1268 "$limit" => {
1269 if let Some(n) = val.as_u64() {
1270 limit_clause = format!(" LIMIT {}", n);
1271 }
1272 }
1273 "$offset" => {
1274 if let Some(n) = val.as_u64() {
1275 offset_clause = format!(" OFFSET {}", n);
1276 }
1277 }
1278 field => {
1279 validate(field)?;
1280 match val {
1281 serde_json::Value::Object(ops) => {
1282 for (op, v) in ops {
1283 match op.as_str() {
1284 "$not" => planned.push((
1285 field.into(),
1286 "!=".into(),
1287 value_to_pg(v),
1288 )),
1289 "$gt" => {
1290 planned.push((field.into(), ">".into(), value_to_pg(v)))
1291 }
1292 "$gte" => planned.push((
1293 field.into(),
1294 ">=".into(),
1295 value_to_pg(v),
1296 )),
1297 "$lt" => {
1298 planned.push((field.into(), "<".into(), value_to_pg(v)))
1299 }
1300 "$lte" => planned.push((
1301 field.into(),
1302 "<=".into(),
1303 value_to_pg(v),
1304 )),
1305 "$like" => {
1306 let raw = match v {
1317 serde_json::Value::String(s) => s.clone(),
1318 other => other.to_string(),
1319 };
1320 planned.push((
1321 field.into(),
1322 "LIKE".into(),
1323 JsonParam::Text(format!("%{raw}%")),
1324 ));
1325 }
1326 "$in" => {
1327 if let Some(arr) = v.as_array() {
1328 if arr.is_empty() {
1329 where_clauses.push("FALSE".into());
1340 } else {
1341 let placeholders: Vec<String> = (0..arr.len())
1342 .map(|i| {
1343 format!("${}", planned.len() + 1 + i)
1344 })
1345 .collect();
1346 where_clauses.push(format!(
1347 "{} IN ({})",
1348 quote_ident(field),
1349 placeholders.join(", "),
1350 ));
1351 for x in arr {
1352 planned.push((
1353 format!("__inline_{}", planned.len()),
1354 "__INLINE__".into(),
1355 value_to_pg(x),
1356 ));
1357 }
1358 }
1359 }
1360 }
1361 _ => {}
1362 }
1363 }
1364 }
1365 _ => planned.push((field.into(), "=".into(), value_to_pg(val))),
1366 }
1367 }
1368 }
1369 }
1370
1371 let mut params: Vec<JsonParam> = Vec::with_capacity(planned.len());
1373 for (field, op, v) in &planned {
1374 if op == "__INLINE__" {
1375 params.push(v.clone());
1377 } else {
1378 let placeholder = format!("${}", params.len() + 1);
1379 where_clauses.push(format!("{} {} {}", quote_ident(field), op, placeholder));
1380 params.push(v.clone());
1381 }
1382 }
1383
1384 let where_sql = if where_clauses.is_empty() {
1385 String::new()
1386 } else {
1387 format!(" WHERE {}", where_clauses.join(" AND "))
1388 };
1389 let final_order = if order_clause.is_empty() {
1394 format!(" ORDER BY {}", quote_ident("id"))
1395 } else {
1396 order_clause
1397 };
1398 let sql = format!(
1399 "SELECT * FROM {}{}{}{}{}",
1400 quote_ident(entity),
1401 where_sql,
1402 final_order,
1403 limit_clause,
1404 offset_clause,
1405 );
1406
1407 Ok((sql, params))
1408 }
1409
1410 pub fn aggregate(
1429 &mut self,
1430 entity: &str,
1431 spec: &serde_json::Value,
1432 valid_columns: &[String],
1433 ) -> Result<serde_json::Value, StorageError> {
1434 let (sql, params, column_names) =
1435 Self::build_aggregate_sql(entity, spec, valid_columns)?;
1436 let pg_params = as_pg_params(¶ms);
1437 let rows = self
1438 .client
1439 .query(sql.as_str(), &pg_params)
1440 .map_err(pg_err)?;
1441 Ok(aggregate_rows_to_json(&rows, &column_names))
1442 }
1443
1444 pub(crate) fn build_aggregate_sql(
1451 entity: &str,
1452 spec: &serde_json::Value,
1453 valid_columns: &[String],
1454 ) -> Result<(String, Vec<JsonParam>, Vec<String>), StorageError> {
1455 let obj = spec.as_object().ok_or_else(|| StorageError {
1456 code: "INVALID_QUERY".into(),
1457 message: "aggregate spec must be a JSON object".into(),
1458 })?;
1459
1460 let validate = |col: &str| -> Result<(), StorageError> {
1461 if col == "id" || valid_columns.iter().any(|c| c == col) {
1462 Ok(())
1463 } else {
1464 Err(StorageError {
1465 code: "UNKNOWN_COLUMN".into(),
1466 message: format!("Unknown column \"{col}\" on entity \"{entity}\""),
1467 })
1468 }
1469 };
1470
1471 let mut select_parts: Vec<String> = Vec::new();
1472 let mut result_fields: Vec<String> = Vec::new();
1473
1474 if let Some(count) = obj.get("count") {
1475 match count {
1476 serde_json::Value::String(s) if s == "*" => {
1477 select_parts.push("COUNT(*) AS count".into());
1478 result_fields.push("count".into());
1479 }
1480 serde_json::Value::String(field) => {
1481 validate(field)?;
1482 let alias = format!("count_{field}");
1483 select_parts.push(format!(
1484 "COUNT({}) AS {}",
1485 quote_ident(field),
1486 quote_ident(&alias),
1487 ));
1488 result_fields.push(alias);
1489 }
1490 _ => {}
1491 }
1492 }
1493
1494 for (fn_name, prefix) in [
1495 ("sum", "sum_"),
1496 ("avg", "avg_"),
1497 ("min", "min_"),
1498 ("max", "max_"),
1499 ] {
1500 if let Some(fields) = obj.get(fn_name).and_then(|v| v.as_array()) {
1501 for field in fields {
1502 if let Some(f) = field.as_str() {
1503 validate(f)?;
1504 let alias = format!("{prefix}{f}");
1505 let sql_fn = fn_name.to_uppercase();
1506 select_parts.push(format!(
1507 "{}({}) AS {}",
1508 sql_fn,
1509 quote_ident(f),
1510 quote_ident(&alias),
1511 ));
1512 result_fields.push(alias);
1513 }
1514 }
1515 }
1516 }
1517
1518 if let Some(fields) = obj.get("countDistinct").and_then(|v| v.as_array()) {
1519 for field in fields {
1520 if let Some(f) = field.as_str() {
1521 validate(f)?;
1522 let alias = format!("count_distinct_{f}");
1523 select_parts.push(format!(
1524 "COUNT(DISTINCT {}) AS {}",
1525 quote_ident(f),
1526 quote_ident(&alias),
1527 ));
1528 result_fields.push(alias);
1529 }
1530 }
1531 }
1532
1533 let mut group_by: Vec<String> = Vec::new();
1538 let mut group_select: Vec<String> = Vec::new();
1539 let mut group_field_names: Vec<String> = Vec::new();
1540 if let Some(groups) = obj.get("groupBy").and_then(|v| v.as_array()) {
1541 for g in groups {
1542 if let Some(f) = g.as_str() {
1543 validate(f)?;
1544 let q = quote_ident(f);
1545 group_by.push(q.clone());
1546 group_select.push(q);
1547 group_field_names.push(f.to_string());
1548 } else if let Some(spec) = g.as_object() {
1549 let field =
1550 spec.get("field").and_then(|v| v.as_str()).ok_or_else(|| {
1551 StorageError {
1552 code: "INVALID_QUERY".into(),
1553 message: "groupBy object spec requires `field`".into(),
1554 }
1555 })?;
1556 validate(field)?;
1557 let bucket = spec.get("bucket").and_then(|v| v.as_str()).unwrap_or("day");
1558 let trunc_unit = match bucket {
1559 "hour" | "day" | "week" | "month" | "year" => bucket,
1560 _ => {
1561 return Err(StorageError {
1562 code: "INVALID_QUERY".into(),
1563 message: format!(
1564 "bucket must be one of hour/day/week/month/year, got {bucket}"
1565 ),
1566 });
1567 }
1568 };
1569 let alias = format!("{field}_{bucket}");
1570 let expr = format!("date_trunc('{}', {})", trunc_unit, quote_ident(field),);
1571 group_by.push(expr.clone());
1572 group_select.push(format!("{} AS {}", expr, quote_ident(&alias)));
1573 group_field_names.push(alias);
1574 }
1575 }
1576 }
1577
1578 let mut full_select = group_select.clone();
1579 full_select.extend(select_parts.iter().cloned());
1580 if full_select.is_empty() {
1581 return Err(StorageError {
1582 code: "INVALID_QUERY".into(),
1583 message: "aggregate spec must include count/sum/avg/min/max/groupBy".into(),
1584 });
1585 }
1586
1587 let mut where_clauses: Vec<String> = Vec::new();
1588 let mut params: Vec<JsonParam> = Vec::new();
1589 if let Some(w) = obj.get("where").and_then(|v| v.as_object()) {
1590 for (k, v) in w {
1591 validate(k)?;
1592 let placeholder = format!("${}", params.len() + 1);
1593 where_clauses.push(format!("{} = {}", quote_ident(k), placeholder));
1594 params.push(value_to_pg(v));
1595 }
1596 }
1597 let where_sql = if where_clauses.is_empty() {
1598 String::new()
1599 } else {
1600 format!(" WHERE {}", where_clauses.join(" AND "))
1601 };
1602 let group_sql = if group_by.is_empty() {
1603 String::new()
1604 } else {
1605 format!(" GROUP BY {}", group_by.join(", "))
1606 };
1607
1608 let sql = format!(
1609 "SELECT {} FROM {}{}{}",
1610 full_select.join(", "),
1611 quote_ident(entity),
1612 where_sql,
1613 group_sql,
1614 );
1615
1616 let column_names: Vec<String> = group_field_names
1617 .iter()
1618 .chain(result_fields.iter())
1619 .cloned()
1620 .collect();
1621
1622 Ok((sql, params, column_names))
1623 }
1624 }
1625
1626 pub fn aggregate_rows_to_json(
1631 rows: &[postgres::Row],
1632 column_names: &[String],
1633 ) -> serde_json::Value {
1634 let mut out: Vec<serde_json::Value> = Vec::with_capacity(rows.len());
1635 for row in rows {
1636 let row_json = row_to_json(row);
1637 if let serde_json::Value::Object(map) = &row_json {
1638 let mut filtered = serde_json::Map::new();
1639 for name in column_names {
1640 if let Some(v) = map.get(name) {
1641 filtered.insert(name.clone(), v.clone());
1642 }
1643 }
1644 out.push(serde_json::Value::Object(filtered));
1645 } else {
1646 out.push(row_json);
1647 }
1648 }
1649 serde_json::json!({ "rows": out })
1650 }
1651
1652 pub enum TxOp<'a> {
1654 Insert {
1655 entity: &'a str,
1656 data: &'a serde_json::Value,
1657 },
1658 Update {
1659 entity: &'a str,
1660 id: &'a str,
1661 data: &'a serde_json::Value,
1662 },
1663 Delete {
1664 entity: &'a str,
1665 id: &'a str,
1666 },
1667 }
1668
1669 #[derive(Debug, Clone)]
1671 pub enum TxResult {
1672 Inserted(String),
1673 Updated(bool),
1674 Deleted(bool),
1675 }
1676
1677 impl LivePostgresAdapter {
1678 pub fn transact(&mut self, ops: &[TxOp<'_>]) -> Result<Vec<TxResult>, StorageError> {
1683 let mut tx = self.client.transaction().map_err(pg_err)?;
1684 let mut results: Vec<TxResult> = Vec::with_capacity(ops.len());
1685
1686 for op in ops {
1687 match op {
1688 TxOp::Insert { entity, data } => {
1689 let (sql, values) = build_insert_sql(entity, data)?;
1690 let id = match &values[0] {
1691 JsonParam::Text(s) => s.clone(),
1692 _ => {
1693 return Err(StorageError {
1694 code: "PG_INTERNAL".into(),
1695 message: "build_insert_sql produced non-text id param".into(),
1696 });
1697 }
1698 };
1699 let params = as_pg_params(&values);
1700 tx.execute(sql.as_str(), ¶ms).map_err(pg_err)?;
1701 results.push(TxResult::Inserted(id));
1702 }
1703 TxOp::Update { entity, id, data } => {
1704 let (sql, values) = build_update_sql(entity, id, data)?;
1705 let params = as_pg_params(&values);
1706 let n = tx.execute(sql.as_str(), ¶ms).map_err(pg_err)?;
1707 results.push(TxResult::Updated(n > 0));
1708 }
1709 TxOp::Delete { entity, id } => {
1710 let sql = format!("DELETE FROM {} WHERE id = $1", quote_ident(entity));
1711 let n = tx.execute(sql.as_str(), &[id]).map_err(pg_err)?;
1712 results.push(TxResult::Deleted(n > 0));
1713 }
1714 }
1715 }
1716
1717 tx.commit().map_err(pg_err)?;
1718 Ok(results)
1719 }
1720 }
1721
1722 fn value_to_pg(v: &serde_json::Value) -> JsonParam {
1728 JsonParam::from_json(v)
1729 }
1730
1731 pub(super) fn row_to_json(row: &postgres::Row) -> serde_json::Value {
1732 use postgres::types::Type;
1733 let mut obj = serde_json::Map::new();
1734 for (i, col) in row.columns().iter().enumerate() {
1735 let name = col.name().to_string();
1736
1737 let value: serde_json::Value = match *col.type_() {
1748 Type::BOOL => try_get_or_null::<Option<bool>>(row, i)
1749 .flatten()
1750 .map(serde_json::Value::Bool)
1751 .unwrap_or(serde_json::Value::Null),
1752 Type::INT2 => try_get_or_null::<Option<i16>>(row, i)
1753 .flatten()
1754 .map(|v| serde_json::Value::Number(v.into()))
1755 .unwrap_or(serde_json::Value::Null),
1756 Type::INT4 => try_get_or_null::<Option<i32>>(row, i)
1757 .flatten()
1758 .map(|v| serde_json::Value::Number(v.into()))
1759 .unwrap_or(serde_json::Value::Null),
1760 Type::INT8 => try_get_or_null::<Option<i64>>(row, i)
1761 .flatten()
1762 .map(|v| serde_json::Value::Number(v.into()))
1763 .unwrap_or(serde_json::Value::Null),
1764 Type::FLOAT4 => try_get_or_null::<Option<f32>>(row, i)
1765 .flatten()
1766 .and_then(|v| serde_json::Number::from_f64(v as f64))
1767 .map(serde_json::Value::Number)
1768 .unwrap_or(serde_json::Value::Null),
1769 Type::FLOAT8 => try_get_or_null::<Option<f64>>(row, i)
1770 .flatten()
1771 .and_then(serde_json::Number::from_f64)
1772 .map(serde_json::Value::Number)
1773 .unwrap_or(serde_json::Value::Null),
1774 Type::JSON | Type::JSONB => try_get_or_null::<Option<serde_json::Value>>(row, i)
1775 .flatten()
1776 .unwrap_or(serde_json::Value::Null),
1777 Type::BYTEA => try_get_or_null::<Option<Vec<u8>>>(row, i)
1778 .flatten()
1779 .map(|b| serde_json::Value::String(b64(&b)))
1780 .unwrap_or(serde_json::Value::Null),
1781 Type::TEXT | Type::VARCHAR | Type::BPCHAR | Type::NAME | Type::UNKNOWN => {
1782 try_get_or_null::<Option<String>>(row, i)
1783 .flatten()
1784 .map(serde_json::Value::String)
1785 .unwrap_or(serde_json::Value::Null)
1786 }
1787 Type::TIMESTAMPTZ => {
1788 try_get_or_null::<Option<chrono::DateTime<chrono::Utc>>>(row, i)
1795 .flatten()
1796 .map(|dt| {
1797 serde_json::Value::String(dt.format("%Y-%m-%dT%H:%M:%SZ").to_string())
1798 })
1799 .unwrap_or(serde_json::Value::Null)
1800 }
1801 Type::TIMESTAMP => try_get_or_null::<Option<chrono::NaiveDateTime>>(row, i)
1802 .flatten()
1803 .map(|dt| {
1804 serde_json::Value::String(dt.format("%Y-%m-%dT%H:%M:%SZ").to_string())
1805 })
1806 .unwrap_or(serde_json::Value::Null),
1807 Type::DATE => try_get_or_null::<Option<chrono::NaiveDate>>(row, i)
1808 .flatten()
1809 .map(|d| serde_json::Value::String(d.format("%Y-%m-%d").to_string()))
1810 .unwrap_or(serde_json::Value::Null),
1811 _ => {
1812 match row.try_get::<_, Option<String>>(i) {
1817 Ok(Some(s)) => serde_json::Value::String(s),
1818 Ok(None) => serde_json::Value::Null,
1819 Err(_) => match row.try_get::<_, Option<Vec<u8>>>(i) {
1820 Ok(Some(bytes)) => serde_json::Value::String(
1821 String::from_utf8_lossy(&bytes).into_owned(),
1822 ),
1823 _ => serde_json::Value::Null,
1824 },
1825 }
1826 }
1827 };
1828 obj.insert(name, value);
1829 }
1830 serde_json::Value::Object(obj)
1831 }
1832
1833 fn try_get_or_null<'a, T>(row: &'a postgres::Row, i: usize) -> Option<T>
1834 where
1835 T: postgres::types::FromSql<'a>,
1836 {
1837 match row.try_get::<_, T>(i) {
1838 Ok(v) => Some(v),
1839 Err(e) => {
1840 tracing::warn!(
1841 "[postgres] decode failed for column {} ({}): {e}",
1842 i,
1843 row.columns()[i].name()
1844 );
1845 None
1846 }
1847 }
1848 }
1849
1850 fn b64(bytes: &[u8]) -> String {
1853 const TABLE: &[u8; 64] =
1854 b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
1855 let mut out = String::with_capacity((bytes.len() + 2) / 3 * 4);
1856 let chunks = bytes.chunks(3);
1857 for chunk in chunks {
1858 let b = [
1859 chunk.first().copied().unwrap_or(0),
1860 chunk.get(1).copied().unwrap_or(0),
1861 chunk.get(2).copied().unwrap_or(0),
1862 ];
1863 out.push(TABLE[(b[0] >> 2) as usize] as char);
1864 out.push(TABLE[((b[0] & 0x03) << 4 | b[1] >> 4) as usize] as char);
1865 if chunk.len() > 1 {
1866 out.push(TABLE[((b[1] & 0x0F) << 2 | b[2] >> 6) as usize] as char);
1867 } else {
1868 out.push('=');
1869 }
1870 if chunk.len() > 2 {
1871 out.push(TABLE[(b[2] & 0x3F) as usize] as char);
1872 } else {
1873 out.push('=');
1874 }
1875 }
1876 out
1877 }
1878
1879 fn pg_err(e: postgres::Error) -> StorageError {
1880 use std::error::Error;
1886 let mut detail = format!("{e}");
1887 let mut src: Option<&dyn Error> = e.source();
1888 while let Some(s) = src {
1889 detail.push_str(": ");
1890 detail.push_str(&format!("{s}"));
1891 src = s.source();
1892 }
1893 StorageError {
1894 code: "PG_QUERY_FAILED".into(),
1895 message: format!("Postgres query failed: {detail}"),
1896 }
1897 }
1898}
1899
1900#[cfg(test)]
1905mod tests {
1906 use super::*;
1907
1908 fn test_manifest() -> AppManifest {
1912 use pylon_kernel::{ManifestEntity, ManifestField, ManifestIndex};
1913 let f = |name: &str, ty: &str, opt: bool, uniq: bool| ManifestField {
1914 name: name.into(),
1915 field_type: ty.into(),
1916 optional: opt,
1917 unique: uniq,
1918 crdt: None,
1919 };
1920 AppManifest {
1921 manifest_version: 1,
1922 name: "test".into(),
1923 version: "0.0.0".into(),
1924 entities: vec![
1925 ManifestEntity {
1926 name: "User".into(),
1927 fields: vec![
1928 f("email", "string", false, true),
1929 f("displayName", "string", false, false),
1930 f("createdAt", "datetime", false, false),
1931 ],
1932 indexes: vec![],
1933 relations: vec![],
1934 search: None,
1935 crdt: true,
1936 },
1937 ManifestEntity {
1938 name: "Todo".into(),
1939 fields: vec![
1940 f("title", "string", false, false),
1941 f("done", "bool", false, false),
1942 f("userId", "id(User)", false, false),
1943 f("createdAt", "datetime", false, false),
1944 ],
1945 indexes: vec![ManifestIndex {
1946 name: "by_user".into(),
1947 fields: vec!["userId".into()],
1948 unique: false,
1949 }],
1950 relations: vec![],
1951 search: None,
1952 crdt: true,
1953 },
1954 ],
1955 queries: vec![],
1956 actions: vec![],
1957 policies: vec![],
1958 routes: vec![],
1959 auth: Default::default(),
1960 }
1961 }
1962
1963 #[test]
1964 fn pg_type_mapping() {
1965 assert_eq!(pg_column_type("string"), "TEXT");
1966 assert_eq!(pg_column_type("int"), "INTEGER");
1967 assert_eq!(pg_column_type("float"), "DOUBLE PRECISION");
1968 assert_eq!(pg_column_type("bool"), "BOOLEAN");
1969 assert_eq!(pg_column_type("datetime"), "TIMESTAMPTZ");
1970 assert_eq!(pg_column_type("richtext"), "TEXT");
1971 assert_eq!(pg_column_type("id(User)"), "TEXT");
1972 }
1973
1974 #[test]
1975 fn quote_ident_simple() {
1976 assert_eq!(quote_ident("User"), "\"User\"");
1977 assert_eq!(quote_ident("email"), "\"email\"");
1978 }
1979
1980 #[test]
1981 fn quote_ident_escapes_embedded_double_quotes() {
1982 assert_eq!(quote_ident("col\"name"), "\"col\"\"name\"");
1983 assert_eq!(quote_ident("a\"b\"c"), "\"a\"\"b\"\"c\"");
1984 }
1985
1986 #[test]
1987 fn create_table_sql_basic() {
1988 let fields = vec![
1989 FieldSpec {
1990 name: "email".into(),
1991 field_type: "string".into(),
1992 optional: false,
1993 unique: true,
1994 },
1995 FieldSpec {
1996 name: "age".into(),
1997 field_type: "int".into(),
1998 optional: true,
1999 unique: false,
2000 },
2001 ];
2002 let sql = create_table_sql("User", &fields);
2003 assert_eq!(
2004 sql,
2005 "CREATE TABLE IF NOT EXISTS \"User\" (id TEXT PRIMARY KEY NOT NULL, \"email\" TEXT NOT NULL UNIQUE, \"age\" INTEGER)"
2006 );
2007 }
2008
2009 #[test]
2010 fn create_table_sql_escapes_identifiers() {
2011 let fields = vec![FieldSpec {
2012 name: "col\"x".into(),
2013 field_type: "string".into(),
2014 optional: false,
2015 unique: false,
2016 }];
2017 let sql = create_table_sql("my\"table", &fields);
2018 assert!(sql.contains("\"my\"\"table\""));
2019 assert!(sql.contains("\"col\"\"x\""));
2020 }
2021
2022 #[test]
2023 fn create_index_sql_unique() {
2024 let sql = create_index_sql("User", "by_email", &["email".into()], true);
2025 assert_eq!(
2026 sql,
2027 "CREATE UNIQUE INDEX IF NOT EXISTS \"User_by_email\" ON \"User\" (\"email\")"
2028 );
2029 }
2030
2031 #[test]
2032 fn create_index_sql_non_unique() {
2033 let sql = create_index_sql("Todo", "by_user", &["userId".into()], false);
2034 assert_eq!(
2035 sql,
2036 "CREATE INDEX IF NOT EXISTS \"Todo_by_user\" ON \"Todo\" (\"userId\")"
2037 );
2038 }
2039
2040 #[test]
2041 fn add_column_sql_basic() {
2042 let field = FieldSpec {
2043 name: "bio".into(),
2044 field_type: "string".into(),
2045 optional: true,
2046 unique: false,
2047 };
2048 let sql = add_column_sql("User", &field);
2049 assert_eq!(sql, "ALTER TABLE \"User\" ADD COLUMN \"bio\" TEXT");
2050 }
2051
2052 #[test]
2053 fn plan_from_manifest() {
2054 let adapter = PostgresAdapter;
2055 let manifest = test_manifest();
2056 let plan = adapter.plan_schema(&manifest).unwrap();
2057
2058 assert!(plan.operations.iter().any(|op| matches!(
2060 op,
2061 SchemaOperation::CreateEntity { name, .. } if name == "User"
2062 )));
2063 assert!(plan.operations.iter().any(|op| matches!(
2064 op,
2065 SchemaOperation::CreateEntity { name, .. } if name == "Todo"
2066 )));
2067 assert!(plan.operations.iter().any(|op| matches!(
2068 op,
2069 SchemaOperation::AddIndex { entity, name, .. } if entity == "Todo" && name == "by_user"
2070 )));
2071 }
2072
2073 #[test]
2074 fn plan_to_sql_produces_statements() {
2075 let adapter = PostgresAdapter;
2076 let manifest = test_manifest();
2077 let plan = adapter.plan_schema(&manifest).unwrap();
2078 let stmts = plan_to_sql(&plan).unwrap();
2079
2080 let create_tables = stmts
2085 .iter()
2086 .filter(|s| s.starts_with("CREATE TABLE"))
2087 .count();
2088 let create_indexes = stmts
2089 .iter()
2090 .filter(|s| s.starts_with("CREATE INDEX") || s.starts_with("CREATE UNIQUE INDEX"))
2091 .count();
2092 assert_eq!(create_tables, 2);
2093 assert!(create_indexes >= 1);
2094 assert!(stmts[0].starts_with("CREATE TABLE"));
2095 assert!(stmts[1].starts_with("CREATE TABLE"));
2096 }
2097
2098 #[test]
2099 fn plan_to_sql_rejects_unsupported() {
2100 let plan = SchemaPlan {
2101 operations: vec![SchemaOperation::RemoveEntity {
2102 name: "User".into(),
2103 }],
2104 };
2105 let result = plan_to_sql(&plan);
2106 assert!(result.is_err());
2107 assert_eq!(result.unwrap_err().code, "PG_OP_UNSUPPORTED");
2108 }
2109
2110 #[test]
2111 fn apply_not_implemented() {
2112 let adapter = PostgresAdapter;
2113 let plan = SchemaPlan {
2114 operations: vec![SchemaOperation::Noop],
2115 };
2116 let result = adapter.apply_schema(&plan);
2117 assert!(result.is_err());
2118 assert_eq!(result.unwrap_err().code, "APPLY_NOT_IMPLEMENTED");
2119 }
2120
2121 #[test]
2122 fn sql_uses_quoted_identifiers() {
2123 let fields = vec![FieldSpec {
2124 name: "createdAt".into(),
2125 field_type: "datetime".into(),
2126 optional: false,
2127 unique: false,
2128 }];
2129 let sql = create_table_sql("User", &fields);
2130 assert!(sql.contains("\"User\""));
2132 assert!(sql.contains("\"createdAt\""));
2133 assert!(sql.contains("TIMESTAMPTZ"));
2134 }
2135
2136 #[test]
2139 fn introspect_sql_constants_are_valid() {
2140 assert!(INTROSPECT_TABLES_SQL.contains("information_schema.tables"));
2142 assert!(INTROSPECT_COLUMNS_SQL.contains("$1"));
2143 assert!(INTROSPECT_INDEXES_SQL.contains("$1"));
2144 assert!(INTROSPECT_TABLES_SQL.contains("_pylon_"));
2145 }
2146
2147 #[test]
2150 fn plan_from_empty_snapshot_creates_all() {
2151 let snapshot = crate::SchemaSnapshot { tables: vec![] };
2152 let manifest = test_manifest();
2153 let plan = plan_from_snapshot(&snapshot, &manifest);
2154
2155 assert!(plan.operations.iter().any(|op| matches!(
2156 op,
2157 SchemaOperation::CreateEntity { name, .. } if name == "User"
2158 )));
2159 assert!(plan.operations.iter().any(|op| matches!(
2160 op,
2161 SchemaOperation::CreateEntity { name, .. } if name == "Todo"
2162 )));
2163 assert!(plan.operations.iter().any(|op| matches!(
2164 op,
2165 SchemaOperation::AddIndex { entity, name, .. } if entity == "Todo" && name == "by_user"
2166 )));
2167 }
2168
2169 #[test]
2170 fn plan_from_full_snapshot_is_noop() {
2171 let snapshot = crate::SchemaSnapshot {
2172 tables: vec![
2173 crate::TableSnapshot {
2174 name: "User".into(),
2175 columns: vec![
2176 crate::ColumnSnapshot {
2177 name: "id".into(),
2178 column_type: "TEXT".into(),
2179 notnull: true,
2180 primary_key: true,
2181 },
2182 crate::ColumnSnapshot {
2183 name: "email".into(),
2184 column_type: "TEXT".into(),
2185 notnull: true,
2186 primary_key: false,
2187 },
2188 crate::ColumnSnapshot {
2189 name: "displayName".into(),
2190 column_type: "TEXT".into(),
2191 notnull: true,
2192 primary_key: false,
2193 },
2194 crate::ColumnSnapshot {
2195 name: "createdAt".into(),
2196 column_type: "TIMESTAMPTZ".into(),
2197 notnull: true,
2198 primary_key: false,
2199 },
2200 ],
2201 indexes: vec![],
2202 },
2203 crate::TableSnapshot {
2204 name: "Todo".into(),
2205 columns: vec![
2206 crate::ColumnSnapshot {
2207 name: "id".into(),
2208 column_type: "TEXT".into(),
2209 notnull: true,
2210 primary_key: true,
2211 },
2212 crate::ColumnSnapshot {
2213 name: "title".into(),
2214 column_type: "TEXT".into(),
2215 notnull: true,
2216 primary_key: false,
2217 },
2218 crate::ColumnSnapshot {
2219 name: "done".into(),
2220 column_type: "BOOLEAN".into(),
2221 notnull: true,
2222 primary_key: false,
2223 },
2224 crate::ColumnSnapshot {
2225 name: "userId".into(),
2226 column_type: "TEXT".into(),
2227 notnull: true,
2228 primary_key: false,
2229 },
2230 crate::ColumnSnapshot {
2231 name: "createdAt".into(),
2232 column_type: "TIMESTAMPTZ".into(),
2233 notnull: true,
2234 primary_key: false,
2235 },
2236 ],
2237 indexes: vec![crate::IndexSnapshot {
2238 name: "Todo_by_user".into(),
2239 columns: vec!["userId".into()],
2240 unique: false,
2241 }],
2242 },
2243 ],
2244 };
2245 let manifest = test_manifest();
2246 let plan = plan_from_snapshot(&snapshot, &manifest);
2247 assert!(plan.is_empty());
2248 }
2249
2250 #[test]
2251 fn plan_detects_missing_column_in_snapshot() {
2252 let snapshot = crate::SchemaSnapshot {
2253 tables: vec![
2254 crate::TableSnapshot {
2255 name: "User".into(),
2256 columns: vec![
2257 crate::ColumnSnapshot {
2258 name: "id".into(),
2259 column_type: "TEXT".into(),
2260 notnull: true,
2261 primary_key: true,
2262 },
2263 crate::ColumnSnapshot {
2264 name: "email".into(),
2265 column_type: "TEXT".into(),
2266 notnull: true,
2267 primary_key: false,
2268 },
2269 ],
2271 indexes: vec![],
2272 },
2273 crate::TableSnapshot {
2274 name: "Todo".into(),
2275 columns: vec![
2276 crate::ColumnSnapshot {
2277 name: "id".into(),
2278 column_type: "TEXT".into(),
2279 notnull: true,
2280 primary_key: true,
2281 },
2282 crate::ColumnSnapshot {
2283 name: "title".into(),
2284 column_type: "TEXT".into(),
2285 notnull: true,
2286 primary_key: false,
2287 },
2288 crate::ColumnSnapshot {
2289 name: "done".into(),
2290 column_type: "BOOLEAN".into(),
2291 notnull: true,
2292 primary_key: false,
2293 },
2294 crate::ColumnSnapshot {
2295 name: "userId".into(),
2296 column_type: "TEXT".into(),
2297 notnull: true,
2298 primary_key: false,
2299 },
2300 crate::ColumnSnapshot {
2301 name: "createdAt".into(),
2302 column_type: "TIMESTAMPTZ".into(),
2303 notnull: true,
2304 primary_key: false,
2305 },
2306 ],
2307 indexes: vec![crate::IndexSnapshot {
2308 name: "Todo_by_user".into(),
2309 columns: vec!["userId".into()],
2310 unique: false,
2311 }],
2312 },
2313 ],
2314 };
2315 let manifest = test_manifest();
2316 let plan = plan_from_snapshot(&snapshot, &manifest);
2317
2318 let add_fields: Vec<_> = plan
2319 .operations
2320 .iter()
2321 .filter(|op| matches!(op, SchemaOperation::AddField { .. }))
2322 .collect();
2323 assert_eq!(add_fields.len(), 2); }
2325
2326 #[test]
2329 fn json_value_to_string_handles_all_types() {
2330 assert_eq!(
2331 json_value_to_string(&serde_json::Value::String("hello".into())),
2332 "hello"
2333 );
2334 assert_eq!(json_value_to_string(&serde_json::json!(42)), "42");
2335 assert_eq!(json_value_to_string(&serde_json::json!(1.5)), "1.5");
2336 assert_eq!(json_value_to_string(&serde_json::Value::Bool(true)), "true");
2337 assert_eq!(
2338 json_value_to_string(&serde_json::Value::Bool(false)),
2339 "false"
2340 );
2341 assert_eq!(json_value_to_string(&serde_json::Value::Null), "");
2342 assert_eq!(
2344 json_value_to_string(&serde_json::json!([1, 2, 3])),
2345 "[1,2,3]"
2346 );
2347 assert_eq!(
2348 json_value_to_string(&serde_json::json!({"a": 1})),
2349 "{\"a\":1}"
2350 );
2351 }
2352
2353 #[test]
2354 fn generate_id_returns_hex_string() {
2355 let id = generate_id();
2356 assert!(!id.is_empty());
2357 assert!(id.chars().all(|c| c.is_ascii_hexdigit()));
2359 }
2360
2361 #[test]
2362 fn generate_id_is_unique_across_calls() {
2363 let id1 = generate_id();
2364 let id2 = generate_id();
2365 assert_ne!(id1, id2);
2366 }
2367
2368 #[test]
2369 fn generate_id_is_lex_sortable() {
2370 let mut ids: Vec<String> = (0..1000).map(|_| generate_id()).collect();
2373 let sorted = {
2374 let mut s = ids.clone();
2375 s.sort();
2376 s
2377 };
2378 assert_eq!(ids, sorted, "generate_id must be lex-monotonic");
2379 let len0 = ids[0].len();
2382 assert!(ids.iter().all(|id| id.len() == len0));
2383 ids.dedup();
2384 assert_eq!(ids.len(), 1000, "no collisions in a tight loop");
2385 }
2386
2387 #[test]
2388 fn build_insert_sql_simple() {
2389 let data = serde_json::json!({
2390 "email": "alice@example.com",
2391 "displayName": "Alice"
2392 });
2393 let (sql, values) = build_insert_sql("User", &data).unwrap();
2394
2395 assert!(sql.starts_with("INSERT INTO \"User\""));
2396 assert!(sql.contains("id"));
2397 assert!(sql.contains("$1"));
2398 assert!(sql.contains("$2"));
2399 assert!(sql.contains("$3"));
2400 match &values[0] {
2402 JsonParam::Text(s) => assert!(!s.is_empty()),
2403 other => panic!("expected Text id param, got {other:?}"),
2404 }
2405 assert_eq!(values.len(), 3); }
2407
2408 #[test]
2409 fn build_insert_sql_preserves_json_types() {
2410 let data = serde_json::json!({
2411 "n": 42,
2412 "f": 1.5,
2413 "b": true,
2414 "s": "hi",
2415 "z": null,
2416 });
2417 let (_sql, values) = build_insert_sql("T", &data).unwrap();
2418 let kinds: Vec<&JsonParam> = values.iter().skip(1).collect();
2420 assert!(matches!(kinds[0], JsonParam::Bool(true)));
2421 assert!(matches!(kinds[1], JsonParam::Float(_)));
2422 assert!(matches!(kinds[2], JsonParam::Int(42)));
2423 assert!(matches!(kinds[3], JsonParam::Text(_)));
2424 assert!(matches!(kinds[4], JsonParam::Null));
2425 }
2426
2427 #[test]
2428 fn build_insert_sql_quotes_column_names() {
2429 let data = serde_json::json!({"createdAt": "2026-01-01"});
2430 let (sql, _) = build_insert_sql("Todo", &data).unwrap();
2431 assert!(sql.contains("\"createdAt\""));
2432 assert!(sql.contains("\"Todo\""));
2433 }
2434
2435 #[test]
2436 fn build_insert_sql_rejects_non_object() {
2437 let data = serde_json::json!("not an object");
2438 let result = build_insert_sql("User", &data);
2439 assert!(result.is_err());
2440 assert_eq!(result.unwrap_err().code, "PG_INVALID_DATA");
2441 }
2442
2443 #[test]
2444 fn build_update_sql_simple() {
2445 let data = serde_json::json!({
2446 "displayName": "Bob",
2447 "email": "bob@example.com"
2448 });
2449 let (sql, values) = build_update_sql("User", "abc123", &data).unwrap();
2450
2451 assert!(sql.starts_with("UPDATE \"User\" SET"));
2452 assert!(sql.contains("WHERE id = $1"));
2453 assert!(sql.contains("$2"));
2454 assert!(sql.contains("$3"));
2455 match &values[0] {
2456 JsonParam::Text(s) => assert_eq!(s, "abc123"),
2457 other => panic!("expected Text id param, got {other:?}"),
2458 }
2459 assert_eq!(values.len(), 3); }
2461
2462 #[test]
2463 fn build_update_sql_quotes_column_names() {
2464 let data = serde_json::json!({"displayName": "Carol"});
2465 let (sql, _) = build_update_sql("User", "id1", &data).unwrap();
2466 assert!(sql.contains("\"displayName\" = $2"));
2467 }
2468
2469 #[test]
2470 fn build_update_sql_rejects_non_object() {
2471 let data = serde_json::json!(42);
2472 let result = build_update_sql("User", "id1", &data);
2473 assert!(result.is_err());
2474 assert_eq!(result.unwrap_err().code, "PG_INVALID_DATA");
2475 }
2476
2477 #[test]
2478 fn build_update_sql_rejects_empty_object() {
2479 let data = serde_json::json!({});
2480 let err = build_update_sql("User", "id1", &data).unwrap_err();
2481 assert_eq!(err.code, "PG_INVALID_DATA");
2482 assert!(err.message.contains("at least one field"));
2483 }
2484}