Skip to main content

pylon_storage/
postgres.rs

1use crate::{FieldSpec, SchemaOperation, SchemaPlan, StorageAdapter, StorageError};
2use pylon_kernel::AppManifest;
3
4// ---------------------------------------------------------------------------
5// Type mapping: manifest field types -> PostgreSQL column types
6//
7//   string    -> TEXT
8//   int       -> INTEGER
9//   float     -> DOUBLE PRECISION
10//   bool      -> BOOLEAN
11//   datetime  -> TIMESTAMPTZ
12//   richtext  -> TEXT
13//   id(...)   -> TEXT
14// ---------------------------------------------------------------------------
15
16fn pg_column_type(field_type: &str) -> &'static str {
17    match field_type {
18        "string" => "TEXT",
19        "int" => "INTEGER",
20        "float" => "DOUBLE PRECISION",
21        "bool" => "BOOLEAN",
22        "datetime" => "TIMESTAMPTZ",
23        "richtext" => "TEXT",
24        _ if field_type.starts_with("id(") => "TEXT",
25        _ => "TEXT",
26    }
27}
28
29// ---------------------------------------------------------------------------
30// Identifier quoting
31// ---------------------------------------------------------------------------
32
33/// Quote a SQL identifier, escaping embedded double-quotes by doubling them.
34///
35/// PostgreSQL standard: `"foo""bar"` represents the identifier `foo"bar`.
36fn quote_ident(name: &str) -> String {
37    format!("\"{}\"", name.replace('"', "\"\""))
38}
39
40// ---------------------------------------------------------------------------
41// SQL generation
42// ---------------------------------------------------------------------------
43
44/// Generate a Postgres CREATE TABLE statement.
45pub fn create_table_sql(entity_name: &str, fields: &[FieldSpec]) -> String {
46    let mut columns = vec!["id TEXT PRIMARY KEY NOT NULL".to_string()];
47
48    for field in fields {
49        let col_type = pg_column_type(&field.field_type);
50        let not_null = if field.optional { "" } else { " NOT NULL" };
51        let unique = if field.unique { " UNIQUE" } else { "" };
52        columns.push(format!(
53            "{} {}{}{}",
54            quote_ident(&field.name),
55            col_type,
56            not_null,
57            unique
58        ));
59    }
60
61    format!(
62        "CREATE TABLE IF NOT EXISTS {} ({})",
63        quote_ident(entity_name),
64        columns.join(", ")
65    )
66}
67
68/// Generate a Postgres ALTER TABLE ADD COLUMN statement.
69/// NOT NULL is omitted on ADD COLUMN to avoid requiring DEFAULT values.
70/// Required-ness is tracked in the manifest; enforcement deferred.
71pub fn add_column_sql(entity_name: &str, field: &FieldSpec) -> String {
72    let col_type = pg_column_type(&field.field_type);
73    let unique = if field.unique { " UNIQUE" } else { "" };
74    format!(
75        "ALTER TABLE {} ADD COLUMN {} {}{}",
76        quote_ident(entity_name),
77        quote_ident(&field.name),
78        col_type,
79        unique
80    )
81}
82
83/// Generate a Postgres CREATE INDEX statement.
84pub fn create_index_sql(
85    entity_name: &str,
86    index_name: &str,
87    fields: &[String],
88    unique: bool,
89) -> String {
90    let unique_str = if unique { "UNIQUE " } else { "" };
91    let full_index_name = format!("{}_{}", entity_name, index_name);
92    let quoted_fields: Vec<String> = fields.iter().map(|f| quote_ident(f)).collect();
93    format!(
94        "CREATE {}INDEX IF NOT EXISTS {} ON {} ({})",
95        unique_str,
96        quote_ident(&full_index_name),
97        quote_ident(entity_name),
98        quoted_fields.join(", ")
99    )
100}
101
102// ---------------------------------------------------------------------------
103// PostgresAdapter — planning-only adapter
104// ---------------------------------------------------------------------------
105
106/// A Postgres storage adapter. Currently supports planning only.
107/// No live connection — SQL generation and planning from manifest.
108pub struct PostgresAdapter;
109
110impl StorageAdapter for PostgresAdapter {
111    fn plan_schema(&self, target: &AppManifest) -> Result<SchemaPlan, StorageError> {
112        // Plan from empty baseline.
113        let mut operations = Vec::new();
114
115        for entity in &target.entities {
116            let fields: Vec<FieldSpec> = entity
117                .fields
118                .iter()
119                .map(|f| FieldSpec {
120                    name: f.name.clone(),
121                    field_type: f.field_type.clone(),
122                    optional: f.optional,
123                    unique: f.unique,
124                })
125                .collect();
126
127            operations.push(SchemaOperation::CreateEntity {
128                name: entity.name.clone(),
129                fields,
130            });
131
132            for index in &entity.indexes {
133                operations.push(SchemaOperation::AddIndex {
134                    entity: entity.name.clone(),
135                    name: index.name.clone(),
136                    fields: index.fields.clone(),
137                    unique: index.unique,
138                });
139            }
140        }
141
142        if operations.is_empty() {
143            operations.push(SchemaOperation::Noop);
144        }
145
146        Ok(SchemaPlan { operations })
147    }
148
149    // apply_schema intentionally not implemented — uses default trait error.
150}
151
152/// Generate all SQL statements for a plan, in order.
153/// Useful for dry-run preview of what Postgres DDL would be executed.
154pub fn plan_to_sql(plan: &SchemaPlan) -> Result<Vec<String>, StorageError> {
155    let mut statements = Vec::new();
156
157    for op in &plan.operations {
158        match op {
159            SchemaOperation::CreateEntity { name, fields } => {
160                statements.push(create_table_sql(name, fields));
161            }
162            SchemaOperation::AddField { entity, field } => {
163                statements.push(add_column_sql(entity, field));
164            }
165            SchemaOperation::AddIndex {
166                entity,
167                name,
168                fields,
169                unique,
170            } => {
171                statements.push(create_index_sql(entity, name, fields, *unique));
172            }
173            SchemaOperation::Noop => {}
174            other => {
175                return Err(StorageError {
176                    code: "PG_OP_UNSUPPORTED".into(),
177                    message: format!("Operation not supported by Postgres adapter: {other:?}"),
178                });
179            }
180        }
181    }
182
183    Ok(statements)
184}
185
186// ---------------------------------------------------------------------------
187// Introspection SQL helpers
188//
189// These generate the SQL queries that a live Postgres connection would run
190// to read the current schema. No connection required — just SQL strings.
191// ---------------------------------------------------------------------------
192
193/// SQL to list user tables in the public schema.
194pub const INTROSPECT_TABLES_SQL: &str = "\
195    SELECT table_name \
196    FROM information_schema.tables \
197    WHERE table_schema = 'public' \
198      AND table_type = 'BASE TABLE' \
199      AND table_name NOT LIKE '_pylon_%' \
200    ORDER BY table_name";
201
202/// SQL to list columns for a given table.
203/// Use with parameter: table_name.
204pub const INTROSPECT_COLUMNS_SQL: &str = "\
205    SELECT column_name, data_type, is_nullable, \
206           (SELECT COUNT(*) FROM information_schema.table_constraints tc \
207            JOIN information_schema.key_column_usage kcu \
208              ON tc.constraint_name = kcu.constraint_name \
209            WHERE tc.table_name = c.table_name \
210              AND kcu.column_name = c.column_name \
211              AND tc.constraint_type = 'PRIMARY KEY') as is_pk \
212    FROM information_schema.columns c \
213    WHERE table_schema = 'public' AND table_name = $1 \
214    ORDER BY ordinal_position";
215
216/// SQL to list indexes for a given table.
217/// Use with parameter: table_name.
218pub const INTROSPECT_INDEXES_SQL: &str = "\
219    SELECT i.relname as index_name, \
220           ix.indisunique as is_unique, \
221           array_agg(a.attname ORDER BY array_position(ix.indkey, a.attnum)) as columns \
222    FROM pg_index ix \
223    JOIN pg_class t ON t.oid = ix.indrelid \
224    JOIN pg_class i ON i.oid = ix.indexrelid \
225    JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(ix.indkey) \
226    JOIN pg_namespace n ON n.oid = t.relnamespace \
227    WHERE n.nspname = 'public' \
228      AND t.relname = $1 \
229      AND NOT ix.indisprimary \
230    GROUP BY i.relname, ix.indisunique \
231    ORDER BY i.relname";
232
233/// Plan from a snapshot (reuses the shared plan_from_snapshot).
234/// This allows Postgres to plan incrementally once introspection data is available.
235pub fn plan_from_snapshot(snapshot: &crate::SchemaSnapshot, target: &AppManifest) -> SchemaPlan {
236    crate::plan_from_snapshot(snapshot, target)
237}
238
239// ---------------------------------------------------------------------------
240// CRUD SQL generation helpers (used by live adapter, testable without a DB)
241// ---------------------------------------------------------------------------
242
243/// Generate a lex-sortable, monotonic-ish unique ID.
244///
245/// Format: 32 hex chars of `as_nanos()` (zero-padded) followed by 8 hex chars
246/// of a per-process atomic counter. The counter prevents collisions when two
247/// inserts hit the same nanosecond and — critically — keeps order stable: an
248/// id minted at the same nanosecond is monotonically greater than the
249/// previous one. Width is fixed at 40 chars so lexicographic comparison
250/// matches creation order, which is what cursor pagination relies on.
251pub fn generate_id() -> String {
252    use std::sync::atomic::{AtomicU32, Ordering};
253    use std::time::{SystemTime, UNIX_EPOCH};
254    static COUNTER: AtomicU32 = AtomicU32::new(0);
255    let ts = SystemTime::now()
256        .duration_since(UNIX_EPOCH)
257        .unwrap_or_default()
258        .as_nanos();
259    let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
260    format!("{ts:032x}{seq:08x}")
261}
262
263/// Convert a JSON value to its string representation for use as a SQL parameter.
264///
265/// Kept for back-compat with callers that need a textual fallback (e.g.
266/// human-readable logs). New code should bind through [`JsonParam`] so
267/// integers/booleans/nulls reach Postgres in their typed form instead of
268/// collapsing to TEXT, which the driver can't coerce into INTEGER /
269/// BOOLEAN / TIMESTAMPTZ columns and which silently turns JSON `null`
270/// into an empty string for FK columns.
271pub fn json_value_to_string(val: &serde_json::Value) -> String {
272    match val {
273        serde_json::Value::String(s) => s.clone(),
274        serde_json::Value::Number(n) => n.to_string(),
275        serde_json::Value::Bool(b) => b.to_string(),
276        serde_json::Value::Null => String::new(),
277        other => other.to_string(),
278    }
279}
280
281/// A typed wrapper around a JSON scalar that implements
282/// [`postgres::types::ToSql`] for INSERT/UPDATE parameters.
283///
284/// The previous implementation passed every value as `String`, which broke
285/// non-text columns (the postgres driver can't bind a string literal into
286/// an INTEGER / BOOLEAN / TIMESTAMPTZ slot) and silently turned JSON
287/// `null` into the empty string for nullable FKs (so `unlink` left
288/// dangling `""` references instead of NULL). `JsonParam` carries the
289/// JSON variant tag through to `to_sql` so the driver can pick the
290/// correct binary representation per column type.
291///
292/// JSON arrays/objects collapse to their JSON-string form (TEXT) — the
293/// runtime layer doesn't currently model array/object columns at the
294/// manifest level on Postgres, so anything that lands here came from
295/// caller-supplied prose that's expected to fit into a TEXT column.
296#[derive(Debug, Clone, PartialEq)]
297pub enum JsonParam {
298    Null,
299    Text(String),
300    Int(i64),
301    Float(f64),
302    Bool(bool),
303}
304
305impl JsonParam {
306    /// Lift a `serde_json::Value` into the typed parameter form. Numbers
307    /// that fit `i64` go through as Int; everything else goes through as
308    /// Float to preserve fractional / large-magnitude values.
309    pub fn from_json(val: &serde_json::Value) -> Self {
310        match val {
311            serde_json::Value::Null => JsonParam::Null,
312            serde_json::Value::String(s) => JsonParam::Text(s.clone()),
313            serde_json::Value::Bool(b) => JsonParam::Bool(*b),
314            serde_json::Value::Number(n) => {
315                if let Some(i) = n.as_i64() {
316                    JsonParam::Int(i)
317                } else if let Some(f) = n.as_f64() {
318                    JsonParam::Float(f)
319                } else {
320                    JsonParam::Text(n.to_string())
321                }
322            }
323            other => JsonParam::Text(other.to_string()),
324        }
325    }
326}
327
328#[cfg(feature = "postgres-live")]
329impl postgres::types::ToSql for JsonParam {
330    fn to_sql(
331        &self,
332        ty: &postgres::types::Type,
333        out: &mut bytes::BytesMut,
334    ) -> Result<postgres::types::IsNull, Box<dyn std::error::Error + Sync + Send>> {
335        use postgres::types::Type;
336
337        // Null binds as SQL NULL regardless of the column's declared
338        // type — the postgres driver treats `IsNull::Yes` as a null
339        // value of the requested type.
340        if matches!(self, JsonParam::Null) {
341            return Ok(postgres::types::IsNull::Yes);
342        }
343
344        // Match each JsonParam variant against the COLUMN's declared
345        // type so the binary encoding actually fits the target slot.
346        // Postgres rejects "binary data of wrong size" if you bind an
347        // `i64` (BIGINT, 8 bytes) into an INTEGER (4 bytes) — which is
348        // exactly what the previous "everything is a String" path did
349        // on every non-TEXT column.
350        match (self, ty) {
351            (JsonParam::Bool(b), &Type::BOOL) => b.to_sql(ty, out),
352
353            (JsonParam::Int(n), &Type::INT2) => (*n as i16).to_sql(ty, out),
354            (JsonParam::Int(n), &Type::INT4) => (*n as i32).to_sql(ty, out),
355            (JsonParam::Int(n), &Type::INT8) => n.to_sql(ty, out),
356            (JsonParam::Int(n), &Type::FLOAT4) => (*n as f32).to_sql(ty, out),
357            (JsonParam::Int(n), &Type::FLOAT8) => (*n as f64).to_sql(ty, out),
358
359            (JsonParam::Float(f), &Type::FLOAT4) => (*f as f32).to_sql(ty, out),
360            (JsonParam::Float(f), &Type::FLOAT8) => f.to_sql(ty, out),
361            (JsonParam::Float(f), &Type::INT4) => (*f as i32).to_sql(ty, out),
362            (JsonParam::Float(f), &Type::INT8) => (*f as i64).to_sql(ty, out),
363
364            (JsonParam::Text(s), &Type::TEXT)
365            | (JsonParam::Text(s), &Type::VARCHAR)
366            | (JsonParam::Text(s), &Type::BPCHAR)
367            | (JsonParam::Text(s), &Type::NAME) => s.to_sql(ty, out),
368            (JsonParam::Text(s), &Type::TIMESTAMPTZ) | (JsonParam::Text(s), &Type::TIMESTAMP) => {
369                // The runtime currently models datetimes as ISO 8601
370                // strings end-to-end. Bind via the &str impl with the
371                // target type so postgres parses through its TEXT input
372                // function for that type. Cheaper than introducing
373                // chrono just for date binding.
374                s.as_str().to_sql(ty, out)
375            }
376
377            // Cross-type fallback: render as text and bind into a TEXT
378            // slot, OR error if the target column doesn't accept text.
379            // Catches "manifest says INT but caller sent a stringified
380            // number" — better to fail loudly than silently coerce.
381            (other, _) => {
382                let s = match other {
383                    JsonParam::Bool(b) => b.to_string(),
384                    JsonParam::Int(n) => n.to_string(),
385                    JsonParam::Float(f) => f.to_string(),
386                    JsonParam::Text(s) => s.clone(),
387                    JsonParam::Null => unreachable!(),
388                };
389                s.to_sql(ty, out)
390            }
391        }
392    }
393
394    fn accepts(_ty: &postgres::types::Type) -> bool {
395        // Defer per-variant acceptance to to_sql_checked, which dispatches
396        // to the inner type's ToSql impl. Returning `true` here matches
397        // the postgres crate's recommended pattern for sum-type wrappers.
398        true
399    }
400
401    postgres::types::to_sql_checked!();
402}
403
404/// Build an INSERT SQL statement and collect typed parameter values.
405/// Returns `(sql, params)` where `params[0]` is the generated ID
406/// (always `JsonParam::Text`). Subsequent params carry the JSON-typed
407/// value so the postgres driver can bind them to typed columns
408/// (INTEGER / BOOLEAN / TIMESTAMPTZ / TEXT) and so JSON `null` reaches
409/// the database as SQL NULL — the previous string-collapsing path stored
410/// `""` for nullable FKs and broke any non-text column.
411pub fn build_insert_sql(
412    entity: &str,
413    data: &serde_json::Value,
414) -> Result<(String, Vec<JsonParam>), StorageError> {
415    let id = generate_id();
416    let obj = data.as_object().ok_or_else(|| StorageError {
417        code: "PG_INVALID_DATA".into(),
418        message: "Insert data must be a JSON object".into(),
419    })?;
420
421    let mut col_names = vec!["id".to_string()];
422    let mut placeholders = vec!["$1".to_string()];
423    let mut values: Vec<JsonParam> = vec![JsonParam::Text(id)];
424
425    for (i, (key, val)) in obj.iter().enumerate() {
426        col_names.push(quote_ident(key));
427        placeholders.push(format!("${}", i + 2));
428        values.push(JsonParam::from_json(val));
429    }
430
431    let sql = format!(
432        "INSERT INTO {} ({}) VALUES ({})",
433        quote_ident(entity),
434        col_names.join(", "),
435        placeholders.join(", ")
436    );
437
438    Ok((sql, values))
439}
440
441/// Build an UPDATE SQL statement and collect typed parameter values.
442/// Returns `(sql, params)` where `params[0]` is the row ID.
443pub fn build_update_sql(
444    entity: &str,
445    id: &str,
446    data: &serde_json::Value,
447) -> Result<(String, Vec<JsonParam>), StorageError> {
448    let obj = data.as_object().ok_or_else(|| StorageError {
449        code: "PG_INVALID_DATA".into(),
450        message: "Update data must be a JSON object".into(),
451    })?;
452
453    if obj.is_empty() {
454        return Err(StorageError {
455            code: "PG_INVALID_DATA".into(),
456            message: "Update data must contain at least one field".into(),
457        });
458    }
459
460    let mut set_clauses = Vec::new();
461    let mut values: Vec<JsonParam> = vec![JsonParam::Text(id.to_string())];
462
463    for (i, (key, val)) in obj.iter().enumerate() {
464        set_clauses.push(format!("{} = ${}", quote_ident(key), i + 2));
465        values.push(JsonParam::from_json(val));
466    }
467
468    let sql = format!(
469        "UPDATE {} SET {} WHERE id = $1",
470        quote_ident(entity),
471        set_clauses.join(", ")
472    );
473
474    Ok((sql, values))
475}
476
477/// Helper for the existing `Vec<JsonParam>` → `&[&dyn ToSql + Sync]` lift
478/// at insert/update/transact call sites. The postgres driver wants a
479/// slice of trait objects; this avoids repeating the same map at each site.
480#[cfg(feature = "postgres-live")]
481fn as_pg_params(values: &[JsonParam]) -> Vec<&(dyn postgres::types::ToSql + Sync)> {
482    values
483        .iter()
484        .map(|v| v as &(dyn postgres::types::ToSql + Sync))
485        .collect()
486}
487
488// ---------------------------------------------------------------------------
489// Live Postgres adapter (requires "postgres-live" feature)
490// ---------------------------------------------------------------------------
491
492#[cfg(feature = "postgres-live")]
493pub mod live {
494    use super::*;
495    use crate::{
496        ColumnSnapshot, IndexSnapshot, SchemaSnapshot, StorageAdapter, StorageError, TableSnapshot,
497    };
498
499    /// A live Postgres adapter with a real database connection.
500    pub struct LivePostgresAdapter {
501        client: postgres::Client,
502    }
503
504    impl LivePostgresAdapter {
505        /// Connect to a Postgres database.
506        pub fn connect(url: &str) -> Result<Self, StorageError> {
507            let client =
508                postgres::Client::connect(url, postgres::NoTls).map_err(|e| StorageError {
509                    code: "PG_CONNECT_FAILED".into(),
510                    message: format!("Failed to connect to Postgres: {e}"),
511                })?;
512            Ok(Self { client })
513        }
514
515        /// Read the current schema from the live database.
516        pub fn read_schema(&mut self) -> Result<SchemaSnapshot, StorageError> {
517            let table_rows = self
518                .client
519                .query(INTROSPECT_TABLES_SQL, &[])
520                .map_err(pg_err)?;
521
522            let mut tables = Vec::new();
523            for row in &table_rows {
524                let table_name: String = row.get(0);
525                let columns = self.read_columns(&table_name)?;
526                let indexes = self.read_indexes(&table_name)?;
527                tables.push(TableSnapshot {
528                    name: table_name,
529                    columns,
530                    indexes,
531                });
532            }
533
534            Ok(SchemaSnapshot { tables })
535        }
536
537        fn read_columns(&mut self, table: &str) -> Result<Vec<ColumnSnapshot>, StorageError> {
538            let rows = self
539                .client
540                .query(INTROSPECT_COLUMNS_SQL, &[&table])
541                .map_err(pg_err)?;
542
543            let mut columns = Vec::new();
544            for row in &rows {
545                let name: String = row.get(0);
546                let data_type: String = row.get(1);
547                let is_nullable: String = row.get(2);
548                let is_pk: i64 = row.get(3);
549                columns.push(ColumnSnapshot {
550                    name,
551                    column_type: data_type,
552                    notnull: is_nullable == "NO",
553                    primary_key: is_pk > 0,
554                });
555            }
556            Ok(columns)
557        }
558
559        fn read_indexes(&mut self, table: &str) -> Result<Vec<IndexSnapshot>, StorageError> {
560            let rows = self
561                .client
562                .query(INTROSPECT_INDEXES_SQL, &[&table])
563                .map_err(pg_err)?;
564
565            let mut indexes = Vec::new();
566            for row in &rows {
567                let name: String = row.get(0);
568                let unique: bool = row.get(1);
569                let columns: Vec<String> = row.get(2);
570                indexes.push(IndexSnapshot {
571                    name,
572                    columns,
573                    unique,
574                });
575            }
576            Ok(indexes)
577        }
578
579        /// Plan from live database state.
580        pub fn plan_from_live(&mut self, target: &AppManifest) -> Result<SchemaPlan, StorageError> {
581            let snapshot = self.read_schema()?;
582            Ok(crate::plan_from_snapshot(&snapshot, target))
583        }
584    }
585
586    impl StorageAdapter for LivePostgresAdapter {
587        fn plan_schema(&self, _target: &AppManifest) -> Result<SchemaPlan, StorageError> {
588            Err(StorageError {
589                code: "PG_PLAN_NEEDS_MUTABLE".into(),
590                message: "Use plan_from_live() instead for live Postgres planning".into(),
591            })
592        }
593
594        fn apply_schema(&self, _plan: &SchemaPlan) -> Result<(), StorageError> {
595            Err(StorageError {
596                code: "PG_APPLY_USE_METHOD".into(),
597                message: "Use apply_plan() instead of the trait method for live Postgres".into(),
598            })
599        }
600    }
601
602    impl LivePostgresAdapter {
603        /// Apply a schema plan to the live database.
604        pub fn apply_plan(&mut self, plan: &SchemaPlan) -> Result<(), StorageError> {
605            let statements = plan_to_sql(plan)?;
606            for sql in &statements {
607                self.client.execute(sql.as_str(), &[]).map_err(pg_err)?;
608            }
609            Ok(())
610        }
611
612        /// Execute a raw SQL statement against the live database. Used by
613        /// integration tests for setup/teardown (DROP TABLE, TRUNCATE) —
614        /// production code should go through `apply_plan` so changes are
615        /// represented in the migration history. Returns the number of
616        /// rows affected.
617        pub fn exec_raw(&mut self, sql: &str) -> Result<u64, StorageError> {
618            self.client.execute(sql, &[]).map_err(pg_err)
619        }
620
621        /// Insert a row. Returns the generated ID.
622        pub fn insert(
623            &mut self,
624            entity: &str,
625            data: &serde_json::Value,
626        ) -> Result<String, StorageError> {
627            let (sql, values) = build_insert_sql(entity, data)?;
628            // The first param is always the generated ID — extract it before
629            // we hand `values` off to the postgres driver as borrowed slices.
630            let id = match &values[0] {
631                JsonParam::Text(s) => s.clone(),
632                _ => {
633                    return Err(StorageError {
634                        code: "PG_INTERNAL".into(),
635                        message: "build_insert_sql produced non-text id param".into(),
636                    });
637                }
638            };
639            let params = as_pg_params(&values);
640            self.client.execute(sql.as_str(), &params).map_err(pg_err)?;
641            Ok(id)
642        }
643
644        /// Get a row by ID.
645        pub fn get_by_id(
646            &mut self,
647            entity: &str,
648            id: &str,
649        ) -> Result<Option<serde_json::Value>, StorageError> {
650            let sql = format!("SELECT * FROM {} WHERE id = $1", quote_ident(entity));
651            let rows = self.client.query(sql.as_str(), &[&id]).map_err(pg_err)?;
652
653            match rows.first() {
654                Some(row) => Ok(Some(row_to_json(row))),
655                None => Ok(None),
656            }
657        }
658
659        /// List all rows from an entity.
660        pub fn list(&mut self, entity: &str) -> Result<Vec<serde_json::Value>, StorageError> {
661            let sql = format!("SELECT * FROM {}", quote_ident(entity));
662            let rows = self.client.query(sql.as_str(), &[]).map_err(pg_err)?;
663
664            Ok(rows.iter().map(row_to_json).collect())
665        }
666
667        /// Cursor-paginated list. `after` is the last `id` from the previous
668        /// page; the result contains rows with `id > after` (lex order),
669        /// limited to `limit`. Used for sync push/pull.
670        pub fn list_after(
671            &mut self,
672            entity: &str,
673            after: Option<&str>,
674            limit: usize,
675        ) -> Result<Vec<serde_json::Value>, StorageError> {
676            // Cap limit at a sensible upper bound so a malicious client can't
677            // stream the whole table by passing limit=u64::MAX.
678            let capped: i64 = limit.min(10_000) as i64;
679            let sql = match after {
680                Some(_) => format!(
681                    "SELECT * FROM {} WHERE id > $1 ORDER BY id ASC LIMIT $2",
682                    quote_ident(entity)
683                ),
684                None => format!(
685                    "SELECT * FROM {} ORDER BY id ASC LIMIT $1",
686                    quote_ident(entity)
687                ),
688            };
689            let rows = match after {
690                Some(cursor) => self
691                    .client
692                    .query(sql.as_str(), &[&cursor, &capped])
693                    .map_err(pg_err)?,
694                None => self
695                    .client
696                    .query(sql.as_str(), &[&capped])
697                    .map_err(pg_err)?,
698            };
699            Ok(rows.iter().map(row_to_json).collect())
700        }
701
702        /// Update a row by ID. Returns true if the row was found and updated.
703        pub fn update(
704            &mut self,
705            entity: &str,
706            id: &str,
707            data: &serde_json::Value,
708        ) -> Result<bool, StorageError> {
709            let (sql, values) = build_update_sql(entity, id, data)?;
710            let params = as_pg_params(&values);
711            let rows_affected = self.client.execute(sql.as_str(), &params).map_err(pg_err)?;
712            Ok(rows_affected > 0)
713        }
714
715        /// Delete a row by ID. Returns true if the row was found and deleted.
716        pub fn delete(&mut self, entity: &str, id: &str) -> Result<bool, StorageError> {
717            let sql = format!("DELETE FROM {} WHERE id = $1", quote_ident(entity));
718            let rows_affected = self.client.execute(sql.as_str(), &[&id]).map_err(pg_err)?;
719            Ok(rows_affected > 0)
720        }
721
722        /// Look up a row by `field = value`. Caller must validate `field`
723        /// against the manifest before calling — we still `quote_ident` it
724        /// but won't catch a typo against the entity definition.
725        pub fn lookup_field(
726            &mut self,
727            entity: &str,
728            field: &str,
729            value: &str,
730        ) -> Result<Option<serde_json::Value>, StorageError> {
731            let sql = format!(
732                "SELECT * FROM {} WHERE {} = $1 LIMIT 1",
733                quote_ident(entity),
734                quote_ident(field),
735            );
736            let rows = self.client.query(sql.as_str(), &[&value]).map_err(pg_err)?;
737            Ok(rows.first().map(row_to_json))
738        }
739
740        /// Push a `query_filtered` filter down to a real Postgres `WHERE`.
741        ///
742        /// Supported operators (parity with the SQLite path):
743        /// - Equality (`field: value`)
744        /// - `$not`: emits `field != value`
745        /// - `$gt` / `$gte` / `$lt` / `$lte`
746        /// - `$like`: emits `field LIKE value` (use `%`/`_` wildcards in
747        ///   the value; case-sensitive — pass `$ilike` for case-insensitive
748        ///   if/when the SQLite side adds it)
749        /// - `$in: [..]`: emits `field IN ($1, $2, ...)`
750        ///
751        /// Top-level meta operators: `$order`, `$limit`, `$offset`.
752        ///
753        /// `$search` (FTS5 on SQLite) is NOT supported here — Postgres
754        /// would need a tsvector column or a generic ILIKE OR-fold across
755        /// every text field, neither of which is wired up yet. Returns
756        /// `SEARCH_NOT_SUPPORTED` so callers can branch instead of
757        /// receiving silently-broad results.
758        ///
759        /// Anything else is silently ignored (matches the in-memory fallback's
760        /// permissive behavior). Field names are validated against `valid_columns`
761        /// to prevent SQL injection — pass the entity's column set.
762        pub fn query_filtered(
763            &mut self,
764            entity: &str,
765            filter: &serde_json::Value,
766            valid_columns: &[String],
767        ) -> Result<Vec<serde_json::Value>, StorageError> {
768            let empty = serde_json::Map::new();
769            let obj = filter.as_object().unwrap_or(&empty);
770
771            let validate = |col: &str| -> Result<(), StorageError> {
772                if col == "id" || valid_columns.iter().any(|c| c == col) {
773                    Ok(())
774                } else {
775                    Err(StorageError {
776                        code: "UNKNOWN_COLUMN".into(),
777                        message: format!("Unknown column \"{col}\" on entity \"{entity}\""),
778                    })
779                }
780            };
781
782            let mut where_clauses: Vec<String> = Vec::new();
783            let mut order_clause = String::new();
784            let mut limit_clause = String::new();
785            let mut offset_clause = String::new();
786            // Collect (col, op, value) so placeholder numbers can be assigned
787            // in a single materialization pass after the parse loop. Values
788            // are now JsonParam (typed) instead of String — see `value_to_pg`.
789            let mut planned: Vec<(String, String, JsonParam)> = Vec::new();
790
791            for (key, val) in obj {
792                match key.as_str() {
793                    "$search" => {
794                        // FTS5 (SQLite) has no portable equivalent in
795                        // Postgres without an explicit tsvector column.
796                        // Return a clear error rather than silently
797                        // ignoring the operator — callers that hit this
798                        // need to either branch on backend or define a
799                        // tsvector + GIN index in their schema.
800                        return Err(StorageError {
801                            code: "SEARCH_NOT_SUPPORTED".into(),
802                            message: "$search is SQLite-FTS5-only; use a Postgres tsvector column with the storage adapter's full-text path"
803                                .into(),
804                        });
805                    }
806                    "$order" => {
807                        if let Some(ord) = val.as_object() {
808                            let mut parts = Vec::new();
809                            for (col, dir) in ord {
810                                validate(col)?;
811                                let d = match dir.as_str().unwrap_or("asc") {
812                                    "desc" | "DESC" => "DESC",
813                                    _ => "ASC",
814                                };
815                                parts.push(format!("{} {d}", quote_ident(col)));
816                            }
817                            if !parts.is_empty() {
818                                order_clause = format!(" ORDER BY {}", parts.join(", "));
819                            }
820                        }
821                    }
822                    "$limit" => {
823                        if let Some(n) = val.as_u64() {
824                            limit_clause = format!(" LIMIT {}", n);
825                        }
826                    }
827                    "$offset" => {
828                        if let Some(n) = val.as_u64() {
829                            offset_clause = format!(" OFFSET {}", n);
830                        }
831                    }
832                    field => {
833                        validate(field)?;
834                        match val {
835                            serde_json::Value::Object(ops) => {
836                                for (op, v) in ops {
837                                    match op.as_str() {
838                                        "$not" => planned.push((
839                                            field.into(),
840                                            "!=".into(),
841                                            value_to_pg(v),
842                                        )),
843                                        "$gt" => {
844                                            planned.push((field.into(), ">".into(), value_to_pg(v)))
845                                        }
846                                        "$gte" => planned.push((
847                                            field.into(),
848                                            ">=".into(),
849                                            value_to_pg(v),
850                                        )),
851                                        "$lt" => {
852                                            planned.push((field.into(), "<".into(), value_to_pg(v)))
853                                        }
854                                        "$lte" => planned.push((
855                                            field.into(),
856                                            "<=".into(),
857                                            value_to_pg(v),
858                                        )),
859                                        "$like" => planned.push((
860                                            field.into(),
861                                            "LIKE".into(),
862                                            value_to_pg(v),
863                                        )),
864                                        "$in" => {
865                                            if let Some(arr) = v.as_array() {
866                                                let placeholders: Vec<String> = (0..arr.len())
867                                                    .map(|i| format!("${}", planned.len() + 1 + i))
868                                                    .collect();
869                                                where_clauses.push(format!(
870                                                    "{} IN ({})",
871                                                    quote_ident(field),
872                                                    placeholders.join(", "),
873                                                ));
874                                                for x in arr {
875                                                    planned.push((
876                                                        format!("__inline_{}", planned.len()),
877                                                        "__INLINE__".into(),
878                                                        value_to_pg(x),
879                                                    ));
880                                                }
881                                            }
882                                        }
883                                        _ => {}
884                                    }
885                                }
886                            }
887                            _ => planned.push((field.into(), "=".into(), value_to_pg(val))),
888                        }
889                    }
890                }
891            }
892
893            // Materialize planned -> SQL + params.
894            let mut params: Vec<JsonParam> = Vec::with_capacity(planned.len());
895            for (field, op, v) in &planned {
896                if op == "__INLINE__" {
897                    // Already emitted via the IN-clause path; just push the value.
898                    params.push(v.clone());
899                } else {
900                    let placeholder = format!("${}", params.len() + 1);
901                    where_clauses.push(format!("{} {} {}", quote_ident(field), op, placeholder));
902                    params.push(v.clone());
903                }
904            }
905
906            let where_sql = if where_clauses.is_empty() {
907                String::new()
908            } else {
909                format!(" WHERE {}", where_clauses.join(" AND "))
910            };
911            let sql = format!(
912                "SELECT * FROM {}{}{}{}{}",
913                quote_ident(entity),
914                where_sql,
915                order_clause,
916                limit_clause,
917                offset_clause,
918            );
919
920            let pg_params = as_pg_params(&params);
921            let rows = self
922                .client
923                .query(sql.as_str(), &pg_params)
924                .map_err(pg_err)?;
925            Ok(rows.iter().map(row_to_json).collect())
926        }
927
928        /// Run a `DataStore::aggregate` spec against Postgres. Mirrors the
929        /// SQLite path in `pylon-runtime` — supports `count`, `sum`, `avg`,
930        /// `min`, `max`, `countDistinct`, `groupBy` (plain field names or
931        /// `{field, bucket: hour|day|week|month|year}` for date bucketing
932        /// via `date_trunc`), and a flat-equality `where` filter.
933        ///
934        /// Spec format (same JSON shape used by the SQLite path):
935        /// ```json
936        /// { "count": "*",
937        ///   "sum": ["amount"],
938        ///   "groupBy": [{"field": "createdAt", "bucket": "day"}],
939        ///   "where": {"status": "paid"} }
940        /// ```
941        ///
942        /// `valid_columns` is used to validate every field name before it's
943        /// quoted into SQL — same pattern as `query_filtered`. Caller (the
944        /// `DataStore` impl in this crate) supplies the entity's column set
945        /// from the manifest.
946        pub fn aggregate(
947            &mut self,
948            entity: &str,
949            spec: &serde_json::Value,
950            valid_columns: &[String],
951        ) -> Result<serde_json::Value, StorageError> {
952            let obj = spec.as_object().ok_or_else(|| StorageError {
953                code: "INVALID_QUERY".into(),
954                message: "aggregate spec must be a JSON object".into(),
955            })?;
956
957            let validate = |col: &str| -> Result<(), StorageError> {
958                if col == "id" || valid_columns.iter().any(|c| c == col) {
959                    Ok(())
960                } else {
961                    Err(StorageError {
962                        code: "UNKNOWN_COLUMN".into(),
963                        message: format!("Unknown column \"{col}\" on entity \"{entity}\""),
964                    })
965                }
966            };
967
968            let mut select_parts: Vec<String> = Vec::new();
969            let mut result_fields: Vec<String> = Vec::new();
970
971            if let Some(count) = obj.get("count") {
972                match count {
973                    serde_json::Value::String(s) if s == "*" => {
974                        select_parts.push("COUNT(*) AS count".into());
975                        result_fields.push("count".into());
976                    }
977                    serde_json::Value::String(field) => {
978                        validate(field)?;
979                        let alias = format!("count_{field}");
980                        select_parts.push(format!(
981                            "COUNT({}) AS {}",
982                            quote_ident(field),
983                            quote_ident(&alias),
984                        ));
985                        result_fields.push(alias);
986                    }
987                    _ => {}
988                }
989            }
990
991            for (fn_name, prefix) in [
992                ("sum", "sum_"),
993                ("avg", "avg_"),
994                ("min", "min_"),
995                ("max", "max_"),
996            ] {
997                if let Some(fields) = obj.get(fn_name).and_then(|v| v.as_array()) {
998                    for field in fields {
999                        if let Some(f) = field.as_str() {
1000                            validate(f)?;
1001                            let alias = format!("{prefix}{f}");
1002                            let sql_fn = fn_name.to_uppercase();
1003                            select_parts.push(format!(
1004                                "{}({}) AS {}",
1005                                sql_fn,
1006                                quote_ident(f),
1007                                quote_ident(&alias),
1008                            ));
1009                            result_fields.push(alias);
1010                        }
1011                    }
1012                }
1013            }
1014
1015            if let Some(fields) = obj.get("countDistinct").and_then(|v| v.as_array()) {
1016                for field in fields {
1017                    if let Some(f) = field.as_str() {
1018                        validate(f)?;
1019                        let alias = format!("count_distinct_{f}");
1020                        select_parts.push(format!(
1021                            "COUNT(DISTINCT {}) AS {}",
1022                            quote_ident(f),
1023                            quote_ident(&alias),
1024                        ));
1025                        result_fields.push(alias);
1026                    }
1027                }
1028            }
1029
1030            // groupBy: column name or { field, bucket } — same vocabulary as
1031            // the SQLite path. Buckets translate to Postgres `date_trunc`
1032            // (SQLite uses `strftime`); both collapse rows to the bucket
1033            // boundary identically.
1034            let mut group_by: Vec<String> = Vec::new();
1035            let mut group_select: Vec<String> = Vec::new();
1036            let mut group_field_names: Vec<String> = Vec::new();
1037            if let Some(groups) = obj.get("groupBy").and_then(|v| v.as_array()) {
1038                for g in groups {
1039                    if let Some(f) = g.as_str() {
1040                        validate(f)?;
1041                        let q = quote_ident(f);
1042                        group_by.push(q.clone());
1043                        group_select.push(q);
1044                        group_field_names.push(f.to_string());
1045                    } else if let Some(spec) = g.as_object() {
1046                        let field =
1047                            spec.get("field").and_then(|v| v.as_str()).ok_or_else(|| {
1048                                StorageError {
1049                                    code: "INVALID_QUERY".into(),
1050                                    message: "groupBy object spec requires `field`".into(),
1051                                }
1052                            })?;
1053                        validate(field)?;
1054                        let bucket = spec.get("bucket").and_then(|v| v.as_str()).unwrap_or("day");
1055                        let trunc_unit = match bucket {
1056                            "hour" | "day" | "week" | "month" | "year" => bucket,
1057                            _ => {
1058                                return Err(StorageError {
1059                                    code: "INVALID_QUERY".into(),
1060                                    message: format!(
1061                                        "bucket must be one of hour/day/week/month/year, got {bucket}"
1062                                    ),
1063                                });
1064                            }
1065                        };
1066                        let alias = format!("{field}_{bucket}");
1067                        let expr = format!("date_trunc('{}', {})", trunc_unit, quote_ident(field),);
1068                        group_by.push(expr.clone());
1069                        group_select.push(format!("{} AS {}", expr, quote_ident(&alias)));
1070                        group_field_names.push(alias);
1071                    }
1072                }
1073            }
1074
1075            let mut full_select = group_select.clone();
1076            full_select.extend(select_parts.iter().cloned());
1077            if full_select.is_empty() {
1078                return Err(StorageError {
1079                    code: "INVALID_QUERY".into(),
1080                    message: "aggregate spec must include count/sum/avg/min/max/groupBy".into(),
1081                });
1082            }
1083
1084            let mut where_clauses: Vec<String> = Vec::new();
1085            let mut params: Vec<JsonParam> = Vec::new();
1086            if let Some(w) = obj.get("where").and_then(|v| v.as_object()) {
1087                for (k, v) in w {
1088                    validate(k)?;
1089                    let placeholder = format!("${}", params.len() + 1);
1090                    where_clauses.push(format!("{} = {}", quote_ident(k), placeholder));
1091                    params.push(value_to_pg(v));
1092                }
1093            }
1094            let where_sql = if where_clauses.is_empty() {
1095                String::new()
1096            } else {
1097                format!(" WHERE {}", where_clauses.join(" AND "))
1098            };
1099            let group_sql = if group_by.is_empty() {
1100                String::new()
1101            } else {
1102                format!(" GROUP BY {}", group_by.join(", "))
1103            };
1104
1105            let sql = format!(
1106                "SELECT {} FROM {}{}{}",
1107                full_select.join(", "),
1108                quote_ident(entity),
1109                where_sql,
1110                group_sql,
1111            );
1112
1113            let pg_params = as_pg_params(&params);
1114            let rows = self
1115                .client
1116                .query(sql.as_str(), &pg_params)
1117                .map_err(pg_err)?;
1118
1119            let column_names: Vec<String> = group_field_names
1120                .iter()
1121                .chain(result_fields.iter())
1122                .cloned()
1123                .collect();
1124
1125            let mut out: Vec<serde_json::Value> = Vec::with_capacity(rows.len());
1126            for row in &rows {
1127                let row_json = row_to_json(row);
1128                if let serde_json::Value::Object(map) = &row_json {
1129                    let mut filtered = serde_json::Map::new();
1130                    for name in &column_names {
1131                        if let Some(v) = map.get(name) {
1132                            filtered.insert(name.clone(), v.clone());
1133                        }
1134                    }
1135                    out.push(serde_json::Value::Object(filtered));
1136                } else {
1137                    out.push(row_json);
1138                }
1139            }
1140            Ok(serde_json::json!({ "rows": out }))
1141        }
1142    }
1143
1144    /// Atomic operation describing a single mutation inside [`LivePostgresAdapter::transact`].
1145    pub enum TxOp<'a> {
1146        Insert {
1147            entity: &'a str,
1148            data: &'a serde_json::Value,
1149        },
1150        Update {
1151            entity: &'a str,
1152            id: &'a str,
1153            data: &'a serde_json::Value,
1154        },
1155        Delete {
1156            entity: &'a str,
1157            id: &'a str,
1158        },
1159    }
1160
1161    /// Result of a single op inside a transaction.
1162    #[derive(Debug, Clone)]
1163    pub enum TxResult {
1164        Inserted(String),
1165        Updated(bool),
1166        Deleted(bool),
1167    }
1168
1169    impl LivePostgresAdapter {
1170        /// Run `ops` inside a single Postgres transaction. Either all of them
1171        /// commit together or none of them do — there is no partial state on
1172        /// failure. The ROLLBACK happens implicitly when the `Transaction`
1173        /// guard is dropped without `commit()` being called.
1174        pub fn transact(&mut self, ops: &[TxOp<'_>]) -> Result<Vec<TxResult>, StorageError> {
1175            let mut tx = self.client.transaction().map_err(pg_err)?;
1176            let mut results: Vec<TxResult> = Vec::with_capacity(ops.len());
1177
1178            for op in ops {
1179                match op {
1180                    TxOp::Insert { entity, data } => {
1181                        let (sql, values) = build_insert_sql(entity, data)?;
1182                        let id = match &values[0] {
1183                            JsonParam::Text(s) => s.clone(),
1184                            _ => {
1185                                return Err(StorageError {
1186                                    code: "PG_INTERNAL".into(),
1187                                    message: "build_insert_sql produced non-text id param".into(),
1188                                });
1189                            }
1190                        };
1191                        let params = as_pg_params(&values);
1192                        tx.execute(sql.as_str(), &params).map_err(pg_err)?;
1193                        results.push(TxResult::Inserted(id));
1194                    }
1195                    TxOp::Update { entity, id, data } => {
1196                        let (sql, values) = build_update_sql(entity, id, data)?;
1197                        let params = as_pg_params(&values);
1198                        let n = tx.execute(sql.as_str(), &params).map_err(pg_err)?;
1199                        results.push(TxResult::Updated(n > 0));
1200                    }
1201                    TxOp::Delete { entity, id } => {
1202                        let sql = format!("DELETE FROM {} WHERE id = $1", quote_ident(entity));
1203                        let n = tx.execute(sql.as_str(), &[id]).map_err(pg_err)?;
1204                        results.push(TxResult::Deleted(n > 0));
1205                    }
1206                }
1207            }
1208
1209            tx.commit().map_err(pg_err)?;
1210            Ok(results)
1211        }
1212    }
1213
1214    /// Lift a JSON value into a typed Postgres parameter. The previous
1215    /// implementation collapsed everything to `String`, which silently
1216    /// stringified ints/bools and turned JSON `null` into `""` for
1217    /// nullable columns. Forwarding through `JsonParam` keeps the column
1218    /// type honest and lets callers `unlink` (set FK to NULL) cleanly.
1219    fn value_to_pg(v: &serde_json::Value) -> JsonParam {
1220        JsonParam::from_json(v)
1221    }
1222
1223    fn row_to_json(row: &postgres::Row) -> serde_json::Value {
1224        use postgres::types::Type;
1225        let mut obj = serde_json::Map::new();
1226        for (i, col) in row.columns().iter().enumerate() {
1227            let name = col.name().to_string();
1228
1229            // Use `try_get` everywhere — `Row::get` panics on decode mismatch,
1230            // and a panic in a query handler poisons the connection mutex,
1231            // taking down all subsequent reads on this datastore. Anything
1232            // that fails to decode becomes Null with a one-shot warning.
1233            //
1234            // Timestamps and the catch-all path explicitly DON'T request
1235            // `String` — the postgres crate uses binary protocol by default
1236            // and there's no `FromSql<String>` impl for TIMESTAMPTZ etc. We
1237            // ask for `Vec<u8>` and lossy-stringify, which works for all
1238            // text-shaped columns in either protocol.
1239            let value: serde_json::Value = match *col.type_() {
1240                Type::BOOL => try_get_or_null::<Option<bool>>(row, i)
1241                    .flatten()
1242                    .map(serde_json::Value::Bool)
1243                    .unwrap_or(serde_json::Value::Null),
1244                Type::INT2 => try_get_or_null::<Option<i16>>(row, i)
1245                    .flatten()
1246                    .map(|v| serde_json::Value::Number(v.into()))
1247                    .unwrap_or(serde_json::Value::Null),
1248                Type::INT4 => try_get_or_null::<Option<i32>>(row, i)
1249                    .flatten()
1250                    .map(|v| serde_json::Value::Number(v.into()))
1251                    .unwrap_or(serde_json::Value::Null),
1252                Type::INT8 => try_get_or_null::<Option<i64>>(row, i)
1253                    .flatten()
1254                    .map(|v| serde_json::Value::Number(v.into()))
1255                    .unwrap_or(serde_json::Value::Null),
1256                Type::FLOAT4 => try_get_or_null::<Option<f32>>(row, i)
1257                    .flatten()
1258                    .and_then(|v| serde_json::Number::from_f64(v as f64))
1259                    .map(serde_json::Value::Number)
1260                    .unwrap_or(serde_json::Value::Null),
1261                Type::FLOAT8 => try_get_or_null::<Option<f64>>(row, i)
1262                    .flatten()
1263                    .and_then(serde_json::Number::from_f64)
1264                    .map(serde_json::Value::Number)
1265                    .unwrap_or(serde_json::Value::Null),
1266                Type::JSON | Type::JSONB => try_get_or_null::<Option<serde_json::Value>>(row, i)
1267                    .flatten()
1268                    .unwrap_or(serde_json::Value::Null),
1269                Type::BYTEA => try_get_or_null::<Option<Vec<u8>>>(row, i)
1270                    .flatten()
1271                    .map(|b| serde_json::Value::String(b64(&b)))
1272                    .unwrap_or(serde_json::Value::Null),
1273                Type::TEXT | Type::VARCHAR | Type::BPCHAR | Type::NAME | Type::UNKNOWN => {
1274                    try_get_or_null::<Option<String>>(row, i)
1275                        .flatten()
1276                        .map(serde_json::Value::String)
1277                        .unwrap_or(serde_json::Value::Null)
1278                }
1279                _ => {
1280                    // Last resort: ask Postgres to render anything else as
1281                    // text via a stringifying decode through Vec<u8>. If even
1282                    // that fails (rare — Postgres types not implementing the
1283                    // text format), fall through to Null with a warning.
1284                    match row.try_get::<_, Option<String>>(i) {
1285                        Ok(Some(s)) => serde_json::Value::String(s),
1286                        Ok(None) => serde_json::Value::Null,
1287                        Err(_) => match row.try_get::<_, Option<Vec<u8>>>(i) {
1288                            Ok(Some(bytes)) => serde_json::Value::String(
1289                                String::from_utf8_lossy(&bytes).into_owned(),
1290                            ),
1291                            _ => serde_json::Value::Null,
1292                        },
1293                    }
1294                }
1295            };
1296            obj.insert(name, value);
1297        }
1298        serde_json::Value::Object(obj)
1299    }
1300
1301    fn try_get_or_null<'a, T>(row: &'a postgres::Row, i: usize) -> Option<T>
1302    where
1303        T: postgres::types::FromSql<'a>,
1304    {
1305        match row.try_get::<_, T>(i) {
1306            Ok(v) => Some(v),
1307            Err(e) => {
1308                tracing::warn!(
1309                    "[postgres] decode failed for column {} ({}): {e}",
1310                    i,
1311                    row.columns()[i].name()
1312                );
1313                None
1314            }
1315        }
1316    }
1317
1318    /// Minimal base64 encoder so we don't need another dependency just for
1319    /// the BYTEA column edge case.
1320    fn b64(bytes: &[u8]) -> String {
1321        const TABLE: &[u8; 64] =
1322            b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
1323        let mut out = String::with_capacity((bytes.len() + 2) / 3 * 4);
1324        let chunks = bytes.chunks(3);
1325        for chunk in chunks {
1326            let b = [
1327                chunk.first().copied().unwrap_or(0),
1328                chunk.get(1).copied().unwrap_or(0),
1329                chunk.get(2).copied().unwrap_or(0),
1330            ];
1331            out.push(TABLE[(b[0] >> 2) as usize] as char);
1332            out.push(TABLE[((b[0] & 0x03) << 4 | b[1] >> 4) as usize] as char);
1333            if chunk.len() > 1 {
1334                out.push(TABLE[((b[1] & 0x0F) << 2 | b[2] >> 6) as usize] as char);
1335            } else {
1336                out.push('=');
1337            }
1338            if chunk.len() > 2 {
1339                out.push(TABLE[(b[2] & 0x3F) as usize] as char);
1340            } else {
1341                out.push('=');
1342            }
1343        }
1344        out
1345    }
1346
1347    fn pg_err(e: postgres::Error) -> StorageError {
1348        // postgres::Error's Display is intentionally short ("db error",
1349        // "connection error" etc.) — the actual SQLSTATE / detail lives
1350        // on the source chain. Walk the chain so the final message has
1351        // enough signal to debug a failed insert/update without
1352        // attaching a debugger.
1353        use std::error::Error;
1354        let mut detail = format!("{e}");
1355        let mut src: Option<&dyn Error> = e.source();
1356        while let Some(s) = src {
1357            detail.push_str(": ");
1358            detail.push_str(&format!("{s}"));
1359            src = s.source();
1360        }
1361        StorageError {
1362            code: "PG_QUERY_FAILED".into(),
1363            message: format!("Postgres query failed: {detail}"),
1364        }
1365    }
1366}
1367
1368// ---------------------------------------------------------------------------
1369// Tests
1370// ---------------------------------------------------------------------------
1371
1372#[cfg(test)]
1373mod tests {
1374    use super::*;
1375
1376    /// Hand-rolled fixture that matches the snapshots in the tests
1377    /// below. Decoupled from any example's `pylon.manifest.json` so
1378    /// changing an example schema doesn't bleed into adapter tests.
1379    fn test_manifest() -> AppManifest {
1380        use pylon_kernel::{ManifestEntity, ManifestField, ManifestIndex};
1381        let f = |name: &str, ty: &str, opt: bool, uniq: bool| ManifestField {
1382            name: name.into(),
1383            field_type: ty.into(),
1384            optional: opt,
1385            unique: uniq,
1386            crdt: None,
1387        };
1388        AppManifest {
1389            manifest_version: 1,
1390            name: "test".into(),
1391            version: "0.0.0".into(),
1392            entities: vec![
1393                ManifestEntity {
1394                    name: "User".into(),
1395                    fields: vec![
1396                        f("email", "string", false, true),
1397                        f("displayName", "string", false, false),
1398                        f("createdAt", "datetime", false, false),
1399                    ],
1400                    indexes: vec![],
1401                    relations: vec![],
1402                    search: None,
1403                    crdt: true,
1404                },
1405                ManifestEntity {
1406                    name: "Todo".into(),
1407                    fields: vec![
1408                        f("title", "string", false, false),
1409                        f("done", "bool", false, false),
1410                        f("userId", "id(User)", false, false),
1411                        f("createdAt", "datetime", false, false),
1412                    ],
1413                    indexes: vec![ManifestIndex {
1414                        name: "by_user".into(),
1415                        fields: vec!["userId".into()],
1416                        unique: false,
1417                    }],
1418                    relations: vec![],
1419                    search: None,
1420                    crdt: true,
1421                },
1422            ],
1423            queries: vec![],
1424            actions: vec![],
1425            policies: vec![],
1426            routes: vec![],
1427        }
1428    }
1429
1430    #[test]
1431    fn pg_type_mapping() {
1432        assert_eq!(pg_column_type("string"), "TEXT");
1433        assert_eq!(pg_column_type("int"), "INTEGER");
1434        assert_eq!(pg_column_type("float"), "DOUBLE PRECISION");
1435        assert_eq!(pg_column_type("bool"), "BOOLEAN");
1436        assert_eq!(pg_column_type("datetime"), "TIMESTAMPTZ");
1437        assert_eq!(pg_column_type("richtext"), "TEXT");
1438        assert_eq!(pg_column_type("id(User)"), "TEXT");
1439    }
1440
1441    #[test]
1442    fn quote_ident_simple() {
1443        assert_eq!(quote_ident("User"), "\"User\"");
1444        assert_eq!(quote_ident("email"), "\"email\"");
1445    }
1446
1447    #[test]
1448    fn quote_ident_escapes_embedded_double_quotes() {
1449        assert_eq!(quote_ident("col\"name"), "\"col\"\"name\"");
1450        assert_eq!(quote_ident("a\"b\"c"), "\"a\"\"b\"\"c\"");
1451    }
1452
1453    #[test]
1454    fn create_table_sql_basic() {
1455        let fields = vec![
1456            FieldSpec {
1457                name: "email".into(),
1458                field_type: "string".into(),
1459                optional: false,
1460                unique: true,
1461            },
1462            FieldSpec {
1463                name: "age".into(),
1464                field_type: "int".into(),
1465                optional: true,
1466                unique: false,
1467            },
1468        ];
1469        let sql = create_table_sql("User", &fields);
1470        assert_eq!(
1471            sql,
1472            "CREATE TABLE IF NOT EXISTS \"User\" (id TEXT PRIMARY KEY NOT NULL, \"email\" TEXT NOT NULL UNIQUE, \"age\" INTEGER)"
1473        );
1474    }
1475
1476    #[test]
1477    fn create_table_sql_escapes_identifiers() {
1478        let fields = vec![FieldSpec {
1479            name: "col\"x".into(),
1480            field_type: "string".into(),
1481            optional: false,
1482            unique: false,
1483        }];
1484        let sql = create_table_sql("my\"table", &fields);
1485        assert!(sql.contains("\"my\"\"table\""));
1486        assert!(sql.contains("\"col\"\"x\""));
1487    }
1488
1489    #[test]
1490    fn create_index_sql_unique() {
1491        let sql = create_index_sql("User", "by_email", &["email".into()], true);
1492        assert_eq!(
1493            sql,
1494            "CREATE UNIQUE INDEX IF NOT EXISTS \"User_by_email\" ON \"User\" (\"email\")"
1495        );
1496    }
1497
1498    #[test]
1499    fn create_index_sql_non_unique() {
1500        let sql = create_index_sql("Todo", "by_user", &["userId".into()], false);
1501        assert_eq!(
1502            sql,
1503            "CREATE INDEX IF NOT EXISTS \"Todo_by_user\" ON \"Todo\" (\"userId\")"
1504        );
1505    }
1506
1507    #[test]
1508    fn add_column_sql_basic() {
1509        let field = FieldSpec {
1510            name: "bio".into(),
1511            field_type: "string".into(),
1512            optional: true,
1513            unique: false,
1514        };
1515        let sql = add_column_sql("User", &field);
1516        assert_eq!(sql, "ALTER TABLE \"User\" ADD COLUMN \"bio\" TEXT");
1517    }
1518
1519    #[test]
1520    fn plan_from_manifest() {
1521        let adapter = PostgresAdapter;
1522        let manifest = test_manifest();
1523        let plan = adapter.plan_schema(&manifest).unwrap();
1524
1525        // Should have CreateEntity for User and Todo, plus AddIndex for by_user.
1526        assert!(plan.operations.iter().any(|op| matches!(
1527            op,
1528            SchemaOperation::CreateEntity { name, .. } if name == "User"
1529        )));
1530        assert!(plan.operations.iter().any(|op| matches!(
1531            op,
1532            SchemaOperation::CreateEntity { name, .. } if name == "Todo"
1533        )));
1534        assert!(plan.operations.iter().any(|op| matches!(
1535            op,
1536            SchemaOperation::AddIndex { entity, name, .. } if entity == "Todo" && name == "by_user"
1537        )));
1538    }
1539
1540    #[test]
1541    fn plan_to_sql_produces_statements() {
1542        let adapter = PostgresAdapter;
1543        let manifest = test_manifest();
1544        let plan = adapter.plan_schema(&manifest).unwrap();
1545        let stmts = plan_to_sql(&plan).unwrap();
1546
1547        // 2 CREATE TABLE (User, Todo) + 1 CREATE INDEX for Todo.by_user
1548        // + 1 CREATE INDEX for Todo.by_user_done. The Todo manifest also
1549        // declares a unique by_email index on User which lands as part of
1550        // the table. Final count: 2 tables + 2 indexes.
1551        let create_tables = stmts
1552            .iter()
1553            .filter(|s| s.starts_with("CREATE TABLE"))
1554            .count();
1555        let create_indexes = stmts
1556            .iter()
1557            .filter(|s| s.starts_with("CREATE INDEX") || s.starts_with("CREATE UNIQUE INDEX"))
1558            .count();
1559        assert_eq!(create_tables, 2);
1560        assert!(create_indexes >= 1);
1561        assert!(stmts[0].starts_with("CREATE TABLE"));
1562        assert!(stmts[1].starts_with("CREATE TABLE"));
1563    }
1564
1565    #[test]
1566    fn plan_to_sql_rejects_unsupported() {
1567        let plan = SchemaPlan {
1568            operations: vec![SchemaOperation::RemoveEntity {
1569                name: "User".into(),
1570            }],
1571        };
1572        let result = plan_to_sql(&plan);
1573        assert!(result.is_err());
1574        assert_eq!(result.unwrap_err().code, "PG_OP_UNSUPPORTED");
1575    }
1576
1577    #[test]
1578    fn apply_not_implemented() {
1579        let adapter = PostgresAdapter;
1580        let plan = SchemaPlan {
1581            operations: vec![SchemaOperation::Noop],
1582        };
1583        let result = adapter.apply_schema(&plan);
1584        assert!(result.is_err());
1585        assert_eq!(result.unwrap_err().code, "APPLY_NOT_IMPLEMENTED");
1586    }
1587
1588    #[test]
1589    fn sql_uses_quoted_identifiers() {
1590        let fields = vec![FieldSpec {
1591            name: "createdAt".into(),
1592            field_type: "datetime".into(),
1593            optional: false,
1594            unique: false,
1595        }];
1596        let sql = create_table_sql("User", &fields);
1597        // Postgres identifiers should be quoted for case-sensitivity.
1598        assert!(sql.contains("\"User\""));
1599        assert!(sql.contains("\"createdAt\""));
1600        assert!(sql.contains("TIMESTAMPTZ"));
1601    }
1602
1603    // -- Introspection SQL tests --
1604
1605    #[test]
1606    fn introspect_sql_constants_are_valid() {
1607        // Sanity checks that the SQL strings exist and look reasonable.
1608        assert!(INTROSPECT_TABLES_SQL.contains("information_schema.tables"));
1609        assert!(INTROSPECT_COLUMNS_SQL.contains("$1"));
1610        assert!(INTROSPECT_INDEXES_SQL.contains("$1"));
1611        assert!(INTROSPECT_TABLES_SQL.contains("_pylon_"));
1612    }
1613
1614    // -- Plan from snapshot tests --
1615
1616    #[test]
1617    fn plan_from_empty_snapshot_creates_all() {
1618        let snapshot = crate::SchemaSnapshot { tables: vec![] };
1619        let manifest = test_manifest();
1620        let plan = plan_from_snapshot(&snapshot, &manifest);
1621
1622        assert!(plan.operations.iter().any(|op| matches!(
1623            op,
1624            SchemaOperation::CreateEntity { name, .. } if name == "User"
1625        )));
1626        assert!(plan.operations.iter().any(|op| matches!(
1627            op,
1628            SchemaOperation::CreateEntity { name, .. } if name == "Todo"
1629        )));
1630        assert!(plan.operations.iter().any(|op| matches!(
1631            op,
1632            SchemaOperation::AddIndex { entity, name, .. } if entity == "Todo" && name == "by_user"
1633        )));
1634    }
1635
1636    #[test]
1637    fn plan_from_full_snapshot_is_noop() {
1638        let snapshot = crate::SchemaSnapshot {
1639            tables: vec![
1640                crate::TableSnapshot {
1641                    name: "User".into(),
1642                    columns: vec![
1643                        crate::ColumnSnapshot {
1644                            name: "id".into(),
1645                            column_type: "TEXT".into(),
1646                            notnull: true,
1647                            primary_key: true,
1648                        },
1649                        crate::ColumnSnapshot {
1650                            name: "email".into(),
1651                            column_type: "TEXT".into(),
1652                            notnull: true,
1653                            primary_key: false,
1654                        },
1655                        crate::ColumnSnapshot {
1656                            name: "displayName".into(),
1657                            column_type: "TEXT".into(),
1658                            notnull: true,
1659                            primary_key: false,
1660                        },
1661                        crate::ColumnSnapshot {
1662                            name: "createdAt".into(),
1663                            column_type: "TIMESTAMPTZ".into(),
1664                            notnull: true,
1665                            primary_key: false,
1666                        },
1667                    ],
1668                    indexes: vec![],
1669                },
1670                crate::TableSnapshot {
1671                    name: "Todo".into(),
1672                    columns: vec![
1673                        crate::ColumnSnapshot {
1674                            name: "id".into(),
1675                            column_type: "TEXT".into(),
1676                            notnull: true,
1677                            primary_key: true,
1678                        },
1679                        crate::ColumnSnapshot {
1680                            name: "title".into(),
1681                            column_type: "TEXT".into(),
1682                            notnull: true,
1683                            primary_key: false,
1684                        },
1685                        crate::ColumnSnapshot {
1686                            name: "done".into(),
1687                            column_type: "BOOLEAN".into(),
1688                            notnull: true,
1689                            primary_key: false,
1690                        },
1691                        crate::ColumnSnapshot {
1692                            name: "userId".into(),
1693                            column_type: "TEXT".into(),
1694                            notnull: true,
1695                            primary_key: false,
1696                        },
1697                        crate::ColumnSnapshot {
1698                            name: "createdAt".into(),
1699                            column_type: "TIMESTAMPTZ".into(),
1700                            notnull: true,
1701                            primary_key: false,
1702                        },
1703                    ],
1704                    indexes: vec![crate::IndexSnapshot {
1705                        name: "Todo_by_user".into(),
1706                        columns: vec!["userId".into()],
1707                        unique: false,
1708                    }],
1709                },
1710            ],
1711        };
1712        let manifest = test_manifest();
1713        let plan = plan_from_snapshot(&snapshot, &manifest);
1714        assert!(plan.is_empty());
1715    }
1716
1717    #[test]
1718    fn plan_detects_missing_column_in_snapshot() {
1719        let snapshot = crate::SchemaSnapshot {
1720            tables: vec![
1721                crate::TableSnapshot {
1722                    name: "User".into(),
1723                    columns: vec![
1724                        crate::ColumnSnapshot {
1725                            name: "id".into(),
1726                            column_type: "TEXT".into(),
1727                            notnull: true,
1728                            primary_key: true,
1729                        },
1730                        crate::ColumnSnapshot {
1731                            name: "email".into(),
1732                            column_type: "TEXT".into(),
1733                            notnull: true,
1734                            primary_key: false,
1735                        },
1736                        // missing displayName and createdAt
1737                    ],
1738                    indexes: vec![],
1739                },
1740                crate::TableSnapshot {
1741                    name: "Todo".into(),
1742                    columns: vec![
1743                        crate::ColumnSnapshot {
1744                            name: "id".into(),
1745                            column_type: "TEXT".into(),
1746                            notnull: true,
1747                            primary_key: true,
1748                        },
1749                        crate::ColumnSnapshot {
1750                            name: "title".into(),
1751                            column_type: "TEXT".into(),
1752                            notnull: true,
1753                            primary_key: false,
1754                        },
1755                        crate::ColumnSnapshot {
1756                            name: "done".into(),
1757                            column_type: "BOOLEAN".into(),
1758                            notnull: true,
1759                            primary_key: false,
1760                        },
1761                        crate::ColumnSnapshot {
1762                            name: "userId".into(),
1763                            column_type: "TEXT".into(),
1764                            notnull: true,
1765                            primary_key: false,
1766                        },
1767                        crate::ColumnSnapshot {
1768                            name: "createdAt".into(),
1769                            column_type: "TIMESTAMPTZ".into(),
1770                            notnull: true,
1771                            primary_key: false,
1772                        },
1773                    ],
1774                    indexes: vec![crate::IndexSnapshot {
1775                        name: "Todo_by_user".into(),
1776                        columns: vec!["userId".into()],
1777                        unique: false,
1778                    }],
1779                },
1780            ],
1781        };
1782        let manifest = test_manifest();
1783        let plan = plan_from_snapshot(&snapshot, &manifest);
1784
1785        let add_fields: Vec<_> = plan
1786            .operations
1787            .iter()
1788            .filter(|op| matches!(op, SchemaOperation::AddField { .. }))
1789            .collect();
1790        assert_eq!(add_fields.len(), 2); // displayName + createdAt
1791    }
1792
1793    // -- CRUD helper tests (no live database required) --
1794
1795    #[test]
1796    fn json_value_to_string_handles_all_types() {
1797        assert_eq!(
1798            json_value_to_string(&serde_json::Value::String("hello".into())),
1799            "hello"
1800        );
1801        assert_eq!(json_value_to_string(&serde_json::json!(42)), "42");
1802        assert_eq!(json_value_to_string(&serde_json::json!(1.5)), "1.5");
1803        assert_eq!(json_value_to_string(&serde_json::Value::Bool(true)), "true");
1804        assert_eq!(
1805            json_value_to_string(&serde_json::Value::Bool(false)),
1806            "false"
1807        );
1808        assert_eq!(json_value_to_string(&serde_json::Value::Null), "");
1809        // Arrays and objects get their JSON representation.
1810        assert_eq!(
1811            json_value_to_string(&serde_json::json!([1, 2, 3])),
1812            "[1,2,3]"
1813        );
1814        assert_eq!(
1815            json_value_to_string(&serde_json::json!({"a": 1})),
1816            "{\"a\":1}"
1817        );
1818    }
1819
1820    #[test]
1821    fn generate_id_returns_hex_string() {
1822        let id = generate_id();
1823        assert!(!id.is_empty());
1824        // Must be valid hex characters.
1825        assert!(id.chars().all(|c| c.is_ascii_hexdigit()));
1826    }
1827
1828    #[test]
1829    fn generate_id_is_unique_across_calls() {
1830        let id1 = generate_id();
1831        let id2 = generate_id();
1832        assert_ne!(id1, id2);
1833    }
1834
1835    #[test]
1836    fn generate_id_is_lex_sortable() {
1837        // 1000 IDs back-to-back must come out in monotonically increasing
1838        // lexicographic order. This is what makes cursor pagination correct.
1839        let mut ids: Vec<String> = (0..1000).map(|_| generate_id()).collect();
1840        let sorted = {
1841            let mut s = ids.clone();
1842            s.sort();
1843            s
1844        };
1845        assert_eq!(ids, sorted, "generate_id must be lex-monotonic");
1846        // And every id must be the same width (otherwise lex comparison is
1847        // wrong at width boundaries).
1848        let len0 = ids[0].len();
1849        assert!(ids.iter().all(|id| id.len() == len0));
1850        ids.dedup();
1851        assert_eq!(ids.len(), 1000, "no collisions in a tight loop");
1852    }
1853
1854    #[test]
1855    fn build_insert_sql_simple() {
1856        let data = serde_json::json!({
1857            "email": "alice@example.com",
1858            "displayName": "Alice"
1859        });
1860        let (sql, values) = build_insert_sql("User", &data).unwrap();
1861
1862        assert!(sql.starts_with("INSERT INTO \"User\""));
1863        assert!(sql.contains("id"));
1864        assert!(sql.contains("$1"));
1865        assert!(sql.contains("$2"));
1866        assert!(sql.contains("$3"));
1867        // First value is the generated ID — JsonParam::Text variant.
1868        match &values[0] {
1869            JsonParam::Text(s) => assert!(!s.is_empty()),
1870            other => panic!("expected Text id param, got {other:?}"),
1871        }
1872        assert_eq!(values.len(), 3); // id + 2 fields
1873    }
1874
1875    #[test]
1876    fn build_insert_sql_preserves_json_types() {
1877        let data = serde_json::json!({
1878            "n": 42,
1879            "f": 1.5,
1880            "b": true,
1881            "s": "hi",
1882            "z": null,
1883        });
1884        let (_sql, values) = build_insert_sql("T", &data).unwrap();
1885        // values[0] is the id; remaining are in BTreeMap order ("b","f","n","s","z").
1886        let kinds: Vec<&JsonParam> = values.iter().skip(1).collect();
1887        assert!(matches!(kinds[0], JsonParam::Bool(true)));
1888        assert!(matches!(kinds[1], JsonParam::Float(_)));
1889        assert!(matches!(kinds[2], JsonParam::Int(42)));
1890        assert!(matches!(kinds[3], JsonParam::Text(_)));
1891        assert!(matches!(kinds[4], JsonParam::Null));
1892    }
1893
1894    #[test]
1895    fn build_insert_sql_quotes_column_names() {
1896        let data = serde_json::json!({"createdAt": "2026-01-01"});
1897        let (sql, _) = build_insert_sql("Todo", &data).unwrap();
1898        assert!(sql.contains("\"createdAt\""));
1899        assert!(sql.contains("\"Todo\""));
1900    }
1901
1902    #[test]
1903    fn build_insert_sql_rejects_non_object() {
1904        let data = serde_json::json!("not an object");
1905        let result = build_insert_sql("User", &data);
1906        assert!(result.is_err());
1907        assert_eq!(result.unwrap_err().code, "PG_INVALID_DATA");
1908    }
1909
1910    #[test]
1911    fn build_update_sql_simple() {
1912        let data = serde_json::json!({
1913            "displayName": "Bob",
1914            "email": "bob@example.com"
1915        });
1916        let (sql, values) = build_update_sql("User", "abc123", &data).unwrap();
1917
1918        assert!(sql.starts_with("UPDATE \"User\" SET"));
1919        assert!(sql.contains("WHERE id = $1"));
1920        assert!(sql.contains("$2"));
1921        assert!(sql.contains("$3"));
1922        match &values[0] {
1923            JsonParam::Text(s) => assert_eq!(s, "abc123"),
1924            other => panic!("expected Text id param, got {other:?}"),
1925        }
1926        assert_eq!(values.len(), 3); // id + 2 fields
1927    }
1928
1929    #[test]
1930    fn build_update_sql_quotes_column_names() {
1931        let data = serde_json::json!({"displayName": "Carol"});
1932        let (sql, _) = build_update_sql("User", "id1", &data).unwrap();
1933        assert!(sql.contains("\"displayName\" = $2"));
1934    }
1935
1936    #[test]
1937    fn build_update_sql_rejects_non_object() {
1938        let data = serde_json::json!(42);
1939        let result = build_update_sql("User", "id1", &data);
1940        assert!(result.is_err());
1941        assert_eq!(result.unwrap_err().code, "PG_INVALID_DATA");
1942    }
1943
1944    #[test]
1945    fn build_update_sql_rejects_empty_object() {
1946        let data = serde_json::json!({});
1947        let err = build_update_sql("User", "id1", &data).unwrap_err();
1948        assert_eq!(err.code, "PG_INVALID_DATA");
1949        assert!(err.message.contains("at least one field"));
1950    }
1951}