Skip to main content

pylon_storage/
postgres.rs

1use crate::{FieldSpec, SchemaOperation, SchemaPlan, StorageAdapter, StorageError};
2use pylon_kernel::AppManifest;
3
4// ---------------------------------------------------------------------------
5// Type mapping: manifest field types -> PostgreSQL column types
6//
7//   string    -> TEXT
8//   int       -> INTEGER
9//   float     -> DOUBLE PRECISION
10//   bool      -> BOOLEAN
11//   datetime  -> TIMESTAMPTZ
12//   richtext  -> TEXT
13//   id(...)   -> TEXT
14// ---------------------------------------------------------------------------
15
/// Map a manifest field type onto the PostgreSQL column type used in DDL.
///
/// `int`, `float`, `bool` and `datetime` have dedicated column types. Every
/// other input — `string`, `richtext`, `id(...)` references and unrecognised
/// names alike — is stored as `TEXT`.
fn pg_column_type(field_type: &str) -> &'static str {
    match field_type {
        "int" => "INTEGER",
        "float" => "DOUBLE PRECISION",
        "bool" => "BOOLEAN",
        "datetime" => "TIMESTAMPTZ",
        // "string", "richtext", "id(...)" and anything unknown.
        _ => "TEXT",
    }
}
28
29// ---------------------------------------------------------------------------
30// Identifier quoting
31// ---------------------------------------------------------------------------
32
/// Wrap `name` in double quotes so it can be spliced into SQL as an identifier.
///
/// Any `"` inside the name is doubled, following the PostgreSQL quoting rule:
/// `"foo""bar"` denotes the identifier `foo"bar`.
fn quote_ident(name: &str) -> String {
    let mut quoted = String::with_capacity(name.len() + 2);
    quoted.push('"');
    for ch in name.chars() {
        if ch == '"' {
            // Emit the quote twice: once here, once below.
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
39
40// ---------------------------------------------------------------------------
41// SQL generation
42// ---------------------------------------------------------------------------
43
44/// Generate a Postgres CREATE TABLE statement.
45pub fn create_table_sql(entity_name: &str, fields: &[FieldSpec]) -> String {
46    let mut columns = vec!["id TEXT PRIMARY KEY NOT NULL".to_string()];
47
48    for field in fields {
49        let col_type = pg_column_type(&field.field_type);
50        let not_null = if field.optional { "" } else { " NOT NULL" };
51        let unique = if field.unique { " UNIQUE" } else { "" };
52        columns.push(format!(
53            "{} {}{}{}",
54            quote_ident(&field.name),
55            col_type,
56            not_null,
57            unique
58        ));
59    }
60
61    format!(
62        "CREATE TABLE IF NOT EXISTS {} ({})",
63        quote_ident(entity_name),
64        columns.join(", ")
65    )
66}
67
68/// Generate a Postgres ALTER TABLE ADD COLUMN statement.
69/// NOT NULL is omitted on ADD COLUMN to avoid requiring DEFAULT values.
70/// Required-ness is tracked in the manifest; enforcement deferred.
71pub fn add_column_sql(entity_name: &str, field: &FieldSpec) -> String {
72    let col_type = pg_column_type(&field.field_type);
73    let unique = if field.unique { " UNIQUE" } else { "" };
74    format!(
75        "ALTER TABLE {} ADD COLUMN {} {}{}",
76        quote_ident(entity_name),
77        quote_ident(&field.name),
78        col_type,
79        unique
80    )
81}
82
83/// Generate a Postgres CREATE INDEX statement.
84pub fn create_index_sql(
85    entity_name: &str,
86    index_name: &str,
87    fields: &[String],
88    unique: bool,
89) -> String {
90    let unique_str = if unique { "UNIQUE " } else { "" };
91    let full_index_name = format!("{}_{}", entity_name, index_name);
92    let quoted_fields: Vec<String> = fields.iter().map(|f| quote_ident(f)).collect();
93    format!(
94        "CREATE {}INDEX IF NOT EXISTS {} ON {} ({})",
95        unique_str,
96        quote_ident(&full_index_name),
97        quote_ident(entity_name),
98        quoted_fields.join(", ")
99    )
100}
101
102// ---------------------------------------------------------------------------
103// PostgresAdapter — planning-only adapter
104// ---------------------------------------------------------------------------
105
106/// A Postgres storage adapter. Currently supports planning only.
107/// No live connection — SQL generation and planning from manifest.
108pub struct PostgresAdapter;
109
110impl StorageAdapter for PostgresAdapter {
111    fn plan_schema(&self, target: &AppManifest) -> Result<SchemaPlan, StorageError> {
112        // Plan from empty baseline.
113        let mut operations = Vec::new();
114
115        for entity in &target.entities {
116            let fields: Vec<FieldSpec> = entity
117                .fields
118                .iter()
119                .map(|f| FieldSpec {
120                    name: f.name.clone(),
121                    field_type: f.field_type.clone(),
122                    optional: f.optional,
123                    unique: f.unique,
124                })
125                .collect();
126
127            operations.push(SchemaOperation::CreateEntity {
128                name: entity.name.clone(),
129                fields,
130            });
131
132            for index in &entity.indexes {
133                operations.push(SchemaOperation::AddIndex {
134                    entity: entity.name.clone(),
135                    name: index.name.clone(),
136                    fields: index.fields.clone(),
137                    unique: index.unique,
138                });
139            }
140        }
141
142        if operations.is_empty() {
143            operations.push(SchemaOperation::Noop);
144        }
145
146        Ok(SchemaPlan { operations })
147    }
148
149    // apply_schema intentionally not implemented — uses default trait error.
150}
151
152/// Generate all SQL statements for a plan, in order.
153/// Useful for dry-run preview of what Postgres DDL would be executed.
154pub fn plan_to_sql(plan: &SchemaPlan) -> Result<Vec<String>, StorageError> {
155    let mut statements = Vec::new();
156
157    for op in &plan.operations {
158        match op {
159            SchemaOperation::CreateEntity { name, fields } => {
160                statements.push(create_table_sql(name, fields));
161            }
162            SchemaOperation::AddField { entity, field } => {
163                statements.push(add_column_sql(entity, field));
164            }
165            SchemaOperation::AlterField {
166                entity,
167                previous,
168                target,
169            } => {
170                // Only nullable transitions today. SET / DROP NOT NULL is
171                // safe on a populated table when going from required →
172                // optional (existing rows already satisfy NOT NULL); the
173                // reverse direction (optional → required) succeeds only
174                // if every row has a non-null value, which the planner
175                // has no way to know — Postgres will fail the migration
176                // if it can't and the operator gets a clear error from
177                // the apply step.
178                if previous.optional && !target.optional {
179                    statements.push(format!(
180                        "ALTER TABLE {} ALTER COLUMN {} SET NOT NULL",
181                        quote_ident(entity),
182                        quote_ident(&target.name)
183                    ));
184                } else if !previous.optional && target.optional {
185                    statements.push(format!(
186                        "ALTER TABLE {} ALTER COLUMN {} DROP NOT NULL",
187                        quote_ident(entity),
188                        quote_ident(&target.name)
189                    ));
190                }
191                // Nothing emitted when neither nullable nor type changed
192                // — falls through silently. AlterField with no actual
193                // shape change shouldn't happen in practice (the planner
194                // only emits it on real drift), but guard against
195                // emitting empty SQL just in case.
196            }
197            SchemaOperation::AddIndex {
198                entity,
199                name,
200                fields,
201                unique,
202            } => {
203                statements.push(create_index_sql(entity, name, fields, *unique));
204            }
205            SchemaOperation::Noop => {}
206            other => {
207                return Err(StorageError {
208                    code: "PG_OP_UNSUPPORTED".into(),
209                    message: format!("Operation not supported by Postgres adapter: {other:?}"),
210                });
211            }
212        }
213    }
214
215    Ok(statements)
216}
217
218// ---------------------------------------------------------------------------
219// Introspection SQL helpers
220//
221// These generate the SQL queries that a live Postgres connection would run
222// to read the current schema. No connection required — just SQL strings.
223// ---------------------------------------------------------------------------
224
/// SQL to list user tables in the public schema.
///
/// Tables whose names start with the literal prefix `_pylon_` are skipped.
/// `_` is a single-character wildcard in `LIKE`, so the pattern escapes both
/// underscores (with `!` as the escape character). Left unescaped, the
/// pattern would also hide unrelated tables such as `xpylon1data`.
pub const INTROSPECT_TABLES_SQL: &str = "\
    SELECT table_name \
    FROM information_schema.tables \
    WHERE table_schema = 'public' \
      AND table_type = 'BASE TABLE' \
      AND table_name NOT LIKE '!_pylon!_%' ESCAPE '!' \
    ORDER BY table_name";
233
/// SQL to list columns for a given table in the public schema.
/// Use with parameter: table_name.
///
/// Returns `column_name`, `data_type`, `is_nullable` and `is_pk`, ordered by
/// column position. `is_pk` is a count that is greater than zero when the
/// column takes part in the table's primary key.
///
/// The primary-key subquery matches constraints to key columns on schema,
/// table and constraint name, and is pinned to the schema of the column being
/// described. Matching on constraint name alone lets a same-named table or
/// constraint in another schema leak into the count.
pub const INTROSPECT_COLUMNS_SQL: &str = "\
    SELECT column_name, data_type, is_nullable, \
           (SELECT COUNT(*) FROM information_schema.table_constraints tc \
            JOIN information_schema.key_column_usage kcu \
              ON tc.constraint_schema = kcu.constraint_schema \
             AND tc.constraint_name = kcu.constraint_name \
             AND tc.table_schema = kcu.table_schema \
             AND tc.table_name = kcu.table_name \
            WHERE tc.table_schema = c.table_schema \
              AND tc.table_name = c.table_name \
              AND kcu.column_name = c.column_name \
              AND tc.constraint_type = 'PRIMARY KEY') as is_pk \
    FROM information_schema.columns c \
    WHERE table_schema = 'public' AND table_name = $1 \
    ORDER BY ordinal_position";
247
/// SQL to list the indexes of a given table in the public schema.
/// Use with parameter: table_name.
///
/// Returns `index_name`, `is_unique` and `columns` (the indexed column names,
/// ordered by their position in the index key). Primary-key indexes are
/// excluded.
pub const INTROSPECT_INDEXES_SQL: &str = concat!(
    "SELECT i.relname as index_name, ",
    "ix.indisunique as is_unique, ",
    "array_agg(a.attname ORDER BY array_position(ix.indkey, a.attnum)) as columns ",
    "FROM pg_index ix ",
    "JOIN pg_class t ON t.oid = ix.indrelid ",
    "JOIN pg_class i ON i.oid = ix.indexrelid ",
    "JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(ix.indkey) ",
    "JOIN pg_namespace n ON n.oid = t.relnamespace ",
    "WHERE n.nspname = 'public' ",
    "AND t.relname = $1 ",
    "AND NOT ix.indisprimary ",
    "GROUP BY i.relname, ix.indisunique ",
    "ORDER BY i.relname",
);
264
265/// Plan from a snapshot (reuses the shared plan_from_snapshot).
266/// This allows Postgres to plan incrementally once introspection data is available.
267pub fn plan_from_snapshot(snapshot: &crate::SchemaSnapshot, target: &AppManifest) -> SchemaPlan {
268    crate::plan_from_snapshot(snapshot, target)
269}
270
271// ---------------------------------------------------------------------------
272// CRUD SQL generation helpers (used by live adapter, testable without a DB)
273// ---------------------------------------------------------------------------
274
/// Produce a fixed-width, lexicographically sortable identifier.
///
/// The result is 40 lowercase hex characters: the nanoseconds since the Unix
/// epoch zero-padded to 32 digits, followed by 8 digits of a per-process
/// counter. The counter keeps two ids minted in the same nanosecond distinct
/// and ordered; it wraps around after `u32::MAX` increments. The fixed width
/// is what makes string comparison follow creation order, which cursor
/// pagination relies on.
///
/// If the system clock reads earlier than the epoch, the timestamp part is
/// all zeros.
pub fn generate_id() -> String {
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static SEQUENCE: AtomicU32 = AtomicU32::new(0);

    let nanos = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => 0,
    };
    let tick = SEQUENCE.fetch_add(1, Ordering::Relaxed);

    format!("{nanos:032x}{tick:08x}")
}
294
295/// Convert a JSON value to its string representation for use as a SQL parameter.
296///
297/// Kept for back-compat with callers that need a textual fallback (e.g.
298/// human-readable logs). New code should bind through [`JsonParam`] so
299/// integers/booleans/nulls reach Postgres in their typed form instead of
300/// collapsing to TEXT, which the driver can't coerce into INTEGER /
301/// BOOLEAN / TIMESTAMPTZ columns and which silently turns JSON `null`
302/// into an empty string for FK columns.
303pub fn json_value_to_string(val: &serde_json::Value) -> String {
304    match val {
305        serde_json::Value::String(s) => s.clone(),
306        serde_json::Value::Number(n) => n.to_string(),
307        serde_json::Value::Bool(b) => b.to_string(),
308        serde_json::Value::Null => String::new(),
309        other => other.to_string(),
310    }
311}
312
313/// A typed wrapper around a JSON scalar that implements
314/// [`postgres::types::ToSql`] for INSERT/UPDATE parameters.
315///
316/// The previous implementation passed every value as `String`, which broke
317/// non-text columns (the postgres driver can't bind a string literal into
318/// an INTEGER / BOOLEAN / TIMESTAMPTZ slot) and silently turned JSON
319/// `null` into the empty string for nullable FKs (so `unlink` left
320/// dangling `""` references instead of NULL). `JsonParam` carries the
321/// JSON variant tag through to `to_sql` so the driver can pick the
322/// correct binary representation per column type.
323///
324/// JSON arrays/objects collapse to their JSON-string form (TEXT) — the
325/// runtime layer doesn't currently model array/object columns at the
326/// manifest level on Postgres, so anything that lands here came from
327/// caller-supplied prose that's expected to fit into a TEXT column.
328#[derive(Debug, Clone, PartialEq)]
329pub enum JsonParam {
330    Null,
331    Text(String),
332    Int(i64),
333    Float(f64),
334    Bool(bool),
335}
336
337impl JsonParam {
338    /// Lift a `serde_json::Value` into the typed parameter form. Numbers
339    /// that fit `i64` go through as Int; everything else goes through as
340    /// Float to preserve fractional / large-magnitude values.
341    pub fn from_json(val: &serde_json::Value) -> Self {
342        match val {
343            serde_json::Value::Null => JsonParam::Null,
344            serde_json::Value::String(s) => JsonParam::Text(s.clone()),
345            serde_json::Value::Bool(b) => JsonParam::Bool(*b),
346            serde_json::Value::Number(n) => {
347                if let Some(i) = n.as_i64() {
348                    JsonParam::Int(i)
349                } else if let Some(f) = n.as_f64() {
350                    JsonParam::Float(f)
351                } else {
352                    JsonParam::Text(n.to_string())
353                }
354            }
355            other => JsonParam::Text(other.to_string()),
356        }
357    }
358}
359
#[cfg(feature = "postgres-live")]
impl postgres::types::ToSql for JsonParam {
    /// Encode this value for a parameter slot whose declared type is `ty`.
    ///
    /// Each variant is matched against the column's declared type so the
    /// encoding fits the target slot:
    ///
    /// * `Null` binds as SQL NULL for any type.
    /// * `Bool` binds to BOOL.
    /// * `Int` binds to INT2/INT4/INT8 and to FLOAT4/FLOAT8.
    /// * `Float` binds to FLOAT4/FLOAT8, and to INT2/INT4/INT8 only when it
    ///   holds an exact integer.
    /// * `Text` binds to the text types, and to TIMESTAMP/TIMESTAMPTZ in text
    ///   format (see `encode_format`).
    /// * Any other pairing is rendered as text and accepted only if the
    ///   target type takes text.
    ///
    /// # Errors
    ///
    /// Fails when an integer does not fit the column's width, when a float
    /// has no exact integer value for an integer column, or when the value
    /// would have to be written as text into a column that does not accept
    /// text.
    fn to_sql(
        &self,
        ty: &postgres::types::Type,
        out: &mut bytes::BytesMut,
    ) -> Result<postgres::types::IsNull, Box<dyn std::error::Error + Sync + Send>> {
        use postgres::types::{IsNull, ToSql, Type};

        type BoxError = Box<dyn std::error::Error + Sync + Send>;

        /// Narrow `n` to the column's integer width, failing instead of wrapping.
        fn narrow<T: TryFrom<i64>>(n: i64, ty: &Type) -> Result<T, BoxError> {
            T::try_from(n).map_err(|_| -> BoxError {
                format!("integer {n} is out of range for a {} column", ty.name()).into()
            })
        }

        /// Convert `f` to an integer only when that loses no information.
        fn integral(f: f64, ty: &Type) -> Result<i64, BoxError> {
            // 2^63: the smallest positive f64 that no longer fits in an i64.
            const I64_LIMIT: f64 = 9_223_372_036_854_775_808.0;
            if f.fract() == 0.0 && f >= -I64_LIMIT && f < I64_LIMIT {
                Ok(f as i64)
            } else {
                Err(format!(
                    "float {f} has no exact integer value for a {} column",
                    ty.name()
                )
                .into())
            }
        }

        match (self, ty) {
            // Null binds as SQL NULL regardless of the column's declared type.
            (JsonParam::Null, _) => Ok(IsNull::Yes),

            (JsonParam::Bool(b), &Type::BOOL) => b.to_sql(ty, out),

            // Postgres rejects binary data of the wrong size, so an i64 must
            // be narrowed to the column width first. An `as` cast would wrap
            // silently and store a different number.
            (JsonParam::Int(n), &Type::INT2) => narrow::<i16>(*n, ty)?.to_sql(ty, out),
            (JsonParam::Int(n), &Type::INT4) => narrow::<i32>(*n, ty)?.to_sql(ty, out),
            (JsonParam::Int(n), &Type::INT8) => n.to_sql(ty, out),
            (JsonParam::Int(n), &Type::FLOAT4) => (*n as f32).to_sql(ty, out),
            (JsonParam::Int(n), &Type::FLOAT8) => (*n as f64).to_sql(ty, out),

            (JsonParam::Float(f), &Type::FLOAT4) => (*f as f32).to_sql(ty, out),
            (JsonParam::Float(f), &Type::FLOAT8) => f.to_sql(ty, out),
            // A float reaches an integer column only when JSON spelled a
            // whole number as e.g. `3.0`. Anything fractional or out of
            // range is an error rather than a silent truncation.
            (JsonParam::Float(f), &Type::INT2) => {
                narrow::<i16>(integral(*f, ty)?, ty)?.to_sql(ty, out)
            }
            (JsonParam::Float(f), &Type::INT4) => {
                narrow::<i32>(integral(*f, ty)?, ty)?.to_sql(ty, out)
            }
            (JsonParam::Float(f), &Type::INT8) => integral(*f, ty)?.to_sql(ty, out),

            (JsonParam::Text(s), &Type::TEXT)
            | (JsonParam::Text(s), &Type::VARCHAR)
            | (JsonParam::Text(s), &Type::BPCHAR)
            | (JsonParam::Text(s), &Type::NAME) => s.to_sql(ty, out),
            (JsonParam::Text(s), &Type::TIMESTAMPTZ) | (JsonParam::Text(s), &Type::TIMESTAMP) => {
                // The runtime models datetimes as ISO 8601 strings
                // end-to-end. `encode_format` requests text format for
                // this pairing, so the raw string written here is parsed
                // by the server's own timestamp input routine. That is
                // cheaper than pulling in chrono just for date binding.
                s.as_str().to_sql(ty, out)
            }

            // Cross-type fallback: render as text, then let the checked
            // String encoder decide. It accepts text-compatible targets
            // and returns a wrong-type error for everything else. This
            // catches "manifest says INT but caller sent a stringified
            // number": fail loudly rather than write text bytes into a
            // binary slot.
            (JsonParam::Bool(b), _) => b.to_string().to_sql_checked(ty, out),
            (JsonParam::Int(n), _) => n.to_string().to_sql_checked(ty, out),
            (JsonParam::Float(f), _) => f.to_string().to_sql_checked(ty, out),
            (JsonParam::Text(s), _) => s.to_sql_checked(ty, out),
        }
    }

    fn accepts(_ty: &postgres::types::Type) -> bool {
        // Acceptance depends on the variant, which is not known here, so
        // every type is accepted and `to_sql` rejects the pairings it
        // cannot encode.
        true
    }

    /// Choose the wire format for this parameter.
    ///
    /// Parameters are sent in binary format by default. A textual timestamp
    /// is not a valid binary timestamp, so strings bound to TIMESTAMP /
    /// TIMESTAMPTZ are sent in text format and parsed by the server.
    fn encode_format(&self, ty: &postgres::types::Type) -> postgres::types::Format {
        use postgres::types::{Format, Type};

        match (self, ty) {
            (JsonParam::Text(_), &Type::TIMESTAMPTZ) | (JsonParam::Text(_), &Type::TIMESTAMP) => {
                Format::Text
            }
            _ => Format::Binary,
        }
    }

    postgres::types::to_sql_checked!();
}
435
436/// Build an INSERT SQL statement and collect typed parameter values.
437/// Returns `(sql, params)` where `params[0]` is the generated ID
438/// (always `JsonParam::Text`). Subsequent params carry the JSON-typed
439/// value so the postgres driver can bind them to typed columns
440/// (INTEGER / BOOLEAN / TIMESTAMPTZ / TEXT) and so JSON `null` reaches
441/// the database as SQL NULL — the previous string-collapsing path stored
442/// `""` for nullable FKs and broke any non-text column.
443pub fn build_insert_sql(
444    entity: &str,
445    data: &serde_json::Value,
446) -> Result<(String, Vec<JsonParam>), StorageError> {
447    let id = generate_id();
448    let obj = data.as_object().ok_or_else(|| StorageError {
449        code: "PG_INVALID_DATA".into(),
450        message: "Insert data must be a JSON object".into(),
451    })?;
452
453    let mut col_names = vec!["id".to_string()];
454    let mut placeholders = vec!["$1".to_string()];
455    let mut values: Vec<JsonParam> = vec![JsonParam::Text(id)];
456
457    for (i, (key, val)) in obj.iter().enumerate() {
458        col_names.push(quote_ident(key));
459        placeholders.push(format!("${}", i + 2));
460        values.push(JsonParam::from_json(val));
461    }
462
463    let sql = format!(
464        "INSERT INTO {} ({}) VALUES ({})",
465        quote_ident(entity),
466        col_names.join(", "),
467        placeholders.join(", ")
468    );
469
470    Ok((sql, values))
471}
472
473/// Build an UPDATE SQL statement and collect typed parameter values.
474/// Returns `(sql, params)` where `params[0]` is the row ID.
475pub fn build_update_sql(
476    entity: &str,
477    id: &str,
478    data: &serde_json::Value,
479) -> Result<(String, Vec<JsonParam>), StorageError> {
480    let obj = data.as_object().ok_or_else(|| StorageError {
481        code: "PG_INVALID_DATA".into(),
482        message: "Update data must be a JSON object".into(),
483    })?;
484
485    if obj.is_empty() {
486        return Err(StorageError {
487            code: "PG_INVALID_DATA".into(),
488            message: "Update data must contain at least one field".into(),
489        });
490    }
491
492    let mut set_clauses = Vec::new();
493    let mut values: Vec<JsonParam> = vec![JsonParam::Text(id.to_string())];
494
495    for (i, (key, val)) in obj.iter().enumerate() {
496        set_clauses.push(format!("{} = ${}", quote_ident(key), i + 2));
497        values.push(JsonParam::from_json(val));
498    }
499
500    let sql = format!(
501        "UPDATE {} SET {} WHERE id = $1",
502        quote_ident(entity),
503        set_clauses.join(", ")
504    );
505
506    Ok((sql, values))
507}
508
/// Borrow each `JsonParam` as the `&(dyn ToSql + Sync)` trait object that the
/// postgres driver expects in its parameter slice.
///
/// Shared by the insert/update/transact call sites so the conversion is
/// written once.
#[cfg(feature = "postgres-live")]
fn as_pg_params(values: &[JsonParam]) -> Vec<&(dyn postgres::types::ToSql + Sync)> {
    let mut params: Vec<&(dyn postgres::types::ToSql + Sync)> = Vec::with_capacity(values.len());
    for value in values {
        params.push(value);
    }
    params
}
519
520// ---------------------------------------------------------------------------
521// Live Postgres adapter (requires "postgres-live" feature)
522// ---------------------------------------------------------------------------
523
524#[cfg(feature = "postgres-live")]
525pub mod live {
526    use super::*;
527    use crate::{
528        ColumnSnapshot, IndexSnapshot, SchemaSnapshot, StorageAdapter, StorageError, TableSnapshot,
529    };
530
531    /// A live Postgres adapter with a real database connection.
532    pub struct LivePostgresAdapter {
533        client: postgres::Client,
534    }
535
536    impl LivePostgresAdapter {
537        /// Connect to a Postgres database.
538        pub fn connect(url: &str) -> Result<Self, StorageError> {
539            let client =
540                postgres::Client::connect(url, postgres::NoTls).map_err(|e| StorageError {
541                    code: "PG_CONNECT_FAILED".into(),
542                    message: format!("Failed to connect to Postgres: {e}"),
543                })?;
544            Ok(Self { client })
545        }
546
547        /// Read the current schema from the live database.
548        pub fn read_schema(&mut self) -> Result<SchemaSnapshot, StorageError> {
549            let table_rows = self
550                .client
551                .query(INTROSPECT_TABLES_SQL, &[])
552                .map_err(pg_err)?;
553
554            let mut tables = Vec::new();
555            for row in &table_rows {
556                let table_name: String = row.get(0);
557                let columns = self.read_columns(&table_name)?;
558                let indexes = self.read_indexes(&table_name)?;
559                tables.push(TableSnapshot {
560                    name: table_name,
561                    columns,
562                    indexes,
563                });
564            }
565
566            Ok(SchemaSnapshot { tables })
567        }
568
569        fn read_columns(&mut self, table: &str) -> Result<Vec<ColumnSnapshot>, StorageError> {
570            let rows = self
571                .client
572                .query(INTROSPECT_COLUMNS_SQL, &[&table])
573                .map_err(pg_err)?;
574
575            let mut columns = Vec::new();
576            for row in &rows {
577                let name: String = row.get(0);
578                let data_type: String = row.get(1);
579                let is_nullable: String = row.get(2);
580                let is_pk: i64 = row.get(3);
581                columns.push(ColumnSnapshot {
582                    name,
583                    column_type: data_type,
584                    notnull: is_nullable == "NO",
585                    primary_key: is_pk > 0,
586                });
587            }
588            Ok(columns)
589        }
590
591        fn read_indexes(&mut self, table: &str) -> Result<Vec<IndexSnapshot>, StorageError> {
592            let rows = self
593                .client
594                .query(INTROSPECT_INDEXES_SQL, &[&table])
595                .map_err(pg_err)?;
596
597            let mut indexes = Vec::new();
598            for row in &rows {
599                let name: String = row.get(0);
600                let unique: bool = row.get(1);
601                let columns: Vec<String> = row.get(2);
602                indexes.push(IndexSnapshot {
603                    name,
604                    columns,
605                    unique,
606                });
607            }
608            Ok(indexes)
609        }
610
611        /// Plan from live database state.
612        pub fn plan_from_live(&mut self, target: &AppManifest) -> Result<SchemaPlan, StorageError> {
613            let snapshot = self.read_schema()?;
614            Ok(crate::plan_from_snapshot(&snapshot, target))
615        }
616    }
617
618    impl StorageAdapter for LivePostgresAdapter {
619        fn plan_schema(&self, _target: &AppManifest) -> Result<SchemaPlan, StorageError> {
620            Err(StorageError {
621                code: "PG_PLAN_NEEDS_MUTABLE".into(),
622                message: "Use plan_from_live() instead for live Postgres planning".into(),
623            })
624        }
625
626        fn apply_schema(&self, _plan: &SchemaPlan) -> Result<(), StorageError> {
627            Err(StorageError {
628                code: "PG_APPLY_USE_METHOD".into(),
629                message: "Use apply_plan() instead of the trait method for live Postgres".into(),
630            })
631        }
632    }
633
634    impl LivePostgresAdapter {
635        /// Apply a schema plan to the live database.
636        pub fn apply_plan(&mut self, plan: &SchemaPlan) -> Result<(), StorageError> {
637            let statements = plan_to_sql(plan)?;
638            for sql in &statements {
639                self.client.execute(sql.as_str(), &[]).map_err(pg_err)?;
640            }
641            Ok(())
642        }
643
644        /// Execute a raw SQL statement against the live database. Used by
645        /// integration tests for setup/teardown (DROP TABLE, TRUNCATE) —
646        /// production code should go through `apply_plan` so changes are
647        /// represented in the migration history. Returns the number of
648        /// rows affected.
649        pub fn exec_raw(&mut self, sql: &str) -> Result<u64, StorageError> {
650            self.client.execute(sql, &[]).map_err(pg_err)
651        }
652
653        /// Insert a row. Returns the generated ID.
654        pub fn insert(
655            &mut self,
656            entity: &str,
657            data: &serde_json::Value,
658        ) -> Result<String, StorageError> {
659            let (sql, values) = build_insert_sql(entity, data)?;
660            // The first param is always the generated ID — extract it before
661            // we hand `values` off to the postgres driver as borrowed slices.
662            let id = match &values[0] {
663                JsonParam::Text(s) => s.clone(),
664                _ => {
665                    return Err(StorageError {
666                        code: "PG_INTERNAL".into(),
667                        message: "build_insert_sql produced non-text id param".into(),
668                    });
669                }
670            };
671            let params = as_pg_params(&values);
672            self.client.execute(sql.as_str(), &params).map_err(pg_err)?;
673            Ok(id)
674        }
675
676        /// Get a row by ID.
677        pub fn get_by_id(
678            &mut self,
679            entity: &str,
680            id: &str,
681        ) -> Result<Option<serde_json::Value>, StorageError> {
682            let sql = format!("SELECT * FROM {} WHERE id = $1", quote_ident(entity));
683            let rows = self.client.query(sql.as_str(), &[&id]).map_err(pg_err)?;
684
685            match rows.first() {
686                Some(row) => Ok(Some(row_to_json(row))),
687                None => Ok(None),
688            }
689        }
690
691        /// List all rows from an entity.
692        pub fn list(&mut self, entity: &str) -> Result<Vec<serde_json::Value>, StorageError> {
693            let sql = format!("SELECT * FROM {}", quote_ident(entity));
694            let rows = self.client.query(sql.as_str(), &[]).map_err(pg_err)?;
695
696            Ok(rows.iter().map(row_to_json).collect())
697        }
698
699        /// Cursor-paginated list. `after` is the last `id` from the previous
700        /// page; the result contains rows with `id > after` (lex order),
701        /// limited to `limit`. Used for sync push/pull.
702        pub fn list_after(
703            &mut self,
704            entity: &str,
705            after: Option<&str>,
706            limit: usize,
707        ) -> Result<Vec<serde_json::Value>, StorageError> {
708            // Cap limit at a sensible upper bound so a malicious client can't
709            // stream the whole table by passing limit=u64::MAX.
710            let capped: i64 = limit.min(10_000) as i64;
711            let sql = match after {
712                Some(_) => format!(
713                    "SELECT * FROM {} WHERE id > $1 ORDER BY id ASC LIMIT $2",
714                    quote_ident(entity)
715                ),
716                None => format!(
717                    "SELECT * FROM {} ORDER BY id ASC LIMIT $1",
718                    quote_ident(entity)
719                ),
720            };
721            let rows = match after {
722                Some(cursor) => self
723                    .client
724                    .query(sql.as_str(), &[&cursor, &capped])
725                    .map_err(pg_err)?,
726                None => self
727                    .client
728                    .query(sql.as_str(), &[&capped])
729                    .map_err(pg_err)?,
730            };
731            Ok(rows.iter().map(row_to_json).collect())
732        }
733
734        /// Update a row by ID. Returns true if the row was found and updated.
735        pub fn update(
736            &mut self,
737            entity: &str,
738            id: &str,
739            data: &serde_json::Value,
740        ) -> Result<bool, StorageError> {
741            let (sql, values) = build_update_sql(entity, id, data)?;
742            let params = as_pg_params(&values);
743            let rows_affected = self.client.execute(sql.as_str(), &params).map_err(pg_err)?;
744            Ok(rows_affected > 0)
745        }
746
747        /// Delete a row by ID. Returns true if the row was found and deleted.
748        pub fn delete(&mut self, entity: &str, id: &str) -> Result<bool, StorageError> {
749            let sql = format!("DELETE FROM {} WHERE id = $1", quote_ident(entity));
750            let rows_affected = self.client.execute(sql.as_str(), &[&id]).map_err(pg_err)?;
751            Ok(rows_affected > 0)
752        }
753
754        /// Look up a row by `field = value`. Caller must validate `field`
755        /// against the manifest before calling — we still `quote_ident` it
756        /// but won't catch a typo against the entity definition.
757        pub fn lookup_field(
758            &mut self,
759            entity: &str,
760            field: &str,
761            value: &str,
762        ) -> Result<Option<serde_json::Value>, StorageError> {
763            let sql = format!(
764                "SELECT * FROM {} WHERE {} = $1 LIMIT 1",
765                quote_ident(entity),
766                quote_ident(field),
767            );
768            let rows = self.client.query(sql.as_str(), &[&value]).map_err(pg_err)?;
769            Ok(rows.first().map(row_to_json))
770        }
771
772        /// Push a `query_filtered` filter down to a real Postgres `WHERE`.
773        ///
774        /// Supported operators (parity with the SQLite path):
775        /// - Equality (`field: value`)
776        /// - `$not`: emits `field != value`
777        /// - `$gt` / `$gte` / `$lt` / `$lte`
778        /// - `$like`: emits `field LIKE value` (use `%`/`_` wildcards in
779        ///   the value; case-sensitive — pass `$ilike` for case-insensitive
780        ///   if/when the SQLite side adds it)
781        /// - `$in: [..]`: emits `field IN ($1, $2, ...)`
782        ///
783        /// Top-level meta operators: `$order`, `$limit`, `$offset`.
784        ///
785        /// `$search` (FTS5 on SQLite) is NOT supported here — Postgres
786        /// would need a tsvector column or a generic ILIKE OR-fold across
787        /// every text field, neither of which is wired up yet. Returns
788        /// `SEARCH_NOT_SUPPORTED` so callers can branch instead of
789        /// receiving silently-broad results.
790        ///
791        /// Anything else is silently ignored (matches the in-memory fallback's
792        /// permissive behavior). Field names are validated against `valid_columns`
793        /// to prevent SQL injection — pass the entity's column set.
794        pub fn query_filtered(
795            &mut self,
796            entity: &str,
797            filter: &serde_json::Value,
798            valid_columns: &[String],
799        ) -> Result<Vec<serde_json::Value>, StorageError> {
800            let empty = serde_json::Map::new();
801            let obj = filter.as_object().unwrap_or(&empty);
802
803            let validate = |col: &str| -> Result<(), StorageError> {
804                if col == "id" || valid_columns.iter().any(|c| c == col) {
805                    Ok(())
806                } else {
807                    Err(StorageError {
808                        code: "UNKNOWN_COLUMN".into(),
809                        message: format!("Unknown column \"{col}\" on entity \"{entity}\""),
810                    })
811                }
812            };
813
814            let mut where_clauses: Vec<String> = Vec::new();
815            let mut order_clause = String::new();
816            let mut limit_clause = String::new();
817            let mut offset_clause = String::new();
818            // Collect (col, op, value) so placeholder numbers can be assigned
819            // in a single materialization pass after the parse loop. Values
820            // are now JsonParam (typed) instead of String — see `value_to_pg`.
821            let mut planned: Vec<(String, String, JsonParam)> = Vec::new();
822
823            for (key, val) in obj {
824                match key.as_str() {
825                    "$search" => {
826                        // FTS5 (SQLite) has no portable equivalent in
827                        // Postgres without an explicit tsvector column.
828                        // Return a clear error rather than silently
829                        // ignoring the operator — callers that hit this
830                        // need to either branch on backend or define a
831                        // tsvector + GIN index in their schema.
832                        return Err(StorageError {
833                            code: "SEARCH_NOT_SUPPORTED".into(),
834                            message: "$search is SQLite-FTS5-only; use a Postgres tsvector column with the storage adapter's full-text path"
835                                .into(),
836                        });
837                    }
838                    "$order" => {
839                        if let Some(ord) = val.as_object() {
840                            let mut parts = Vec::new();
841                            for (col, dir) in ord {
842                                validate(col)?;
843                                let d = match dir.as_str().unwrap_or("asc") {
844                                    "desc" | "DESC" => "DESC",
845                                    _ => "ASC",
846                                };
847                                parts.push(format!("{} {d}", quote_ident(col)));
848                            }
849                            if !parts.is_empty() {
850                                order_clause = format!(" ORDER BY {}", parts.join(", "));
851                            }
852                        }
853                    }
854                    "$limit" => {
855                        if let Some(n) = val.as_u64() {
856                            limit_clause = format!(" LIMIT {}", n);
857                        }
858                    }
859                    "$offset" => {
860                        if let Some(n) = val.as_u64() {
861                            offset_clause = format!(" OFFSET {}", n);
862                        }
863                    }
864                    field => {
865                        validate(field)?;
866                        match val {
867                            serde_json::Value::Object(ops) => {
868                                for (op, v) in ops {
869                                    match op.as_str() {
870                                        "$not" => planned.push((
871                                            field.into(),
872                                            "!=".into(),
873                                            value_to_pg(v),
874                                        )),
875                                        "$gt" => {
876                                            planned.push((field.into(), ">".into(), value_to_pg(v)))
877                                        }
878                                        "$gte" => planned.push((
879                                            field.into(),
880                                            ">=".into(),
881                                            value_to_pg(v),
882                                        )),
883                                        "$lt" => {
884                                            planned.push((field.into(), "<".into(), value_to_pg(v)))
885                                        }
886                                        "$lte" => planned.push((
887                                            field.into(),
888                                            "<=".into(),
889                                            value_to_pg(v),
890                                        )),
891                                        "$like" => planned.push((
892                                            field.into(),
893                                            "LIKE".into(),
894                                            value_to_pg(v),
895                                        )),
896                                        "$in" => {
897                                            if let Some(arr) = v.as_array() {
898                                                let placeholders: Vec<String> = (0..arr.len())
899                                                    .map(|i| format!("${}", planned.len() + 1 + i))
900                                                    .collect();
901                                                where_clauses.push(format!(
902                                                    "{} IN ({})",
903                                                    quote_ident(field),
904                                                    placeholders.join(", "),
905                                                ));
906                                                for x in arr {
907                                                    planned.push((
908                                                        format!("__inline_{}", planned.len()),
909                                                        "__INLINE__".into(),
910                                                        value_to_pg(x),
911                                                    ));
912                                                }
913                                            }
914                                        }
915                                        _ => {}
916                                    }
917                                }
918                            }
919                            _ => planned.push((field.into(), "=".into(), value_to_pg(val))),
920                        }
921                    }
922                }
923            }
924
925            // Materialize planned -> SQL + params.
926            let mut params: Vec<JsonParam> = Vec::with_capacity(planned.len());
927            for (field, op, v) in &planned {
928                if op == "__INLINE__" {
929                    // Already emitted via the IN-clause path; just push the value.
930                    params.push(v.clone());
931                } else {
932                    let placeholder = format!("${}", params.len() + 1);
933                    where_clauses.push(format!("{} {} {}", quote_ident(field), op, placeholder));
934                    params.push(v.clone());
935                }
936            }
937
938            let where_sql = if where_clauses.is_empty() {
939                String::new()
940            } else {
941                format!(" WHERE {}", where_clauses.join(" AND "))
942            };
943            let sql = format!(
944                "SELECT * FROM {}{}{}{}{}",
945                quote_ident(entity),
946                where_sql,
947                order_clause,
948                limit_clause,
949                offset_clause,
950            );
951
952            let pg_params = as_pg_params(&params);
953            let rows = self
954                .client
955                .query(sql.as_str(), &pg_params)
956                .map_err(pg_err)?;
957            Ok(rows.iter().map(row_to_json).collect())
958        }
959
960        /// Run a `DataStore::aggregate` spec against Postgres. Mirrors the
961        /// SQLite path in `pylon-runtime` — supports `count`, `sum`, `avg`,
962        /// `min`, `max`, `countDistinct`, `groupBy` (plain field names or
963        /// `{field, bucket: hour|day|week|month|year}` for date bucketing
964        /// via `date_trunc`), and a flat-equality `where` filter.
965        ///
966        /// Spec format (same JSON shape used by the SQLite path):
967        /// ```json
968        /// { "count": "*",
969        ///   "sum": ["amount"],
970        ///   "groupBy": [{"field": "createdAt", "bucket": "day"}],
971        ///   "where": {"status": "paid"} }
972        /// ```
973        ///
974        /// `valid_columns` is used to validate every field name before it's
975        /// quoted into SQL — same pattern as `query_filtered`. Caller (the
976        /// `DataStore` impl in this crate) supplies the entity's column set
977        /// from the manifest.
978        pub fn aggregate(
979            &mut self,
980            entity: &str,
981            spec: &serde_json::Value,
982            valid_columns: &[String],
983        ) -> Result<serde_json::Value, StorageError> {
984            let obj = spec.as_object().ok_or_else(|| StorageError {
985                code: "INVALID_QUERY".into(),
986                message: "aggregate spec must be a JSON object".into(),
987            })?;
988
989            let validate = |col: &str| -> Result<(), StorageError> {
990                if col == "id" || valid_columns.iter().any(|c| c == col) {
991                    Ok(())
992                } else {
993                    Err(StorageError {
994                        code: "UNKNOWN_COLUMN".into(),
995                        message: format!("Unknown column \"{col}\" on entity \"{entity}\""),
996                    })
997                }
998            };
999
1000            let mut select_parts: Vec<String> = Vec::new();
1001            let mut result_fields: Vec<String> = Vec::new();
1002
1003            if let Some(count) = obj.get("count") {
1004                match count {
1005                    serde_json::Value::String(s) if s == "*" => {
1006                        select_parts.push("COUNT(*) AS count".into());
1007                        result_fields.push("count".into());
1008                    }
1009                    serde_json::Value::String(field) => {
1010                        validate(field)?;
1011                        let alias = format!("count_{field}");
1012                        select_parts.push(format!(
1013                            "COUNT({}) AS {}",
1014                            quote_ident(field),
1015                            quote_ident(&alias),
1016                        ));
1017                        result_fields.push(alias);
1018                    }
1019                    _ => {}
1020                }
1021            }
1022
1023            for (fn_name, prefix) in [
1024                ("sum", "sum_"),
1025                ("avg", "avg_"),
1026                ("min", "min_"),
1027                ("max", "max_"),
1028            ] {
1029                if let Some(fields) = obj.get(fn_name).and_then(|v| v.as_array()) {
1030                    for field in fields {
1031                        if let Some(f) = field.as_str() {
1032                            validate(f)?;
1033                            let alias = format!("{prefix}{f}");
1034                            let sql_fn = fn_name.to_uppercase();
1035                            select_parts.push(format!(
1036                                "{}({}) AS {}",
1037                                sql_fn,
1038                                quote_ident(f),
1039                                quote_ident(&alias),
1040                            ));
1041                            result_fields.push(alias);
1042                        }
1043                    }
1044                }
1045            }
1046
1047            if let Some(fields) = obj.get("countDistinct").and_then(|v| v.as_array()) {
1048                for field in fields {
1049                    if let Some(f) = field.as_str() {
1050                        validate(f)?;
1051                        let alias = format!("count_distinct_{f}");
1052                        select_parts.push(format!(
1053                            "COUNT(DISTINCT {}) AS {}",
1054                            quote_ident(f),
1055                            quote_ident(&alias),
1056                        ));
1057                        result_fields.push(alias);
1058                    }
1059                }
1060            }
1061
1062            // groupBy: column name or { field, bucket } — same vocabulary as
1063            // the SQLite path. Buckets translate to Postgres `date_trunc`
1064            // (SQLite uses `strftime`); both collapse rows to the bucket
1065            // boundary identically.
1066            let mut group_by: Vec<String> = Vec::new();
1067            let mut group_select: Vec<String> = Vec::new();
1068            let mut group_field_names: Vec<String> = Vec::new();
1069            if let Some(groups) = obj.get("groupBy").and_then(|v| v.as_array()) {
1070                for g in groups {
1071                    if let Some(f) = g.as_str() {
1072                        validate(f)?;
1073                        let q = quote_ident(f);
1074                        group_by.push(q.clone());
1075                        group_select.push(q);
1076                        group_field_names.push(f.to_string());
1077                    } else if let Some(spec) = g.as_object() {
1078                        let field =
1079                            spec.get("field").and_then(|v| v.as_str()).ok_or_else(|| {
1080                                StorageError {
1081                                    code: "INVALID_QUERY".into(),
1082                                    message: "groupBy object spec requires `field`".into(),
1083                                }
1084                            })?;
1085                        validate(field)?;
1086                        let bucket = spec.get("bucket").and_then(|v| v.as_str()).unwrap_or("day");
1087                        let trunc_unit = match bucket {
1088                            "hour" | "day" | "week" | "month" | "year" => bucket,
1089                            _ => {
1090                                return Err(StorageError {
1091                                    code: "INVALID_QUERY".into(),
1092                                    message: format!(
1093                                        "bucket must be one of hour/day/week/month/year, got {bucket}"
1094                                    ),
1095                                });
1096                            }
1097                        };
1098                        let alias = format!("{field}_{bucket}");
1099                        let expr = format!("date_trunc('{}', {})", trunc_unit, quote_ident(field),);
1100                        group_by.push(expr.clone());
1101                        group_select.push(format!("{} AS {}", expr, quote_ident(&alias)));
1102                        group_field_names.push(alias);
1103                    }
1104                }
1105            }
1106
1107            let mut full_select = group_select.clone();
1108            full_select.extend(select_parts.iter().cloned());
1109            if full_select.is_empty() {
1110                return Err(StorageError {
1111                    code: "INVALID_QUERY".into(),
1112                    message: "aggregate spec must include count/sum/avg/min/max/groupBy".into(),
1113                });
1114            }
1115
1116            let mut where_clauses: Vec<String> = Vec::new();
1117            let mut params: Vec<JsonParam> = Vec::new();
1118            if let Some(w) = obj.get("where").and_then(|v| v.as_object()) {
1119                for (k, v) in w {
1120                    validate(k)?;
1121                    let placeholder = format!("${}", params.len() + 1);
1122                    where_clauses.push(format!("{} = {}", quote_ident(k), placeholder));
1123                    params.push(value_to_pg(v));
1124                }
1125            }
1126            let where_sql = if where_clauses.is_empty() {
1127                String::new()
1128            } else {
1129                format!(" WHERE {}", where_clauses.join(" AND "))
1130            };
1131            let group_sql = if group_by.is_empty() {
1132                String::new()
1133            } else {
1134                format!(" GROUP BY {}", group_by.join(", "))
1135            };
1136
1137            let sql = format!(
1138                "SELECT {} FROM {}{}{}",
1139                full_select.join(", "),
1140                quote_ident(entity),
1141                where_sql,
1142                group_sql,
1143            );
1144
1145            let pg_params = as_pg_params(&params);
1146            let rows = self
1147                .client
1148                .query(sql.as_str(), &pg_params)
1149                .map_err(pg_err)?;
1150
1151            let column_names: Vec<String> = group_field_names
1152                .iter()
1153                .chain(result_fields.iter())
1154                .cloned()
1155                .collect();
1156
1157            let mut out: Vec<serde_json::Value> = Vec::with_capacity(rows.len());
1158            for row in &rows {
1159                let row_json = row_to_json(row);
1160                if let serde_json::Value::Object(map) = &row_json {
1161                    let mut filtered = serde_json::Map::new();
1162                    for name in &column_names {
1163                        if let Some(v) = map.get(name) {
1164                            filtered.insert(name.clone(), v.clone());
1165                        }
1166                    }
1167                    out.push(serde_json::Value::Object(filtered));
1168                } else {
1169                    out.push(row_json);
1170                }
1171            }
1172            Ok(serde_json::json!({ "rows": out }))
1173        }
1174    }
1175
1176    /// Atomic operation describing a single mutation inside [`LivePostgresAdapter::transact`].
1177    pub enum TxOp<'a> {
1178        Insert {
1179            entity: &'a str,
1180            data: &'a serde_json::Value,
1181        },
1182        Update {
1183            entity: &'a str,
1184            id: &'a str,
1185            data: &'a serde_json::Value,
1186        },
1187        Delete {
1188            entity: &'a str,
1189            id: &'a str,
1190        },
1191    }
1192
1193    /// Result of a single op inside a transaction.
1194    #[derive(Debug, Clone)]
1195    pub enum TxResult {
1196        Inserted(String),
1197        Updated(bool),
1198        Deleted(bool),
1199    }
1200
1201    impl LivePostgresAdapter {
1202        /// Run `ops` inside a single Postgres transaction. Either all of them
1203        /// commit together or none of them do — there is no partial state on
1204        /// failure. The ROLLBACK happens implicitly when the `Transaction`
1205        /// guard is dropped without `commit()` being called.
1206        pub fn transact(&mut self, ops: &[TxOp<'_>]) -> Result<Vec<TxResult>, StorageError> {
1207            let mut tx = self.client.transaction().map_err(pg_err)?;
1208            let mut results: Vec<TxResult> = Vec::with_capacity(ops.len());
1209
1210            for op in ops {
1211                match op {
1212                    TxOp::Insert { entity, data } => {
1213                        let (sql, values) = build_insert_sql(entity, data)?;
1214                        let id = match &values[0] {
1215                            JsonParam::Text(s) => s.clone(),
1216                            _ => {
1217                                return Err(StorageError {
1218                                    code: "PG_INTERNAL".into(),
1219                                    message: "build_insert_sql produced non-text id param".into(),
1220                                });
1221                            }
1222                        };
1223                        let params = as_pg_params(&values);
1224                        tx.execute(sql.as_str(), &params).map_err(pg_err)?;
1225                        results.push(TxResult::Inserted(id));
1226                    }
1227                    TxOp::Update { entity, id, data } => {
1228                        let (sql, values) = build_update_sql(entity, id, data)?;
1229                        let params = as_pg_params(&values);
1230                        let n = tx.execute(sql.as_str(), &params).map_err(pg_err)?;
1231                        results.push(TxResult::Updated(n > 0));
1232                    }
1233                    TxOp::Delete { entity, id } => {
1234                        let sql = format!("DELETE FROM {} WHERE id = $1", quote_ident(entity));
1235                        let n = tx.execute(sql.as_str(), &[id]).map_err(pg_err)?;
1236                        results.push(TxResult::Deleted(n > 0));
1237                    }
1238                }
1239            }
1240
1241            tx.commit().map_err(pg_err)?;
1242            Ok(results)
1243        }
1244    }
1245
1246    /// Lift a JSON value into a typed Postgres parameter. The previous
1247    /// implementation collapsed everything to `String`, which silently
1248    /// stringified ints/bools and turned JSON `null` into `""` for
1249    /// nullable columns. Forwarding through `JsonParam` keeps the column
1250    /// type honest and lets callers `unlink` (set FK to NULL) cleanly.
1251    fn value_to_pg(v: &serde_json::Value) -> JsonParam {
1252        JsonParam::from_json(v)
1253    }
1254
1255    fn row_to_json(row: &postgres::Row) -> serde_json::Value {
1256        use postgres::types::Type;
1257        let mut obj = serde_json::Map::new();
1258        for (i, col) in row.columns().iter().enumerate() {
1259            let name = col.name().to_string();
1260
1261            // Use `try_get` everywhere — `Row::get` panics on decode mismatch,
1262            // and a panic in a query handler poisons the connection mutex,
1263            // taking down all subsequent reads on this datastore. Anything
1264            // that fails to decode becomes Null with a one-shot warning.
1265            //
1266            // Timestamps and the catch-all path explicitly DON'T request
1267            // `String` — the postgres crate uses binary protocol by default
1268            // and there's no `FromSql<String>` impl for TIMESTAMPTZ etc. We
1269            // ask for `Vec<u8>` and lossy-stringify, which works for all
1270            // text-shaped columns in either protocol.
1271            let value: serde_json::Value = match *col.type_() {
1272                Type::BOOL => try_get_or_null::<Option<bool>>(row, i)
1273                    .flatten()
1274                    .map(serde_json::Value::Bool)
1275                    .unwrap_or(serde_json::Value::Null),
1276                Type::INT2 => try_get_or_null::<Option<i16>>(row, i)
1277                    .flatten()
1278                    .map(|v| serde_json::Value::Number(v.into()))
1279                    .unwrap_or(serde_json::Value::Null),
1280                Type::INT4 => try_get_or_null::<Option<i32>>(row, i)
1281                    .flatten()
1282                    .map(|v| serde_json::Value::Number(v.into()))
1283                    .unwrap_or(serde_json::Value::Null),
1284                Type::INT8 => try_get_or_null::<Option<i64>>(row, i)
1285                    .flatten()
1286                    .map(|v| serde_json::Value::Number(v.into()))
1287                    .unwrap_or(serde_json::Value::Null),
1288                Type::FLOAT4 => try_get_or_null::<Option<f32>>(row, i)
1289                    .flatten()
1290                    .and_then(|v| serde_json::Number::from_f64(v as f64))
1291                    .map(serde_json::Value::Number)
1292                    .unwrap_or(serde_json::Value::Null),
1293                Type::FLOAT8 => try_get_or_null::<Option<f64>>(row, i)
1294                    .flatten()
1295                    .and_then(serde_json::Number::from_f64)
1296                    .map(serde_json::Value::Number)
1297                    .unwrap_or(serde_json::Value::Null),
1298                Type::JSON | Type::JSONB => try_get_or_null::<Option<serde_json::Value>>(row, i)
1299                    .flatten()
1300                    .unwrap_or(serde_json::Value::Null),
1301                Type::BYTEA => try_get_or_null::<Option<Vec<u8>>>(row, i)
1302                    .flatten()
1303                    .map(|b| serde_json::Value::String(b64(&b)))
1304                    .unwrap_or(serde_json::Value::Null),
1305                Type::TEXT | Type::VARCHAR | Type::BPCHAR | Type::NAME | Type::UNKNOWN => {
1306                    try_get_or_null::<Option<String>>(row, i)
1307                        .flatten()
1308                        .map(serde_json::Value::String)
1309                        .unwrap_or(serde_json::Value::Null)
1310                }
1311                _ => {
1312                    // Last resort: ask Postgres to render anything else as
1313                    // text via a stringifying decode through Vec<u8>. If even
1314                    // that fails (rare — Postgres types not implementing the
1315                    // text format), fall through to Null with a warning.
1316                    match row.try_get::<_, Option<String>>(i) {
1317                        Ok(Some(s)) => serde_json::Value::String(s),
1318                        Ok(None) => serde_json::Value::Null,
1319                        Err(_) => match row.try_get::<_, Option<Vec<u8>>>(i) {
1320                            Ok(Some(bytes)) => serde_json::Value::String(
1321                                String::from_utf8_lossy(&bytes).into_owned(),
1322                            ),
1323                            _ => serde_json::Value::Null,
1324                        },
1325                    }
1326                }
1327            };
1328            obj.insert(name, value);
1329        }
1330        serde_json::Value::Object(obj)
1331    }
1332
1333    fn try_get_or_null<'a, T>(row: &'a postgres::Row, i: usize) -> Option<T>
1334    where
1335        T: postgres::types::FromSql<'a>,
1336    {
1337        match row.try_get::<_, T>(i) {
1338            Ok(v) => Some(v),
1339            Err(e) => {
1340                tracing::warn!(
1341                    "[postgres] decode failed for column {} ({}): {e}",
1342                    i,
1343                    row.columns()[i].name()
1344                );
1345                None
1346            }
1347        }
1348    }
1349
1350    /// Minimal base64 encoder so we don't need another dependency just for
1351    /// the BYTEA column edge case.
1352    fn b64(bytes: &[u8]) -> String {
1353        const TABLE: &[u8; 64] =
1354            b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
1355        let mut out = String::with_capacity((bytes.len() + 2) / 3 * 4);
1356        let chunks = bytes.chunks(3);
1357        for chunk in chunks {
1358            let b = [
1359                chunk.first().copied().unwrap_or(0),
1360                chunk.get(1).copied().unwrap_or(0),
1361                chunk.get(2).copied().unwrap_or(0),
1362            ];
1363            out.push(TABLE[(b[0] >> 2) as usize] as char);
1364            out.push(TABLE[((b[0] & 0x03) << 4 | b[1] >> 4) as usize] as char);
1365            if chunk.len() > 1 {
1366                out.push(TABLE[((b[1] & 0x0F) << 2 | b[2] >> 6) as usize] as char);
1367            } else {
1368                out.push('=');
1369            }
1370            if chunk.len() > 2 {
1371                out.push(TABLE[(b[2] & 0x3F) as usize] as char);
1372            } else {
1373                out.push('=');
1374            }
1375        }
1376        out
1377    }
1378
1379    fn pg_err(e: postgres::Error) -> StorageError {
1380        // postgres::Error's Display is intentionally short ("db error",
1381        // "connection error" etc.) — the actual SQLSTATE / detail lives
1382        // on the source chain. Walk the chain so the final message has
1383        // enough signal to debug a failed insert/update without
1384        // attaching a debugger.
1385        use std::error::Error;
1386        let mut detail = format!("{e}");
1387        let mut src: Option<&dyn Error> = e.source();
1388        while let Some(s) = src {
1389            detail.push_str(": ");
1390            detail.push_str(&format!("{s}"));
1391            src = s.source();
1392        }
1393        StorageError {
1394            code: "PG_QUERY_FAILED".into(),
1395            message: format!("Postgres query failed: {detail}"),
1396        }
1397    }
1398}
1399
1400// ---------------------------------------------------------------------------
1401// Tests
1402// ---------------------------------------------------------------------------
1403
1404#[cfg(test)]
1405mod tests {
1406    use super::*;
1407
1408    /// Hand-rolled fixture that matches the snapshots in the tests
1409    /// below. Decoupled from any example's `pylon.manifest.json` so
1410    /// changing an example schema doesn't bleed into adapter tests.
1411    fn test_manifest() -> AppManifest {
1412        use pylon_kernel::{ManifestEntity, ManifestField, ManifestIndex};
1413        let f = |name: &str, ty: &str, opt: bool, uniq: bool| ManifestField {
1414            name: name.into(),
1415            field_type: ty.into(),
1416            optional: opt,
1417            unique: uniq,
1418            crdt: None,
1419        };
1420        AppManifest {
1421            manifest_version: 1,
1422            name: "test".into(),
1423            version: "0.0.0".into(),
1424            entities: vec![
1425                ManifestEntity {
1426                    name: "User".into(),
1427                    fields: vec![
1428                        f("email", "string", false, true),
1429                        f("displayName", "string", false, false),
1430                        f("createdAt", "datetime", false, false),
1431                    ],
1432                    indexes: vec![],
1433                    relations: vec![],
1434                    search: None,
1435                    crdt: true,
1436                },
1437                ManifestEntity {
1438                    name: "Todo".into(),
1439                    fields: vec![
1440                        f("title", "string", false, false),
1441                        f("done", "bool", false, false),
1442                        f("userId", "id(User)", false, false),
1443                        f("createdAt", "datetime", false, false),
1444                    ],
1445                    indexes: vec![ManifestIndex {
1446                        name: "by_user".into(),
1447                        fields: vec!["userId".into()],
1448                        unique: false,
1449                    }],
1450                    relations: vec![],
1451                    search: None,
1452                    crdt: true,
1453                },
1454            ],
1455            queries: vec![],
1456            actions: vec![],
1457            policies: vec![],
1458            routes: vec![],
1459        }
1460    }
1461
1462    #[test]
1463    fn pg_type_mapping() {
1464        assert_eq!(pg_column_type("string"), "TEXT");
1465        assert_eq!(pg_column_type("int"), "INTEGER");
1466        assert_eq!(pg_column_type("float"), "DOUBLE PRECISION");
1467        assert_eq!(pg_column_type("bool"), "BOOLEAN");
1468        assert_eq!(pg_column_type("datetime"), "TIMESTAMPTZ");
1469        assert_eq!(pg_column_type("richtext"), "TEXT");
1470        assert_eq!(pg_column_type("id(User)"), "TEXT");
1471    }
1472
1473    #[test]
1474    fn quote_ident_simple() {
1475        assert_eq!(quote_ident("User"), "\"User\"");
1476        assert_eq!(quote_ident("email"), "\"email\"");
1477    }
1478
1479    #[test]
1480    fn quote_ident_escapes_embedded_double_quotes() {
1481        assert_eq!(quote_ident("col\"name"), "\"col\"\"name\"");
1482        assert_eq!(quote_ident("a\"b\"c"), "\"a\"\"b\"\"c\"");
1483    }
1484
1485    #[test]
1486    fn create_table_sql_basic() {
1487        let fields = vec![
1488            FieldSpec {
1489                name: "email".into(),
1490                field_type: "string".into(),
1491                optional: false,
1492                unique: true,
1493            },
1494            FieldSpec {
1495                name: "age".into(),
1496                field_type: "int".into(),
1497                optional: true,
1498                unique: false,
1499            },
1500        ];
1501        let sql = create_table_sql("User", &fields);
1502        assert_eq!(
1503            sql,
1504            "CREATE TABLE IF NOT EXISTS \"User\" (id TEXT PRIMARY KEY NOT NULL, \"email\" TEXT NOT NULL UNIQUE, \"age\" INTEGER)"
1505        );
1506    }
1507
1508    #[test]
1509    fn create_table_sql_escapes_identifiers() {
1510        let fields = vec![FieldSpec {
1511            name: "col\"x".into(),
1512            field_type: "string".into(),
1513            optional: false,
1514            unique: false,
1515        }];
1516        let sql = create_table_sql("my\"table", &fields);
1517        assert!(sql.contains("\"my\"\"table\""));
1518        assert!(sql.contains("\"col\"\"x\""));
1519    }
1520
1521    #[test]
1522    fn create_index_sql_unique() {
1523        let sql = create_index_sql("User", "by_email", &["email".into()], true);
1524        assert_eq!(
1525            sql,
1526            "CREATE UNIQUE INDEX IF NOT EXISTS \"User_by_email\" ON \"User\" (\"email\")"
1527        );
1528    }
1529
1530    #[test]
1531    fn create_index_sql_non_unique() {
1532        let sql = create_index_sql("Todo", "by_user", &["userId".into()], false);
1533        assert_eq!(
1534            sql,
1535            "CREATE INDEX IF NOT EXISTS \"Todo_by_user\" ON \"Todo\" (\"userId\")"
1536        );
1537    }
1538
1539    #[test]
1540    fn add_column_sql_basic() {
1541        let field = FieldSpec {
1542            name: "bio".into(),
1543            field_type: "string".into(),
1544            optional: true,
1545            unique: false,
1546        };
1547        let sql = add_column_sql("User", &field);
1548        assert_eq!(sql, "ALTER TABLE \"User\" ADD COLUMN \"bio\" TEXT");
1549    }
1550
1551    #[test]
1552    fn plan_from_manifest() {
1553        let adapter = PostgresAdapter;
1554        let manifest = test_manifest();
1555        let plan = adapter.plan_schema(&manifest).unwrap();
1556
1557        // Should have CreateEntity for User and Todo, plus AddIndex for by_user.
1558        assert!(plan.operations.iter().any(|op| matches!(
1559            op,
1560            SchemaOperation::CreateEntity { name, .. } if name == "User"
1561        )));
1562        assert!(plan.operations.iter().any(|op| matches!(
1563            op,
1564            SchemaOperation::CreateEntity { name, .. } if name == "Todo"
1565        )));
1566        assert!(plan.operations.iter().any(|op| matches!(
1567            op,
1568            SchemaOperation::AddIndex { entity, name, .. } if entity == "Todo" && name == "by_user"
1569        )));
1570    }
1571
1572    #[test]
1573    fn plan_to_sql_produces_statements() {
1574        let adapter = PostgresAdapter;
1575        let manifest = test_manifest();
1576        let plan = adapter.plan_schema(&manifest).unwrap();
1577        let stmts = plan_to_sql(&plan).unwrap();
1578
1579        // 2 CREATE TABLE (User, Todo) + 1 CREATE INDEX for Todo.by_user
1580        // + 1 CREATE INDEX for Todo.by_user_done. The Todo manifest also
1581        // declares a unique by_email index on User which lands as part of
1582        // the table. Final count: 2 tables + 2 indexes.
1583        let create_tables = stmts
1584            .iter()
1585            .filter(|s| s.starts_with("CREATE TABLE"))
1586            .count();
1587        let create_indexes = stmts
1588            .iter()
1589            .filter(|s| s.starts_with("CREATE INDEX") || s.starts_with("CREATE UNIQUE INDEX"))
1590            .count();
1591        assert_eq!(create_tables, 2);
1592        assert!(create_indexes >= 1);
1593        assert!(stmts[0].starts_with("CREATE TABLE"));
1594        assert!(stmts[1].starts_with("CREATE TABLE"));
1595    }
1596
1597    #[test]
1598    fn plan_to_sql_rejects_unsupported() {
1599        let plan = SchemaPlan {
1600            operations: vec![SchemaOperation::RemoveEntity {
1601                name: "User".into(),
1602            }],
1603        };
1604        let result = plan_to_sql(&plan);
1605        assert!(result.is_err());
1606        assert_eq!(result.unwrap_err().code, "PG_OP_UNSUPPORTED");
1607    }
1608
1609    #[test]
1610    fn apply_not_implemented() {
1611        let adapter = PostgresAdapter;
1612        let plan = SchemaPlan {
1613            operations: vec![SchemaOperation::Noop],
1614        };
1615        let result = adapter.apply_schema(&plan);
1616        assert!(result.is_err());
1617        assert_eq!(result.unwrap_err().code, "APPLY_NOT_IMPLEMENTED");
1618    }
1619
1620    #[test]
1621    fn sql_uses_quoted_identifiers() {
1622        let fields = vec![FieldSpec {
1623            name: "createdAt".into(),
1624            field_type: "datetime".into(),
1625            optional: false,
1626            unique: false,
1627        }];
1628        let sql = create_table_sql("User", &fields);
1629        // Postgres identifiers should be quoted for case-sensitivity.
1630        assert!(sql.contains("\"User\""));
1631        assert!(sql.contains("\"createdAt\""));
1632        assert!(sql.contains("TIMESTAMPTZ"));
1633    }
1634
1635    // -- Introspection SQL tests --
1636
1637    #[test]
1638    fn introspect_sql_constants_are_valid() {
1639        // Sanity checks that the SQL strings exist and look reasonable.
1640        assert!(INTROSPECT_TABLES_SQL.contains("information_schema.tables"));
1641        assert!(INTROSPECT_COLUMNS_SQL.contains("$1"));
1642        assert!(INTROSPECT_INDEXES_SQL.contains("$1"));
1643        assert!(INTROSPECT_TABLES_SQL.contains("_pylon_"));
1644    }
1645
1646    // -- Plan from snapshot tests --
1647
1648    #[test]
1649    fn plan_from_empty_snapshot_creates_all() {
1650        let snapshot = crate::SchemaSnapshot { tables: vec![] };
1651        let manifest = test_manifest();
1652        let plan = plan_from_snapshot(&snapshot, &manifest);
1653
1654        assert!(plan.operations.iter().any(|op| matches!(
1655            op,
1656            SchemaOperation::CreateEntity { name, .. } if name == "User"
1657        )));
1658        assert!(plan.operations.iter().any(|op| matches!(
1659            op,
1660            SchemaOperation::CreateEntity { name, .. } if name == "Todo"
1661        )));
1662        assert!(plan.operations.iter().any(|op| matches!(
1663            op,
1664            SchemaOperation::AddIndex { entity, name, .. } if entity == "Todo" && name == "by_user"
1665        )));
1666    }
1667
1668    #[test]
1669    fn plan_from_full_snapshot_is_noop() {
1670        let snapshot = crate::SchemaSnapshot {
1671            tables: vec![
1672                crate::TableSnapshot {
1673                    name: "User".into(),
1674                    columns: vec![
1675                        crate::ColumnSnapshot {
1676                            name: "id".into(),
1677                            column_type: "TEXT".into(),
1678                            notnull: true,
1679                            primary_key: true,
1680                        },
1681                        crate::ColumnSnapshot {
1682                            name: "email".into(),
1683                            column_type: "TEXT".into(),
1684                            notnull: true,
1685                            primary_key: false,
1686                        },
1687                        crate::ColumnSnapshot {
1688                            name: "displayName".into(),
1689                            column_type: "TEXT".into(),
1690                            notnull: true,
1691                            primary_key: false,
1692                        },
1693                        crate::ColumnSnapshot {
1694                            name: "createdAt".into(),
1695                            column_type: "TIMESTAMPTZ".into(),
1696                            notnull: true,
1697                            primary_key: false,
1698                        },
1699                    ],
1700                    indexes: vec![],
1701                },
1702                crate::TableSnapshot {
1703                    name: "Todo".into(),
1704                    columns: vec![
1705                        crate::ColumnSnapshot {
1706                            name: "id".into(),
1707                            column_type: "TEXT".into(),
1708                            notnull: true,
1709                            primary_key: true,
1710                        },
1711                        crate::ColumnSnapshot {
1712                            name: "title".into(),
1713                            column_type: "TEXT".into(),
1714                            notnull: true,
1715                            primary_key: false,
1716                        },
1717                        crate::ColumnSnapshot {
1718                            name: "done".into(),
1719                            column_type: "BOOLEAN".into(),
1720                            notnull: true,
1721                            primary_key: false,
1722                        },
1723                        crate::ColumnSnapshot {
1724                            name: "userId".into(),
1725                            column_type: "TEXT".into(),
1726                            notnull: true,
1727                            primary_key: false,
1728                        },
1729                        crate::ColumnSnapshot {
1730                            name: "createdAt".into(),
1731                            column_type: "TIMESTAMPTZ".into(),
1732                            notnull: true,
1733                            primary_key: false,
1734                        },
1735                    ],
1736                    indexes: vec![crate::IndexSnapshot {
1737                        name: "Todo_by_user".into(),
1738                        columns: vec!["userId".into()],
1739                        unique: false,
1740                    }],
1741                },
1742            ],
1743        };
1744        let manifest = test_manifest();
1745        let plan = plan_from_snapshot(&snapshot, &manifest);
1746        assert!(plan.is_empty());
1747    }
1748
1749    #[test]
1750    fn plan_detects_missing_column_in_snapshot() {
1751        let snapshot = crate::SchemaSnapshot {
1752            tables: vec![
1753                crate::TableSnapshot {
1754                    name: "User".into(),
1755                    columns: vec![
1756                        crate::ColumnSnapshot {
1757                            name: "id".into(),
1758                            column_type: "TEXT".into(),
1759                            notnull: true,
1760                            primary_key: true,
1761                        },
1762                        crate::ColumnSnapshot {
1763                            name: "email".into(),
1764                            column_type: "TEXT".into(),
1765                            notnull: true,
1766                            primary_key: false,
1767                        },
1768                        // missing displayName and createdAt
1769                    ],
1770                    indexes: vec![],
1771                },
1772                crate::TableSnapshot {
1773                    name: "Todo".into(),
1774                    columns: vec![
1775                        crate::ColumnSnapshot {
1776                            name: "id".into(),
1777                            column_type: "TEXT".into(),
1778                            notnull: true,
1779                            primary_key: true,
1780                        },
1781                        crate::ColumnSnapshot {
1782                            name: "title".into(),
1783                            column_type: "TEXT".into(),
1784                            notnull: true,
1785                            primary_key: false,
1786                        },
1787                        crate::ColumnSnapshot {
1788                            name: "done".into(),
1789                            column_type: "BOOLEAN".into(),
1790                            notnull: true,
1791                            primary_key: false,
1792                        },
1793                        crate::ColumnSnapshot {
1794                            name: "userId".into(),
1795                            column_type: "TEXT".into(),
1796                            notnull: true,
1797                            primary_key: false,
1798                        },
1799                        crate::ColumnSnapshot {
1800                            name: "createdAt".into(),
1801                            column_type: "TIMESTAMPTZ".into(),
1802                            notnull: true,
1803                            primary_key: false,
1804                        },
1805                    ],
1806                    indexes: vec![crate::IndexSnapshot {
1807                        name: "Todo_by_user".into(),
1808                        columns: vec!["userId".into()],
1809                        unique: false,
1810                    }],
1811                },
1812            ],
1813        };
1814        let manifest = test_manifest();
1815        let plan = plan_from_snapshot(&snapshot, &manifest);
1816
1817        let add_fields: Vec<_> = plan
1818            .operations
1819            .iter()
1820            .filter(|op| matches!(op, SchemaOperation::AddField { .. }))
1821            .collect();
1822        assert_eq!(add_fields.len(), 2); // displayName + createdAt
1823    }
1824
1825    // -- CRUD helper tests (no live database required) --
1826
1827    #[test]
1828    fn json_value_to_string_handles_all_types() {
1829        assert_eq!(
1830            json_value_to_string(&serde_json::Value::String("hello".into())),
1831            "hello"
1832        );
1833        assert_eq!(json_value_to_string(&serde_json::json!(42)), "42");
1834        assert_eq!(json_value_to_string(&serde_json::json!(1.5)), "1.5");
1835        assert_eq!(json_value_to_string(&serde_json::Value::Bool(true)), "true");
1836        assert_eq!(
1837            json_value_to_string(&serde_json::Value::Bool(false)),
1838            "false"
1839        );
1840        assert_eq!(json_value_to_string(&serde_json::Value::Null), "");
1841        // Arrays and objects get their JSON representation.
1842        assert_eq!(
1843            json_value_to_string(&serde_json::json!([1, 2, 3])),
1844            "[1,2,3]"
1845        );
1846        assert_eq!(
1847            json_value_to_string(&serde_json::json!({"a": 1})),
1848            "{\"a\":1}"
1849        );
1850    }
1851
1852    #[test]
1853    fn generate_id_returns_hex_string() {
1854        let id = generate_id();
1855        assert!(!id.is_empty());
1856        // Must be valid hex characters.
1857        assert!(id.chars().all(|c| c.is_ascii_hexdigit()));
1858    }
1859
1860    #[test]
1861    fn generate_id_is_unique_across_calls() {
1862        let id1 = generate_id();
1863        let id2 = generate_id();
1864        assert_ne!(id1, id2);
1865    }
1866
1867    #[test]
1868    fn generate_id_is_lex_sortable() {
1869        // 1000 IDs back-to-back must come out in monotonically increasing
1870        // lexicographic order. This is what makes cursor pagination correct.
1871        let mut ids: Vec<String> = (0..1000).map(|_| generate_id()).collect();
1872        let sorted = {
1873            let mut s = ids.clone();
1874            s.sort();
1875            s
1876        };
1877        assert_eq!(ids, sorted, "generate_id must be lex-monotonic");
1878        // And every id must be the same width (otherwise lex comparison is
1879        // wrong at width boundaries).
1880        let len0 = ids[0].len();
1881        assert!(ids.iter().all(|id| id.len() == len0));
1882        ids.dedup();
1883        assert_eq!(ids.len(), 1000, "no collisions in a tight loop");
1884    }
1885
1886    #[test]
1887    fn build_insert_sql_simple() {
1888        let data = serde_json::json!({
1889            "email": "alice@example.com",
1890            "displayName": "Alice"
1891        });
1892        let (sql, values) = build_insert_sql("User", &data).unwrap();
1893
1894        assert!(sql.starts_with("INSERT INTO \"User\""));
1895        assert!(sql.contains("id"));
1896        assert!(sql.contains("$1"));
1897        assert!(sql.contains("$2"));
1898        assert!(sql.contains("$3"));
1899        // First value is the generated ID — JsonParam::Text variant.
1900        match &values[0] {
1901            JsonParam::Text(s) => assert!(!s.is_empty()),
1902            other => panic!("expected Text id param, got {other:?}"),
1903        }
1904        assert_eq!(values.len(), 3); // id + 2 fields
1905    }
1906
1907    #[test]
1908    fn build_insert_sql_preserves_json_types() {
1909        let data = serde_json::json!({
1910            "n": 42,
1911            "f": 1.5,
1912            "b": true,
1913            "s": "hi",
1914            "z": null,
1915        });
1916        let (_sql, values) = build_insert_sql("T", &data).unwrap();
1917        // values[0] is the id; remaining are in BTreeMap order ("b","f","n","s","z").
1918        let kinds: Vec<&JsonParam> = values.iter().skip(1).collect();
1919        assert!(matches!(kinds[0], JsonParam::Bool(true)));
1920        assert!(matches!(kinds[1], JsonParam::Float(_)));
1921        assert!(matches!(kinds[2], JsonParam::Int(42)));
1922        assert!(matches!(kinds[3], JsonParam::Text(_)));
1923        assert!(matches!(kinds[4], JsonParam::Null));
1924    }
1925
1926    #[test]
1927    fn build_insert_sql_quotes_column_names() {
1928        let data = serde_json::json!({"createdAt": "2026-01-01"});
1929        let (sql, _) = build_insert_sql("Todo", &data).unwrap();
1930        assert!(sql.contains("\"createdAt\""));
1931        assert!(sql.contains("\"Todo\""));
1932    }
1933
1934    #[test]
1935    fn build_insert_sql_rejects_non_object() {
1936        let data = serde_json::json!("not an object");
1937        let result = build_insert_sql("User", &data);
1938        assert!(result.is_err());
1939        assert_eq!(result.unwrap_err().code, "PG_INVALID_DATA");
1940    }
1941
1942    #[test]
1943    fn build_update_sql_simple() {
1944        let data = serde_json::json!({
1945            "displayName": "Bob",
1946            "email": "bob@example.com"
1947        });
1948        let (sql, values) = build_update_sql("User", "abc123", &data).unwrap();
1949
1950        assert!(sql.starts_with("UPDATE \"User\" SET"));
1951        assert!(sql.contains("WHERE id = $1"));
1952        assert!(sql.contains("$2"));
1953        assert!(sql.contains("$3"));
1954        match &values[0] {
1955            JsonParam::Text(s) => assert_eq!(s, "abc123"),
1956            other => panic!("expected Text id param, got {other:?}"),
1957        }
1958        assert_eq!(values.len(), 3); // id + 2 fields
1959    }
1960
1961    #[test]
1962    fn build_update_sql_quotes_column_names() {
1963        let data = serde_json::json!({"displayName": "Carol"});
1964        let (sql, _) = build_update_sql("User", "id1", &data).unwrap();
1965        assert!(sql.contains("\"displayName\" = $2"));
1966    }
1967
1968    #[test]
1969    fn build_update_sql_rejects_non_object() {
1970        let data = serde_json::json!(42);
1971        let result = build_update_sql("User", "id1", &data);
1972        assert!(result.is_err());
1973        assert_eq!(result.unwrap_err().code, "PG_INVALID_DATA");
1974    }
1975
1976    #[test]
1977    fn build_update_sql_rejects_empty_object() {
1978        let data = serde_json::json!({});
1979        let err = build_update_sql("User", "id1", &data).unwrap_err();
1980        assert_eq!(err.code, "PG_INVALID_DATA");
1981        assert!(err.message.contains("at least one field"));
1982    }
1983}