Skip to main content

pylon_storage/
postgres.rs

1use crate::{FieldSpec, SchemaOperation, SchemaPlan, StorageAdapter, StorageError};
2use pylon_kernel::AppManifest;
3
4// ---------------------------------------------------------------------------
5// Type mapping: manifest field types -> PostgreSQL column types
6//
7//   string    -> TEXT
8//   int       -> INTEGER
9//   float     -> DOUBLE PRECISION
10//   bool      -> BOOLEAN
11//   datetime  -> TIMESTAMPTZ
12//   richtext  -> TEXT
13//   id(...)   -> TEXT
14// ---------------------------------------------------------------------------
15
/// Map a manifest field type name to the PostgreSQL column type used in DDL.
///
/// `int`, `float`, `bool` and `datetime` have dedicated column types. Every
/// other type name — `string`, `richtext`, `id(...)` references and anything
/// unrecognized — is stored as `TEXT`.
fn pg_column_type(field_type: &str) -> &'static str {
    // Only the non-TEXT mappings need to be listed; TEXT is the catch-all.
    const TYPED_COLUMNS: [(&str, &str); 4] = [
        ("int", "INTEGER"),
        ("float", "DOUBLE PRECISION"),
        ("bool", "BOOLEAN"),
        ("datetime", "TIMESTAMPTZ"),
    ];

    TYPED_COLUMNS
        .iter()
        .find(|(manifest_type, _)| *manifest_type == field_type)
        .map_or("TEXT", |(_, sql_type)| *sql_type)
}
28
29// ---------------------------------------------------------------------------
30// Identifier quoting
31// ---------------------------------------------------------------------------
32
/// Wrap `name` in double quotes so it can be used as a PostgreSQL identifier.
///
/// Any double-quote inside `name` is doubled, per the SQL standard: the
/// identifier `foo"bar` is rendered as `"foo""bar"`.
fn quote_ident(name: &str) -> String {
    // +2 for the surrounding quotes; embedded quotes grow the buffer as needed.
    let mut quoted = String::with_capacity(name.len() + 2);
    quoted.push('"');
    for ch in name.chars() {
        if ch == '"' {
            // Emit the quote twice: once here, once by the push below.
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
39
40// ---------------------------------------------------------------------------
41// SQL generation
42// ---------------------------------------------------------------------------
43
44/// Generate a Postgres CREATE TABLE statement.
45pub fn create_table_sql(entity_name: &str, fields: &[FieldSpec]) -> String {
46    let mut columns = vec!["id TEXT PRIMARY KEY NOT NULL".to_string()];
47
48    for field in fields {
49        let col_type = pg_column_type(&field.field_type);
50        let not_null = if field.optional { "" } else { " NOT NULL" };
51        let unique = if field.unique { " UNIQUE" } else { "" };
52        columns.push(format!(
53            "{} {}{}{}",
54            quote_ident(&field.name),
55            col_type,
56            not_null,
57            unique
58        ));
59    }
60
61    format!(
62        "CREATE TABLE IF NOT EXISTS {} ({})",
63        quote_ident(entity_name),
64        columns.join(", ")
65    )
66}
67
68/// Generate a Postgres ALTER TABLE ADD COLUMN statement.
69/// NOT NULL is omitted on ADD COLUMN to avoid requiring DEFAULT values.
70/// Required-ness is tracked in the manifest; enforcement deferred.
71pub fn add_column_sql(entity_name: &str, field: &FieldSpec) -> String {
72    let col_type = pg_column_type(&field.field_type);
73    let unique = if field.unique { " UNIQUE" } else { "" };
74    format!(
75        "ALTER TABLE {} ADD COLUMN {} {}{}",
76        quote_ident(entity_name),
77        quote_ident(&field.name),
78        col_type,
79        unique
80    )
81}
82
83/// Generate a Postgres CREATE INDEX statement.
84pub fn create_index_sql(
85    entity_name: &str,
86    index_name: &str,
87    fields: &[String],
88    unique: bool,
89) -> String {
90    let unique_str = if unique { "UNIQUE " } else { "" };
91    let full_index_name = format!("{}_{}", entity_name, index_name);
92    let quoted_fields: Vec<String> = fields.iter().map(|f| quote_ident(f)).collect();
93    format!(
94        "CREATE {}INDEX IF NOT EXISTS {} ON {} ({})",
95        unique_str,
96        quote_ident(&full_index_name),
97        quote_ident(entity_name),
98        quoted_fields.join(", ")
99    )
100}
101
102// ---------------------------------------------------------------------------
103// PostgresAdapter — planning-only adapter
104// ---------------------------------------------------------------------------
105
/// A Postgres storage adapter. Currently supports planning only.
/// No live connection — SQL generation and planning from manifest.
///
/// This is a stateless unit struct: it carries no connection or
/// configuration, so constructing it performs no I/O.
pub struct PostgresAdapter;
109
110impl StorageAdapter for PostgresAdapter {
111    fn plan_schema(&self, target: &AppManifest) -> Result<SchemaPlan, StorageError> {
112        // Plan from empty baseline.
113        let mut operations = Vec::new();
114
115        for entity in &target.entities {
116            let fields: Vec<FieldSpec> = entity
117                .fields
118                .iter()
119                .map(|f| FieldSpec {
120                    name: f.name.clone(),
121                    field_type: f.field_type.clone(),
122                    optional: f.optional,
123                    unique: f.unique,
124                })
125                .collect();
126
127            operations.push(SchemaOperation::CreateEntity {
128                name: entity.name.clone(),
129                fields,
130            });
131
132            for index in &entity.indexes {
133                operations.push(SchemaOperation::AddIndex {
134                    entity: entity.name.clone(),
135                    name: index.name.clone(),
136                    fields: index.fields.clone(),
137                    unique: index.unique,
138                });
139            }
140        }
141
142        if operations.is_empty() {
143            operations.push(SchemaOperation::Noop);
144        }
145
146        Ok(SchemaPlan { operations })
147    }
148
149    // apply_schema intentionally not implemented — uses default trait error.
150}
151
152/// Generate all SQL statements for a plan, in order.
153/// Useful for dry-run preview of what Postgres DDL would be executed.
154pub fn plan_to_sql(plan: &SchemaPlan) -> Result<Vec<String>, StorageError> {
155    let mut statements = Vec::new();
156
157    for op in &plan.operations {
158        match op {
159            SchemaOperation::CreateEntity { name, fields } => {
160                statements.push(create_table_sql(name, fields));
161            }
162            SchemaOperation::AddField { entity, field } => {
163                statements.push(add_column_sql(entity, field));
164            }
165            SchemaOperation::AlterField {
166                entity,
167                previous,
168                target,
169            } => {
170                // Only nullable transitions today. SET / DROP NOT NULL is
171                // safe on a populated table when going from required →
172                // optional (existing rows already satisfy NOT NULL); the
173                // reverse direction (optional → required) succeeds only
174                // if every row has a non-null value, which the planner
175                // has no way to know — Postgres will fail the migration
176                // if it can't and the operator gets a clear error from
177                // the apply step.
178                if previous.optional && !target.optional {
179                    statements.push(format!(
180                        "ALTER TABLE {} ALTER COLUMN {} SET NOT NULL",
181                        quote_ident(entity),
182                        quote_ident(&target.name)
183                    ));
184                } else if !previous.optional && target.optional {
185                    statements.push(format!(
186                        "ALTER TABLE {} ALTER COLUMN {} DROP NOT NULL",
187                        quote_ident(entity),
188                        quote_ident(&target.name)
189                    ));
190                }
191                // Nothing emitted when neither nullable nor type changed
192                // — falls through silently. AlterField with no actual
193                // shape change shouldn't happen in practice (the planner
194                // only emits it on real drift), but guard against
195                // emitting empty SQL just in case.
196            }
197            SchemaOperation::AddIndex {
198                entity,
199                name,
200                fields,
201                unique,
202            } => {
203                statements.push(create_index_sql(entity, name, fields, *unique));
204            }
205            SchemaOperation::Noop => {}
206            other => {
207                return Err(StorageError {
208                    code: "PG_OP_UNSUPPORTED".into(),
209                    message: format!("Operation not supported by Postgres adapter: {other:?}"),
210                });
211            }
212        }
213    }
214
215    Ok(statements)
216}
217
218// ---------------------------------------------------------------------------
219// Introspection SQL helpers
220//
221// These generate the SQL queries that a live Postgres connection would run
222// to read the current schema. No connection required — just SQL strings.
223// ---------------------------------------------------------------------------
224
/// SQL to list user tables in the public schema.
///
/// Returns a single `table_name` column, sorted by name, restricted to base
/// tables. Tables whose name starts with the literal prefix `_pylon_` are
/// excluded.
///
/// `_` is a single-character wildcard in `LIKE`, so the prefix is matched
/// with an explicit `ESCAPE` character. An unescaped `'_pylon_%'` pattern
/// would also hide unrelated user tables such as `xpylon_orders`.
pub const INTROSPECT_TABLES_SQL: &str = "\
    SELECT table_name \
    FROM information_schema.tables \
    WHERE table_schema = 'public' \
      AND table_type = 'BASE TABLE' \
      AND table_name NOT LIKE '!_pylon!_%' ESCAPE '!' \
    ORDER BY table_name";
233
/// SQL to list columns for a given table.
/// Use with parameter: table_name.
///
/// Each row carries `column_name`, `data_type`, `is_nullable` and `is_pk`,
/// in ordinal position order. `is_pk` is a `COUNT(*)` of primary-key
/// constraints covering the column (greater than zero means the column is
/// part of the primary key).
///
/// The primary-key subquery is correlated on schema as well as table and
/// column, and joins `key_column_usage` on constraint schema, table schema
/// and table name in addition to the constraint name. Matching on names
/// alone would let a same-named table or constraint in another schema
/// inflate `is_pk`.
pub const INTROSPECT_COLUMNS_SQL: &str = "\
    SELECT column_name, data_type, is_nullable, \
           (SELECT COUNT(*) FROM information_schema.table_constraints tc \
            JOIN information_schema.key_column_usage kcu \
              ON tc.constraint_name = kcu.constraint_name \
             AND tc.constraint_schema = kcu.constraint_schema \
             AND tc.table_schema = kcu.table_schema \
             AND tc.table_name = kcu.table_name \
            WHERE tc.table_schema = c.table_schema \
              AND tc.table_name = c.table_name \
              AND kcu.column_name = c.column_name \
              AND tc.constraint_type = 'PRIMARY KEY') as is_pk \
    FROM information_schema.columns c \
    WHERE table_schema = 'public' AND table_name = $1 \
    ORDER BY ordinal_position";
247
/// SQL to list indexes for a given table.
/// Use with parameter: table_name.
///
/// Reads the `pg_index` / `pg_class` / `pg_attribute` / `pg_namespace`
/// catalogs for the `public` schema and returns one row per index with
/// `index_name`, `is_unique` and `columns` (column names aggregated in the
/// order given by their position in `indkey`). Primary-key indexes are
/// filtered out via `NOT ix.indisprimary`.
pub const INTROSPECT_INDEXES_SQL: &str = "\
    SELECT i.relname as index_name, \
           ix.indisunique as is_unique, \
           array_agg(a.attname ORDER BY array_position(ix.indkey, a.attnum)) as columns \
    FROM pg_index ix \
    JOIN pg_class t ON t.oid = ix.indrelid \
    JOIN pg_class i ON i.oid = ix.indexrelid \
    JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(ix.indkey) \
    JOIN pg_namespace n ON n.oid = t.relnamespace \
    WHERE n.nspname = 'public' \
      AND t.relname = $1 \
      AND NOT ix.indisprimary \
    GROUP BY i.relname, ix.indisunique \
    ORDER BY i.relname";
264
/// Plan from a snapshot (reuses the shared plan_from_snapshot).
/// This allows Postgres to plan incrementally once introspection data is available.
///
/// Pure delegation: `snapshot` and `target` are forwarded unchanged to
/// `crate::plan_from_snapshot` and its result is returned as-is.
pub fn plan_from_snapshot(snapshot: &crate::SchemaSnapshot, target: &AppManifest) -> SchemaPlan {
    crate::plan_from_snapshot(snapshot, target)
}
270
271// ---------------------------------------------------------------------------
272// CRUD SQL generation helpers (used by live adapter, testable without a DB)
273// ---------------------------------------------------------------------------
274
/// Mint a fixed-width, lexicographically sortable unique id.
///
/// Layout (40 lowercase hex characters):
/// - 32 chars: nanoseconds since the Unix epoch, zero-padded;
/// - 8 chars: a per-process atomic sequence number, zero-padded.
///
/// The sequence number keeps two ids minted in the same nanosecond distinct
/// and ordered (the later one compares greater). The fixed width makes
/// string comparison agree with numeric comparison, which cursor pagination
/// relies on. If the system clock reads earlier than the Unix epoch, the
/// timestamp part is zero.
pub fn generate_id() -> String {
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static SEQUENCE: AtomicU32 = AtomicU32::new(0);

    // Clock first, then the counter — same order on every call.
    let nanos = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos(),
        Err(_) => 0,
    };
    let sequence = SEQUENCE.fetch_add(1, Ordering::Relaxed);

    format!("{:032x}{:08x}", nanos, sequence)
}
294
295/// Convert a JSON value to its string representation for use as a SQL parameter.
296///
297/// Kept for back-compat with callers that need a textual fallback (e.g.
298/// human-readable logs). New code should bind through [`JsonParam`] so
299/// integers/booleans/nulls reach Postgres in their typed form instead of
300/// collapsing to TEXT, which the driver can't coerce into INTEGER /
301/// BOOLEAN / TIMESTAMPTZ columns and which silently turns JSON `null`
302/// into an empty string for FK columns.
303pub fn json_value_to_string(val: &serde_json::Value) -> String {
304    match val {
305        serde_json::Value::String(s) => s.clone(),
306        serde_json::Value::Number(n) => n.to_string(),
307        serde_json::Value::Bool(b) => b.to_string(),
308        serde_json::Value::Null => String::new(),
309        other => other.to_string(),
310    }
311}
312
/// A typed wrapper around a JSON scalar that implements
/// [`postgres::types::ToSql`] for INSERT/UPDATE parameters.
///
/// The previous implementation passed every value as `String`, which broke
/// non-text columns (the postgres driver can't bind a string literal into
/// an INTEGER / BOOLEAN / TIMESTAMPTZ slot) and silently turned JSON
/// `null` into the empty string for nullable FKs (so `unlink` left
/// dangling `""` references instead of NULL). `JsonParam` carries the
/// JSON variant tag through to `to_sql` so the driver can pick the
/// correct binary representation per column type.
///
/// JSON arrays/objects collapse to their JSON-string form (TEXT) — the
/// runtime layer doesn't currently model array/object columns at the
/// manifest level on Postgres, so anything that lands here came from
/// caller-supplied prose that's expected to fit into a TEXT column.
#[derive(Debug, Clone, PartialEq)]
pub enum JsonParam {
    /// JSON `null`; bound as SQL NULL.
    Null,
    /// A JSON string, or the serialized text of a JSON array/object.
    Text(String),
    /// A JSON number that fits in `i64`.
    Int(i64),
    /// A JSON number that does not fit in `i64` but converts to `f64`.
    Float(f64),
    /// A JSON boolean.
    Bool(bool),
}
336
337impl JsonParam {
338    /// Lift a `serde_json::Value` into the typed parameter form. Numbers
339    /// that fit `i64` go through as Int; everything else goes through as
340    /// Float to preserve fractional / large-magnitude values.
341    pub fn from_json(val: &serde_json::Value) -> Self {
342        match val {
343            serde_json::Value::Null => JsonParam::Null,
344            serde_json::Value::String(s) => JsonParam::Text(s.clone()),
345            serde_json::Value::Bool(b) => JsonParam::Bool(*b),
346            serde_json::Value::Number(n) => {
347                if let Some(i) = n.as_i64() {
348                    JsonParam::Int(i)
349                } else if let Some(f) = n.as_f64() {
350                    JsonParam::Float(f)
351                } else {
352                    JsonParam::Text(n.to_string())
353                }
354            }
355            other => JsonParam::Text(other.to_string()),
356        }
357    }
358}
359
#[cfg(feature = "postgres-live")]
impl postgres::types::ToSql for JsonParam {
    /// Encode this value in the binary wire format of the target type `ty`.
    ///
    /// Each variant is matched against the column's declared type so the
    /// encoding fits the slot:
    /// - `Null` binds as SQL NULL for any type.
    /// - `Bool` binds to BOOL.
    /// - `Int` binds to INT2 / INT4 / INT8 (range-checked) and to
    ///   FLOAT4 / FLOAT8 (rounded to the nearest representable float).
    /// - `Float` binds to FLOAT4 / FLOAT8, and to INT2 / INT4 / INT8 after
    ///   truncating the fractional part toward zero (range-checked).
    /// - `Text` binds to the text types, and is parsed as RFC 3339 for
    ///   TIMESTAMPTZ / TIMESTAMP / DATE.
    /// - Any other pairing is rendered as text and bound only when the
    ///   target type accepts text.
    ///
    /// # Errors
    /// Returns an error when a number does not fit the target integer type,
    /// when a datetime string is not valid RFC 3339, or when the value
    /// cannot be represented in the target type at all. Failing here avoids
    /// writing wrapped integers or raw text bytes into a binary slot, which
    /// Postgres may accept as a different value.
    fn to_sql(
        &self,
        ty: &postgres::types::Type,
        out: &mut bytes::BytesMut,
    ) -> Result<postgres::types::IsNull, Box<dyn std::error::Error + Sync + Send>> {
        use postgres::types::Type;

        /// Build the error for a number that does not fit the target type.
        fn out_of_range(
            value: &dyn std::fmt::Display,
            ty: &postgres::types::Type,
        ) -> Box<dyn std::error::Error + Sync + Send> {
            format!("value {value} is out of range for column type {ty}").into()
        }

        /// Truncate `value` toward zero and convert it to `i64`, rejecting
        /// NaN, infinities and magnitudes that `i64` cannot hold.
        fn float_to_i64(
            value: f64,
            ty: &postgres::types::Type,
        ) -> Result<i64, Box<dyn std::error::Error + Sync + Send>> {
            // -2^63 and 2^63 are exactly representable as f64 (i64::MAX is
            // not), so the valid range is the half-open interval below.
            const LOWER_INCLUSIVE: f64 = -9_223_372_036_854_775_808.0;
            const UPPER_EXCLUSIVE: f64 = 9_223_372_036_854_775_808.0;

            let whole = value.trunc();
            if whole.is_finite() && whole >= LOWER_INCLUSIVE && whole < UPPER_EXCLUSIVE {
                // Lossless: `whole` is integral and within i64's range.
                Ok(whole as i64)
            } else {
                Err(out_of_range(&value, ty))
            }
        }

        // Null binds as SQL NULL regardless of the column's declared type —
        // the driver treats `IsNull::Yes` as a null of the requested type.
        if matches!(self, JsonParam::Null) {
            return Ok(postgres::types::IsNull::Yes);
        }

        match (self, ty) {
            (JsonParam::Bool(b), &Type::BOOL) => b.to_sql(ty, out),

            // Integers: narrowing is checked so an oversized value is an
            // error instead of a silently wrapped number.
            (JsonParam::Int(n), &Type::INT2) => i16::try_from(*n)
                .map_err(|_| out_of_range(n, ty))?
                .to_sql(ty, out),
            (JsonParam::Int(n), &Type::INT4) => i32::try_from(*n)
                .map_err(|_| out_of_range(n, ty))?
                .to_sql(ty, out),
            (JsonParam::Int(n), &Type::INT8) => n.to_sql(ty, out),
            (JsonParam::Int(n), &Type::FLOAT4) => (*n as f32).to_sql(ty, out),
            (JsonParam::Int(n), &Type::FLOAT8) => (*n as f64).to_sql(ty, out),

            (JsonParam::Float(f), &Type::FLOAT4) => (*f as f32).to_sql(ty, out),
            (JsonParam::Float(f), &Type::FLOAT8) => f.to_sql(ty, out),
            // Floats into integer columns: the fractional part is dropped,
            // but NaN / infinite / out-of-range values are rejected.
            (JsonParam::Float(f), &Type::INT2) => i16::try_from(float_to_i64(*f, ty)?)
                .map_err(|_| out_of_range(f, ty))?
                .to_sql(ty, out),
            (JsonParam::Float(f), &Type::INT4) => i32::try_from(float_to_i64(*f, ty)?)
                .map_err(|_| out_of_range(f, ty))?
                .to_sql(ty, out),
            (JsonParam::Float(f), &Type::INT8) => float_to_i64(*f, ty)?.to_sql(ty, out),

            (JsonParam::Text(s), &Type::TEXT)
            | (JsonParam::Text(s), &Type::VARCHAR)
            | (JsonParam::Text(s), &Type::BPCHAR)
            | (JsonParam::Text(s), &Type::NAME) => s.to_sql(ty, out),
            (JsonParam::Text(s), &Type::TIMESTAMPTZ) => {
                // The runtime models datetimes as ISO 8601 / RFC 3339
                // strings, but the TIMESTAMPTZ binary wire format is an
                // `i64` of microseconds since 2000-01-01 UTC — not the bytes
                // of the string. Binding the raw string made Postgres reject
                // the parameter ("incorrect binary data format"). Parse with
                // chrono and let its ToSql impl emit the binary form.
                let dt = chrono::DateTime::parse_from_rfc3339(s)
                    .map_err(|e| format!("invalid TIMESTAMPTZ string {s:?}: {e}"))?
                    .with_timezone(&chrono::Utc);
                dt.to_sql(ty, out)
            }
            (JsonParam::Text(s), &Type::TIMESTAMP) => {
                // TIMESTAMP (no timezone): same parse, bound as a naive UTC
                // datetime so chrono picks the matching encoding.
                let dt = chrono::DateTime::parse_from_rfc3339(s)
                    .map_err(|e| format!("invalid TIMESTAMP string {s:?}: {e}"))?
                    .with_timezone(&chrono::Utc)
                    .naive_utc();
                dt.to_sql(ty, out)
            }
            (JsonParam::Text(s), &Type::DATE) => {
                let dt = chrono::DateTime::parse_from_rfc3339(s)
                    .map_err(|e| format!("invalid DATE string {s:?}: {e}"))?
                    .with_timezone(&chrono::Utc)
                    .date_naive();
                dt.to_sql(ty, out)
            }

            // Cross-type fallback: render as text and bind it only when the
            // target type accepts text. Writing text bytes into e.g. an
            // INT4 or BOOL slot could be accepted by Postgres as a
            // different value, so fail loudly instead of coercing.
            (other, _) => {
                let (kind, rendered) = match other {
                    JsonParam::Bool(b) => ("boolean", b.to_string()),
                    JsonParam::Int(n) => ("integer", n.to_string()),
                    JsonParam::Float(f) => ("float", f.to_string()),
                    JsonParam::Text(s) => ("text", s.clone()),
                    JsonParam::Null => unreachable!("Null is handled before the match"),
                };
                if !<String as postgres::types::ToSql>::accepts(ty) {
                    return Err(
                        format!("cannot bind JSON {kind} value to a column of type {ty}").into(),
                    );
                }
                rendered.to_sql(ty, out)
            }
        }
    }

    fn accepts(_ty: &postgres::types::Type) -> bool {
        // Every type is accepted at this level; whether a particular variant
        // can actually be bound to the target type is decided (and reported
        // as an error) inside `to_sql`.
        true
    }

    postgres::types::to_sql_checked!();
}
463
464/// Build an INSERT SQL statement and collect typed parameter values.
465/// Returns `(sql, params)` where `params[0]` is the generated ID
466/// (always `JsonParam::Text`). Subsequent params carry the JSON-typed
467/// value so the postgres driver can bind them to typed columns
468/// (INTEGER / BOOLEAN / TIMESTAMPTZ / TEXT) and so JSON `null` reaches
469/// the database as SQL NULL — the previous string-collapsing path stored
470/// `""` for nullable FKs and broke any non-text column.
471pub fn build_insert_sql(
472    entity: &str,
473    data: &serde_json::Value,
474) -> Result<(String, Vec<JsonParam>), StorageError> {
475    let id = generate_id();
476    let obj = data.as_object().ok_or_else(|| StorageError {
477        code: "PG_INVALID_DATA".into(),
478        message: "Insert data must be a JSON object".into(),
479    })?;
480
481    let mut col_names = vec!["id".to_string()];
482    let mut placeholders = vec!["$1".to_string()];
483    let mut values: Vec<JsonParam> = vec![JsonParam::Text(id)];
484
485    for (i, (key, val)) in obj.iter().enumerate() {
486        col_names.push(quote_ident(key));
487        placeholders.push(format!("${}", i + 2));
488        values.push(JsonParam::from_json(val));
489    }
490
491    let sql = format!(
492        "INSERT INTO {} ({}) VALUES ({})",
493        quote_ident(entity),
494        col_names.join(", "),
495        placeholders.join(", ")
496    );
497
498    Ok((sql, values))
499}
500
501/// Build an UPDATE SQL statement and collect typed parameter values.
502/// Returns `(sql, params)` where `params[0]` is the row ID.
503pub fn build_update_sql(
504    entity: &str,
505    id: &str,
506    data: &serde_json::Value,
507) -> Result<(String, Vec<JsonParam>), StorageError> {
508    let obj = data.as_object().ok_or_else(|| StorageError {
509        code: "PG_INVALID_DATA".into(),
510        message: "Update data must be a JSON object".into(),
511    })?;
512
513    if obj.is_empty() {
514        return Err(StorageError {
515            code: "PG_INVALID_DATA".into(),
516            message: "Update data must contain at least one field".into(),
517        });
518    }
519
520    let mut set_clauses = Vec::new();
521    let mut values: Vec<JsonParam> = vec![JsonParam::Text(id.to_string())];
522
523    for (i, (key, val)) in obj.iter().enumerate() {
524        set_clauses.push(format!("{} = ${}", quote_ident(key), i + 2));
525        values.push(JsonParam::from_json(val));
526    }
527
528    let sql = format!(
529        "UPDATE {} SET {} WHERE id = $1",
530        quote_ident(entity),
531        set_clauses.join(", ")
532    );
533
534    Ok((sql, values))
535}
536
/// Borrow a slice of [`JsonParam`] as the trait-object list the postgres
/// driver expects for statement parameters.
///
/// Shared by the insert/update/transact call sites so the conversion is
/// written once. Order is preserved: element `i` of the result borrows
/// `values[i]`.
#[cfg(feature = "postgres-live")]
fn as_pg_params(values: &[JsonParam]) -> Vec<&(dyn postgres::types::ToSql + Sync)> {
    let mut params: Vec<&(dyn postgres::types::ToSql + Sync)> = Vec::with_capacity(values.len());
    for value in values {
        params.push(value);
    }
    params
}
547
548// ---------------------------------------------------------------------------
549// Live Postgres adapter (requires "postgres-live" feature)
550// ---------------------------------------------------------------------------
551
552#[cfg(feature = "postgres-live")]
553pub mod live {
554    use super::*;
555    use crate::{
556        ColumnSnapshot, IndexSnapshot, SchemaSnapshot, StorageAdapter, StorageError, TableSnapshot,
557    };
558
    /// A live Postgres adapter with a real database connection.
    pub struct LivePostgresAdapter {
        /// Connection handle from the `postgres` crate; the adapter's
        /// methods issue their queries and statements through it.
        client: postgres::Client,
    }
563
564    impl LivePostgresAdapter {
565        /// Connect to a Postgres database.
566        pub fn connect(url: &str) -> Result<Self, StorageError> {
567            let client =
568                postgres::Client::connect(url, postgres::NoTls).map_err(|e| StorageError {
569                    code: "PG_CONNECT_FAILED".into(),
570                    message: format!("Failed to connect to Postgres: {e}"),
571                })?;
572            Ok(Self { client })
573        }
574
575        /// Read the current schema from the live database.
576        pub fn read_schema(&mut self) -> Result<SchemaSnapshot, StorageError> {
577            let table_rows = self
578                .client
579                .query(INTROSPECT_TABLES_SQL, &[])
580                .map_err(pg_err)?;
581
582            let mut tables = Vec::new();
583            for row in &table_rows {
584                let table_name: String = row.get(0);
585                let columns = self.read_columns(&table_name)?;
586                let indexes = self.read_indexes(&table_name)?;
587                tables.push(TableSnapshot {
588                    name: table_name,
589                    columns,
590                    indexes,
591                });
592            }
593
594            Ok(SchemaSnapshot { tables })
595        }
596
597        fn read_columns(&mut self, table: &str) -> Result<Vec<ColumnSnapshot>, StorageError> {
598            let rows = self
599                .client
600                .query(INTROSPECT_COLUMNS_SQL, &[&table])
601                .map_err(pg_err)?;
602
603            let mut columns = Vec::new();
604            for row in &rows {
605                let name: String = row.get(0);
606                let data_type: String = row.get(1);
607                let is_nullable: String = row.get(2);
608                let is_pk: i64 = row.get(3);
609                columns.push(ColumnSnapshot {
610                    name,
611                    column_type: data_type,
612                    notnull: is_nullable == "NO",
613                    primary_key: is_pk > 0,
614                });
615            }
616            Ok(columns)
617        }
618
619        fn read_indexes(&mut self, table: &str) -> Result<Vec<IndexSnapshot>, StorageError> {
620            let rows = self
621                .client
622                .query(INTROSPECT_INDEXES_SQL, &[&table])
623                .map_err(pg_err)?;
624
625            let mut indexes = Vec::new();
626            for row in &rows {
627                let name: String = row.get(0);
628                let unique: bool = row.get(1);
629                let columns: Vec<String> = row.get(2);
630                indexes.push(IndexSnapshot {
631                    name,
632                    columns,
633                    unique,
634                });
635            }
636            Ok(indexes)
637        }
638
639        /// Plan from live database state.
640        pub fn plan_from_live(&mut self, target: &AppManifest) -> Result<SchemaPlan, StorageError> {
641            let snapshot = self.read_schema()?;
642            Ok(crate::plan_from_snapshot(&snapshot, target))
643        }
644    }
645
646    impl StorageAdapter for LivePostgresAdapter {
647        fn plan_schema(&self, _target: &AppManifest) -> Result<SchemaPlan, StorageError> {
648            Err(StorageError {
649                code: "PG_PLAN_NEEDS_MUTABLE".into(),
650                message: "Use plan_from_live() instead for live Postgres planning".into(),
651            })
652        }
653
654        fn apply_schema(&self, _plan: &SchemaPlan) -> Result<(), StorageError> {
655            Err(StorageError {
656                code: "PG_APPLY_USE_METHOD".into(),
657                message: "Use apply_plan() instead of the trait method for live Postgres".into(),
658            })
659        }
660    }
661
662    impl LivePostgresAdapter {
663        /// Apply a schema plan to the live database.
664        pub fn apply_plan(&mut self, plan: &SchemaPlan) -> Result<(), StorageError> {
665            let statements = plan_to_sql(plan)?;
666            for sql in &statements {
667                self.client.execute(sql.as_str(), &[]).map_err(pg_err)?;
668            }
669            Ok(())
670        }
671
672        /// Execute a raw SQL statement against the live database. Used by
673        /// integration tests for setup/teardown (DROP TABLE, TRUNCATE) —
674        /// production code should go through `apply_plan` so changes are
675        /// represented in the migration history. Returns the number of
676        /// rows affected.
677        pub fn exec_raw(&mut self, sql: &str) -> Result<u64, StorageError> {
678            self.client.execute(sql, &[]).map_err(pg_err)
679        }
680
681        /// Insert a row. Returns the generated ID.
682        pub fn insert(
683            &mut self,
684            entity: &str,
685            data: &serde_json::Value,
686        ) -> Result<String, StorageError> {
687            let (sql, values) = build_insert_sql(entity, data)?;
688            // The first param is always the generated ID — extract it before
689            // we hand `values` off to the postgres driver as borrowed slices.
690            let id = match &values[0] {
691                JsonParam::Text(s) => s.clone(),
692                _ => {
693                    return Err(StorageError {
694                        code: "PG_INTERNAL".into(),
695                        message: "build_insert_sql produced non-text id param".into(),
696                    });
697                }
698            };
699            let params = as_pg_params(&values);
700            self.client.execute(sql.as_str(), &params).map_err(pg_err)?;
701            Ok(id)
702        }
703
704        /// Get a row by ID.
705        pub fn get_by_id(
706            &mut self,
707            entity: &str,
708            id: &str,
709        ) -> Result<Option<serde_json::Value>, StorageError> {
710            let sql = format!("SELECT * FROM {} WHERE id = $1", quote_ident(entity));
711            let rows = self.client.query(sql.as_str(), &[&id]).map_err(pg_err)?;
712
713            match rows.first() {
714                Some(row) => Ok(Some(row_to_json(row))),
715                None => Ok(None),
716            }
717        }
718
719        /// List all rows from an entity.
720        pub fn list(&mut self, entity: &str) -> Result<Vec<serde_json::Value>, StorageError> {
721            let sql = format!("SELECT * FROM {}", quote_ident(entity));
722            let rows = self.client.query(sql.as_str(), &[]).map_err(pg_err)?;
723
724            Ok(rows.iter().map(row_to_json).collect())
725        }
726
727        /// Cursor-paginated list. `after` is the last `id` from the previous
728        /// page; the result contains rows with `id > after` (lex order),
729        /// limited to `limit`. Used for sync push/pull.
730        pub fn list_after(
731            &mut self,
732            entity: &str,
733            after: Option<&str>,
734            limit: usize,
735        ) -> Result<Vec<serde_json::Value>, StorageError> {
736            // Cap limit at a sensible upper bound so a malicious client can't
737            // stream the whole table by passing limit=u64::MAX.
738            let capped: i64 = limit.min(10_000) as i64;
739            let sql = match after {
740                Some(_) => format!(
741                    "SELECT * FROM {} WHERE id > $1 ORDER BY id ASC LIMIT $2",
742                    quote_ident(entity)
743                ),
744                None => format!(
745                    "SELECT * FROM {} ORDER BY id ASC LIMIT $1",
746                    quote_ident(entity)
747                ),
748            };
749            let rows = match after {
750                Some(cursor) => self
751                    .client
752                    .query(sql.as_str(), &[&cursor, &capped])
753                    .map_err(pg_err)?,
754                None => self
755                    .client
756                    .query(sql.as_str(), &[&capped])
757                    .map_err(pg_err)?,
758            };
759            Ok(rows.iter().map(row_to_json).collect())
760        }
761
762        /// Update a row by ID. Returns true if the row was found and updated.
763        pub fn update(
764            &mut self,
765            entity: &str,
766            id: &str,
767            data: &serde_json::Value,
768        ) -> Result<bool, StorageError> {
769            let (sql, values) = build_update_sql(entity, id, data)?;
770            let params = as_pg_params(&values);
771            let rows_affected = self.client.execute(sql.as_str(), &params).map_err(pg_err)?;
772            Ok(rows_affected > 0)
773        }
774
775        /// Delete a row by ID. Returns true if the row was found and deleted.
776        pub fn delete(&mut self, entity: &str, id: &str) -> Result<bool, StorageError> {
777            let sql = format!("DELETE FROM {} WHERE id = $1", quote_ident(entity));
778            let rows_affected = self.client.execute(sql.as_str(), &[&id]).map_err(pg_err)?;
779            Ok(rows_affected > 0)
780        }
781
782        /// Look up a row by `field = value`. Caller must validate `field`
783        /// against the manifest before calling — we still `quote_ident` it
784        /// but won't catch a typo against the entity definition.
785        pub fn lookup_field(
786            &mut self,
787            entity: &str,
788            field: &str,
789            value: &str,
790        ) -> Result<Option<serde_json::Value>, StorageError> {
791            let sql = format!(
792                "SELECT * FROM {} WHERE {} = $1 LIMIT 1",
793                quote_ident(entity),
794                quote_ident(field),
795            );
796            let rows = self.client.query(sql.as_str(), &[&value]).map_err(pg_err)?;
797            Ok(rows.first().map(row_to_json))
798        }
799
800        /// Push a `query_filtered` filter down to a real Postgres `WHERE`.
801        ///
802        /// Supported operators (parity with the SQLite path):
803        /// - Equality (`field: value`)
804        /// - `$not`: emits `field != value`
805        /// - `$gt` / `$gte` / `$lt` / `$lte`
806        /// - `$like`: emits `field LIKE value` (use `%`/`_` wildcards in
807        ///   the value; case-sensitive — pass `$ilike` for case-insensitive
808        ///   if/when the SQLite side adds it)
809        /// - `$in: [..]`: emits `field IN ($1, $2, ...)`
810        ///
811        /// Top-level meta operators: `$order`, `$limit`, `$offset`.
812        ///
813        /// `$search` (FTS5 on SQLite) is NOT supported here — Postgres
814        /// would need a tsvector column or a generic ILIKE OR-fold across
815        /// every text field, neither of which is wired up yet. Returns
816        /// `SEARCH_NOT_SUPPORTED` so callers can branch instead of
817        /// receiving silently-broad results.
818        ///
819        /// Anything else is silently ignored (matches the in-memory fallback's
820        /// permissive behavior). Field names are validated against `valid_columns`
821        /// to prevent SQL injection — pass the entity's column set.
822        pub fn query_filtered(
823            &mut self,
824            entity: &str,
825            filter: &serde_json::Value,
826            valid_columns: &[String],
827        ) -> Result<Vec<serde_json::Value>, StorageError> {
828            let empty = serde_json::Map::new();
829            let obj = filter.as_object().unwrap_or(&empty);
830
831            let validate = |col: &str| -> Result<(), StorageError> {
832                if col == "id" || valid_columns.iter().any(|c| c == col) {
833                    Ok(())
834                } else {
835                    Err(StorageError {
836                        code: "UNKNOWN_COLUMN".into(),
837                        message: format!("Unknown column \"{col}\" on entity \"{entity}\""),
838                    })
839                }
840            };
841
842            let mut where_clauses: Vec<String> = Vec::new();
843            let mut order_clause = String::new();
844            let mut limit_clause = String::new();
845            let mut offset_clause = String::new();
846            // Collect (col, op, value) so placeholder numbers can be assigned
847            // in a single materialization pass after the parse loop. Values
848            // are now JsonParam (typed) instead of String — see `value_to_pg`.
849            let mut planned: Vec<(String, String, JsonParam)> = Vec::new();
850
851            for (key, val) in obj {
852                match key.as_str() {
853                    "$search" => {
854                        // FTS5 (SQLite) has no portable equivalent in
855                        // Postgres without an explicit tsvector column.
856                        // Return a clear error rather than silently
857                        // ignoring the operator — callers that hit this
858                        // need to either branch on backend or define a
859                        // tsvector + GIN index in their schema.
860                        return Err(StorageError {
861                            code: "SEARCH_NOT_SUPPORTED".into(),
862                            message: "$search is SQLite-FTS5-only; use a Postgres tsvector column with the storage adapter's full-text path"
863                                .into(),
864                        });
865                    }
866                    "$order" => {
867                        if let Some(ord) = val.as_object() {
868                            let mut parts = Vec::new();
869                            for (col, dir) in ord {
870                                validate(col)?;
871                                let d = match dir.as_str().unwrap_or("asc") {
872                                    "desc" | "DESC" => "DESC",
873                                    _ => "ASC",
874                                };
875                                parts.push(format!("{} {d}", quote_ident(col)));
876                            }
877                            if !parts.is_empty() {
878                                order_clause = format!(" ORDER BY {}", parts.join(", "));
879                            }
880                        }
881                    }
882                    "$limit" => {
883                        if let Some(n) = val.as_u64() {
884                            limit_clause = format!(" LIMIT {}", n);
885                        }
886                    }
887                    "$offset" => {
888                        if let Some(n) = val.as_u64() {
889                            offset_clause = format!(" OFFSET {}", n);
890                        }
891                    }
892                    field => {
893                        validate(field)?;
894                        match val {
895                            serde_json::Value::Object(ops) => {
896                                for (op, v) in ops {
897                                    match op.as_str() {
898                                        "$not" => planned.push((
899                                            field.into(),
900                                            "!=".into(),
901                                            value_to_pg(v),
902                                        )),
903                                        "$gt" => {
904                                            planned.push((field.into(), ">".into(), value_to_pg(v)))
905                                        }
906                                        "$gte" => planned.push((
907                                            field.into(),
908                                            ">=".into(),
909                                            value_to_pg(v),
910                                        )),
911                                        "$lt" => {
912                                            planned.push((field.into(), "<".into(), value_to_pg(v)))
913                                        }
914                                        "$lte" => planned.push((
915                                            field.into(),
916                                            "<=".into(),
917                                            value_to_pg(v),
918                                        )),
919                                        "$like" => planned.push((
920                                            field.into(),
921                                            "LIKE".into(),
922                                            value_to_pg(v),
923                                        )),
924                                        "$in" => {
925                                            if let Some(arr) = v.as_array() {
926                                                let placeholders: Vec<String> = (0..arr.len())
927                                                    .map(|i| format!("${}", planned.len() + 1 + i))
928                                                    .collect();
929                                                where_clauses.push(format!(
930                                                    "{} IN ({})",
931                                                    quote_ident(field),
932                                                    placeholders.join(", "),
933                                                ));
934                                                for x in arr {
935                                                    planned.push((
936                                                        format!("__inline_{}", planned.len()),
937                                                        "__INLINE__".into(),
938                                                        value_to_pg(x),
939                                                    ));
940                                                }
941                                            }
942                                        }
943                                        _ => {}
944                                    }
945                                }
946                            }
947                            _ => planned.push((field.into(), "=".into(), value_to_pg(val))),
948                        }
949                    }
950                }
951            }
952
953            // Materialize planned -> SQL + params.
954            let mut params: Vec<JsonParam> = Vec::with_capacity(planned.len());
955            for (field, op, v) in &planned {
956                if op == "__INLINE__" {
957                    // Already emitted via the IN-clause path; just push the value.
958                    params.push(v.clone());
959                } else {
960                    let placeholder = format!("${}", params.len() + 1);
961                    where_clauses.push(format!("{} {} {}", quote_ident(field), op, placeholder));
962                    params.push(v.clone());
963                }
964            }
965
966            let where_sql = if where_clauses.is_empty() {
967                String::new()
968            } else {
969                format!(" WHERE {}", where_clauses.join(" AND "))
970            };
971            let sql = format!(
972                "SELECT * FROM {}{}{}{}{}",
973                quote_ident(entity),
974                where_sql,
975                order_clause,
976                limit_clause,
977                offset_clause,
978            );
979
980            let pg_params = as_pg_params(&params);
981            let rows = self
982                .client
983                .query(sql.as_str(), &pg_params)
984                .map_err(pg_err)?;
985            Ok(rows.iter().map(row_to_json).collect())
986        }
987
988        /// Run a `DataStore::aggregate` spec against Postgres. Mirrors the
989        /// SQLite path in `pylon-runtime` — supports `count`, `sum`, `avg`,
990        /// `min`, `max`, `countDistinct`, `groupBy` (plain field names or
991        /// `{field, bucket: hour|day|week|month|year}` for date bucketing
992        /// via `date_trunc`), and a flat-equality `where` filter.
993        ///
994        /// Spec format (same JSON shape used by the SQLite path):
995        /// ```json
996        /// { "count": "*",
997        ///   "sum": ["amount"],
998        ///   "groupBy": [{"field": "createdAt", "bucket": "day"}],
999        ///   "where": {"status": "paid"} }
1000        /// ```
1001        ///
1002        /// `valid_columns` is used to validate every field name before it's
1003        /// quoted into SQL — same pattern as `query_filtered`. Caller (the
1004        /// `DataStore` impl in this crate) supplies the entity's column set
1005        /// from the manifest.
1006        pub fn aggregate(
1007            &mut self,
1008            entity: &str,
1009            spec: &serde_json::Value,
1010            valid_columns: &[String],
1011        ) -> Result<serde_json::Value, StorageError> {
1012            let obj = spec.as_object().ok_or_else(|| StorageError {
1013                code: "INVALID_QUERY".into(),
1014                message: "aggregate spec must be a JSON object".into(),
1015            })?;
1016
1017            let validate = |col: &str| -> Result<(), StorageError> {
1018                if col == "id" || valid_columns.iter().any(|c| c == col) {
1019                    Ok(())
1020                } else {
1021                    Err(StorageError {
1022                        code: "UNKNOWN_COLUMN".into(),
1023                        message: format!("Unknown column \"{col}\" on entity \"{entity}\""),
1024                    })
1025                }
1026            };
1027
1028            let mut select_parts: Vec<String> = Vec::new();
1029            let mut result_fields: Vec<String> = Vec::new();
1030
1031            if let Some(count) = obj.get("count") {
1032                match count {
1033                    serde_json::Value::String(s) if s == "*" => {
1034                        select_parts.push("COUNT(*) AS count".into());
1035                        result_fields.push("count".into());
1036                    }
1037                    serde_json::Value::String(field) => {
1038                        validate(field)?;
1039                        let alias = format!("count_{field}");
1040                        select_parts.push(format!(
1041                            "COUNT({}) AS {}",
1042                            quote_ident(field),
1043                            quote_ident(&alias),
1044                        ));
1045                        result_fields.push(alias);
1046                    }
1047                    _ => {}
1048                }
1049            }
1050
1051            for (fn_name, prefix) in [
1052                ("sum", "sum_"),
1053                ("avg", "avg_"),
1054                ("min", "min_"),
1055                ("max", "max_"),
1056            ] {
1057                if let Some(fields) = obj.get(fn_name).and_then(|v| v.as_array()) {
1058                    for field in fields {
1059                        if let Some(f) = field.as_str() {
1060                            validate(f)?;
1061                            let alias = format!("{prefix}{f}");
1062                            let sql_fn = fn_name.to_uppercase();
1063                            select_parts.push(format!(
1064                                "{}({}) AS {}",
1065                                sql_fn,
1066                                quote_ident(f),
1067                                quote_ident(&alias),
1068                            ));
1069                            result_fields.push(alias);
1070                        }
1071                    }
1072                }
1073            }
1074
1075            if let Some(fields) = obj.get("countDistinct").and_then(|v| v.as_array()) {
1076                for field in fields {
1077                    if let Some(f) = field.as_str() {
1078                        validate(f)?;
1079                        let alias = format!("count_distinct_{f}");
1080                        select_parts.push(format!(
1081                            "COUNT(DISTINCT {}) AS {}",
1082                            quote_ident(f),
1083                            quote_ident(&alias),
1084                        ));
1085                        result_fields.push(alias);
1086                    }
1087                }
1088            }
1089
1090            // groupBy: column name or { field, bucket } — same vocabulary as
1091            // the SQLite path. Buckets translate to Postgres `date_trunc`
1092            // (SQLite uses `strftime`); both collapse rows to the bucket
1093            // boundary identically.
1094            let mut group_by: Vec<String> = Vec::new();
1095            let mut group_select: Vec<String> = Vec::new();
1096            let mut group_field_names: Vec<String> = Vec::new();
1097            if let Some(groups) = obj.get("groupBy").and_then(|v| v.as_array()) {
1098                for g in groups {
1099                    if let Some(f) = g.as_str() {
1100                        validate(f)?;
1101                        let q = quote_ident(f);
1102                        group_by.push(q.clone());
1103                        group_select.push(q);
1104                        group_field_names.push(f.to_string());
1105                    } else if let Some(spec) = g.as_object() {
1106                        let field =
1107                            spec.get("field").and_then(|v| v.as_str()).ok_or_else(|| {
1108                                StorageError {
1109                                    code: "INVALID_QUERY".into(),
1110                                    message: "groupBy object spec requires `field`".into(),
1111                                }
1112                            })?;
1113                        validate(field)?;
1114                        let bucket = spec.get("bucket").and_then(|v| v.as_str()).unwrap_or("day");
1115                        let trunc_unit = match bucket {
1116                            "hour" | "day" | "week" | "month" | "year" => bucket,
1117                            _ => {
1118                                return Err(StorageError {
1119                                    code: "INVALID_QUERY".into(),
1120                                    message: format!(
1121                                        "bucket must be one of hour/day/week/month/year, got {bucket}"
1122                                    ),
1123                                });
1124                            }
1125                        };
1126                        let alias = format!("{field}_{bucket}");
1127                        let expr = format!("date_trunc('{}', {})", trunc_unit, quote_ident(field),);
1128                        group_by.push(expr.clone());
1129                        group_select.push(format!("{} AS {}", expr, quote_ident(&alias)));
1130                        group_field_names.push(alias);
1131                    }
1132                }
1133            }
1134
1135            let mut full_select = group_select.clone();
1136            full_select.extend(select_parts.iter().cloned());
1137            if full_select.is_empty() {
1138                return Err(StorageError {
1139                    code: "INVALID_QUERY".into(),
1140                    message: "aggregate spec must include count/sum/avg/min/max/groupBy".into(),
1141                });
1142            }
1143
1144            let mut where_clauses: Vec<String> = Vec::new();
1145            let mut params: Vec<JsonParam> = Vec::new();
1146            if let Some(w) = obj.get("where").and_then(|v| v.as_object()) {
1147                for (k, v) in w {
1148                    validate(k)?;
1149                    let placeholder = format!("${}", params.len() + 1);
1150                    where_clauses.push(format!("{} = {}", quote_ident(k), placeholder));
1151                    params.push(value_to_pg(v));
1152                }
1153            }
1154            let where_sql = if where_clauses.is_empty() {
1155                String::new()
1156            } else {
1157                format!(" WHERE {}", where_clauses.join(" AND "))
1158            };
1159            let group_sql = if group_by.is_empty() {
1160                String::new()
1161            } else {
1162                format!(" GROUP BY {}", group_by.join(", "))
1163            };
1164
1165            let sql = format!(
1166                "SELECT {} FROM {}{}{}",
1167                full_select.join(", "),
1168                quote_ident(entity),
1169                where_sql,
1170                group_sql,
1171            );
1172
1173            let pg_params = as_pg_params(&params);
1174            let rows = self
1175                .client
1176                .query(sql.as_str(), &pg_params)
1177                .map_err(pg_err)?;
1178
1179            let column_names: Vec<String> = group_field_names
1180                .iter()
1181                .chain(result_fields.iter())
1182                .cloned()
1183                .collect();
1184
1185            let mut out: Vec<serde_json::Value> = Vec::with_capacity(rows.len());
1186            for row in &rows {
1187                let row_json = row_to_json(row);
1188                if let serde_json::Value::Object(map) = &row_json {
1189                    let mut filtered = serde_json::Map::new();
1190                    for name in &column_names {
1191                        if let Some(v) = map.get(name) {
1192                            filtered.insert(name.clone(), v.clone());
1193                        }
1194                    }
1195                    out.push(serde_json::Value::Object(filtered));
1196                } else {
1197                    out.push(row_json);
1198                }
1199            }
1200            Ok(serde_json::json!({ "rows": out }))
1201        }
1202    }
1203
1204    /// Atomic operation describing a single mutation inside [`LivePostgresAdapter::transact`].
1205    pub enum TxOp<'a> {
1206        Insert {
1207            entity: &'a str,
1208            data: &'a serde_json::Value,
1209        },
1210        Update {
1211            entity: &'a str,
1212            id: &'a str,
1213            data: &'a serde_json::Value,
1214        },
1215        Delete {
1216            entity: &'a str,
1217            id: &'a str,
1218        },
1219    }
1220
    /// Result of a single op inside a transaction.
    ///
    /// `LivePostgresAdapter::transact` pushes one of these per input op, in
    /// the same order as the ops were given.
    #[derive(Debug, Clone)]
    pub enum TxResult {
        /// An insert ran; carries the id taken from the insert statement's
        /// first bound parameter.
        Inserted(String),
        /// An update ran; `true` when at least one row was affected.
        Updated(bool),
        /// A delete ran; `true` when at least one row was affected.
        Deleted(bool),
    }
1228
1229    impl LivePostgresAdapter {
1230        /// Run `ops` inside a single Postgres transaction. Either all of them
1231        /// commit together or none of them do — there is no partial state on
1232        /// failure. The ROLLBACK happens implicitly when the `Transaction`
1233        /// guard is dropped without `commit()` being called.
1234        pub fn transact(&mut self, ops: &[TxOp<'_>]) -> Result<Vec<TxResult>, StorageError> {
1235            let mut tx = self.client.transaction().map_err(pg_err)?;
1236            let mut results: Vec<TxResult> = Vec::with_capacity(ops.len());
1237
1238            for op in ops {
1239                match op {
1240                    TxOp::Insert { entity, data } => {
1241                        let (sql, values) = build_insert_sql(entity, data)?;
1242                        let id = match &values[0] {
1243                            JsonParam::Text(s) => s.clone(),
1244                            _ => {
1245                                return Err(StorageError {
1246                                    code: "PG_INTERNAL".into(),
1247                                    message: "build_insert_sql produced non-text id param".into(),
1248                                });
1249                            }
1250                        };
1251                        let params = as_pg_params(&values);
1252                        tx.execute(sql.as_str(), &params).map_err(pg_err)?;
1253                        results.push(TxResult::Inserted(id));
1254                    }
1255                    TxOp::Update { entity, id, data } => {
1256                        let (sql, values) = build_update_sql(entity, id, data)?;
1257                        let params = as_pg_params(&values);
1258                        let n = tx.execute(sql.as_str(), &params).map_err(pg_err)?;
1259                        results.push(TxResult::Updated(n > 0));
1260                    }
1261                    TxOp::Delete { entity, id } => {
1262                        let sql = format!("DELETE FROM {} WHERE id = $1", quote_ident(entity));
1263                        let n = tx.execute(sql.as_str(), &[id]).map_err(pg_err)?;
1264                        results.push(TxResult::Deleted(n > 0));
1265                    }
1266                }
1267            }
1268
1269            tx.commit().map_err(pg_err)?;
1270            Ok(results)
1271        }
1272    }
1273
    /// Lift a JSON value into a typed Postgres parameter by delegating to
    /// `JsonParam::from_json`.
    ///
    /// The previous implementation collapsed everything to `String`, which
    /// silently stringified ints/bools and turned JSON `null` into `""` for
    /// nullable columns. Forwarding through `JsonParam` keeps the column
    /// type honest and lets callers `unlink` (set FK to NULL) cleanly.
    fn value_to_pg(v: &serde_json::Value) -> JsonParam {
        JsonParam::from_json(v)
    }
1282
1283    fn row_to_json(row: &postgres::Row) -> serde_json::Value {
1284        use postgres::types::Type;
1285        let mut obj = serde_json::Map::new();
1286        for (i, col) in row.columns().iter().enumerate() {
1287            let name = col.name().to_string();
1288
1289            // Use `try_get` everywhere — `Row::get` panics on decode mismatch,
1290            // and a panic in a query handler poisons the connection mutex,
1291            // taking down all subsequent reads on this datastore. Anything
1292            // that fails to decode becomes Null with a one-shot warning.
1293            //
1294            // Timestamps and the catch-all path explicitly DON'T request
1295            // `String` — the postgres crate uses binary protocol by default
1296            // and there's no `FromSql<String>` impl for TIMESTAMPTZ etc. We
1297            // ask for `Vec<u8>` and lossy-stringify, which works for all
1298            // text-shaped columns in either protocol.
1299            let value: serde_json::Value = match *col.type_() {
1300                Type::BOOL => try_get_or_null::<Option<bool>>(row, i)
1301                    .flatten()
1302                    .map(serde_json::Value::Bool)
1303                    .unwrap_or(serde_json::Value::Null),
1304                Type::INT2 => try_get_or_null::<Option<i16>>(row, i)
1305                    .flatten()
1306                    .map(|v| serde_json::Value::Number(v.into()))
1307                    .unwrap_or(serde_json::Value::Null),
1308                Type::INT4 => try_get_or_null::<Option<i32>>(row, i)
1309                    .flatten()
1310                    .map(|v| serde_json::Value::Number(v.into()))
1311                    .unwrap_or(serde_json::Value::Null),
1312                Type::INT8 => try_get_or_null::<Option<i64>>(row, i)
1313                    .flatten()
1314                    .map(|v| serde_json::Value::Number(v.into()))
1315                    .unwrap_or(serde_json::Value::Null),
1316                Type::FLOAT4 => try_get_or_null::<Option<f32>>(row, i)
1317                    .flatten()
1318                    .and_then(|v| serde_json::Number::from_f64(v as f64))
1319                    .map(serde_json::Value::Number)
1320                    .unwrap_or(serde_json::Value::Null),
1321                Type::FLOAT8 => try_get_or_null::<Option<f64>>(row, i)
1322                    .flatten()
1323                    .and_then(serde_json::Number::from_f64)
1324                    .map(serde_json::Value::Number)
1325                    .unwrap_or(serde_json::Value::Null),
1326                Type::JSON | Type::JSONB => try_get_or_null::<Option<serde_json::Value>>(row, i)
1327                    .flatten()
1328                    .unwrap_or(serde_json::Value::Null),
1329                Type::BYTEA => try_get_or_null::<Option<Vec<u8>>>(row, i)
1330                    .flatten()
1331                    .map(|b| serde_json::Value::String(b64(&b)))
1332                    .unwrap_or(serde_json::Value::Null),
1333                Type::TEXT | Type::VARCHAR | Type::BPCHAR | Type::NAME | Type::UNKNOWN => {
1334                    try_get_or_null::<Option<String>>(row, i)
1335                        .flatten()
1336                        .map(serde_json::Value::String)
1337                        .unwrap_or(serde_json::Value::Null)
1338                }
1339                Type::TIMESTAMPTZ => {
1340                    // Decode via chrono::DateTime<Utc> (postgres's
1341                    // `with-chrono-0_4` feature provides FromSql) and
1342                    // re-format as ISO 8601 — the shape pylon's clients
1343                    // expect (matches `pylon_kernel::util::now_iso`,
1344                    // so timestamps round-trip with the same surface
1345                    // across SQLite + PG).
1346                    try_get_or_null::<Option<chrono::DateTime<chrono::Utc>>>(row, i)
1347                        .flatten()
1348                        .map(|dt| {
1349                            serde_json::Value::String(dt.format("%Y-%m-%dT%H:%M:%SZ").to_string())
1350                        })
1351                        .unwrap_or(serde_json::Value::Null)
1352                }
1353                Type::TIMESTAMP => try_get_or_null::<Option<chrono::NaiveDateTime>>(row, i)
1354                    .flatten()
1355                    .map(|dt| {
1356                        serde_json::Value::String(dt.format("%Y-%m-%dT%H:%M:%SZ").to_string())
1357                    })
1358                    .unwrap_or(serde_json::Value::Null),
1359                Type::DATE => try_get_or_null::<Option<chrono::NaiveDate>>(row, i)
1360                    .flatten()
1361                    .map(|d| serde_json::Value::String(d.format("%Y-%m-%d").to_string()))
1362                    .unwrap_or(serde_json::Value::Null),
1363                _ => {
1364                    // Last resort: ask Postgres to render anything else as
1365                    // text via a stringifying decode through Vec<u8>. If even
1366                    // that fails (rare — Postgres types not implementing the
1367                    // text format), fall through to Null with a warning.
1368                    match row.try_get::<_, Option<String>>(i) {
1369                        Ok(Some(s)) => serde_json::Value::String(s),
1370                        Ok(None) => serde_json::Value::Null,
1371                        Err(_) => match row.try_get::<_, Option<Vec<u8>>>(i) {
1372                            Ok(Some(bytes)) => serde_json::Value::String(
1373                                String::from_utf8_lossy(&bytes).into_owned(),
1374                            ),
1375                            _ => serde_json::Value::Null,
1376                        },
1377                    }
1378                }
1379            };
1380            obj.insert(name, value);
1381        }
1382        serde_json::Value::Object(obj)
1383    }
1384
1385    fn try_get_or_null<'a, T>(row: &'a postgres::Row, i: usize) -> Option<T>
1386    where
1387        T: postgres::types::FromSql<'a>,
1388    {
1389        match row.try_get::<_, T>(i) {
1390            Ok(v) => Some(v),
1391            Err(e) => {
1392                tracing::warn!(
1393                    "[postgres] decode failed for column {} ({}): {e}",
1394                    i,
1395                    row.columns()[i].name()
1396                );
1397                None
1398            }
1399        }
1400    }
1401
/// Encode `bytes` as base64 using the standard alphabet with `=` padding.
///
/// Hand-written so the BYTEA column edge case does not pull in another
/// dependency.
fn b64(bytes: &[u8]) -> String {
    const TABLE: &[u8; 64] =
        b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    // Pick the 6-bit group that starts `shift` bits from the bottom of a
    // 24-bit word and map it to its alphabet character.
    let sextet = |word: u32, shift: u32| TABLE[((word >> shift) & 0x3F) as usize] as char;

    let mut encoded = String::with_capacity((bytes.len() + 2) / 3 * 4);
    for group in bytes.chunks(3) {
        // Pack up to three bytes into a 24-bit word; absent bytes stay zero.
        let word = group
            .iter()
            .enumerate()
            .fold(0u32, |acc, (pos, &byte)| acc | (u32::from(byte) << (16 - 8 * pos)));

        encoded.push(sextet(word, 18));
        encoded.push(sextet(word, 12));
        // The last two output characters become padding when the input
        // group is short.
        encoded.push(if group.len() > 1 { sextet(word, 6) } else { '=' });
        encoded.push(if group.len() > 2 { sextet(word, 0) } else { '=' });
    }
    encoded
}
1430
1431    fn pg_err(e: postgres::Error) -> StorageError {
1432        // postgres::Error's Display is intentionally short ("db error",
1433        // "connection error" etc.) — the actual SQLSTATE / detail lives
1434        // on the source chain. Walk the chain so the final message has
1435        // enough signal to debug a failed insert/update without
1436        // attaching a debugger.
1437        use std::error::Error;
1438        let mut detail = format!("{e}");
1439        let mut src: Option<&dyn Error> = e.source();
1440        while let Some(s) = src {
1441            detail.push_str(": ");
1442            detail.push_str(&format!("{s}"));
1443            src = s.source();
1444        }
1445        StorageError {
1446            code: "PG_QUERY_FAILED".into(),
1447            message: format!("Postgres query failed: {detail}"),
1448        }
1449    }
1450}
1451
1452// ---------------------------------------------------------------------------
1453// Tests
1454// ---------------------------------------------------------------------------
1455
1456#[cfg(test)]
1457mod tests {
1458    use super::*;
1459
1460    /// Hand-rolled fixture that matches the snapshots in the tests
1461    /// below. Decoupled from any example's `pylon.manifest.json` so
1462    /// changing an example schema doesn't bleed into adapter tests.
1463    fn test_manifest() -> AppManifest {
1464        use pylon_kernel::{ManifestEntity, ManifestField, ManifestIndex};
1465        let f = |name: &str, ty: &str, opt: bool, uniq: bool| ManifestField {
1466            name: name.into(),
1467            field_type: ty.into(),
1468            optional: opt,
1469            unique: uniq,
1470            crdt: None,
1471        };
1472        AppManifest {
1473            manifest_version: 1,
1474            name: "test".into(),
1475            version: "0.0.0".into(),
1476            entities: vec![
1477                ManifestEntity {
1478                    name: "User".into(),
1479                    fields: vec![
1480                        f("email", "string", false, true),
1481                        f("displayName", "string", false, false),
1482                        f("createdAt", "datetime", false, false),
1483                    ],
1484                    indexes: vec![],
1485                    relations: vec![],
1486                    search: None,
1487                    crdt: true,
1488                },
1489                ManifestEntity {
1490                    name: "Todo".into(),
1491                    fields: vec![
1492                        f("title", "string", false, false),
1493                        f("done", "bool", false, false),
1494                        f("userId", "id(User)", false, false),
1495                        f("createdAt", "datetime", false, false),
1496                    ],
1497                    indexes: vec![ManifestIndex {
1498                        name: "by_user".into(),
1499                        fields: vec!["userId".into()],
1500                        unique: false,
1501                    }],
1502                    relations: vec![],
1503                    search: None,
1504                    crdt: true,
1505                },
1506            ],
1507            queries: vec![],
1508            actions: vec![],
1509            policies: vec![],
1510            routes: vec![],
1511        }
1512    }
1513
/// Every documented manifest type maps to its PostgreSQL column type, and
/// an unrecognised type falls back to `TEXT`.
#[test]
fn pg_type_mapping() {
    let expectations = [
        ("string", "TEXT"),
        ("int", "INTEGER"),
        ("float", "DOUBLE PRECISION"),
        ("bool", "BOOLEAN"),
        ("datetime", "TIMESTAMPTZ"),
        ("richtext", "TEXT"),
        ("id(User)", "TEXT"),
        // Catch-all arm: a type name the mapping does not know.
        ("geopoint", "TEXT"),
    ];
    for (manifest_type, column_type) in expectations {
        assert_eq!(
            pg_column_type(manifest_type),
            column_type,
            "manifest type {manifest_type:?}"
        );
    }
}
1524
/// Plain identifiers are wrapped in double quotes and otherwise untouched.
#[test]
fn quote_ident_simple() {
    for (raw, quoted) in [("User", "\"User\""), ("email", "\"email\"")] {
        assert_eq!(quote_ident(raw), quoted);
    }
}
1530
/// Each embedded `"` is doubled inside the quoted identifier.
#[test]
fn quote_ident_escapes_embedded_double_quotes() {
    let one_quote = quote_ident("col\"name");
    let two_quotes = quote_ident("a\"b\"c");

    assert_eq!(one_quote, "\"col\"\"name\"");
    assert_eq!(two_quotes, "\"a\"\"b\"\"c\"");
}
1536
/// A required unique field and an optional field render with the expected
/// `NOT NULL` / `UNIQUE` modifiers after the fixed `id` primary key.
#[test]
fn create_table_sql_basic() {
    let email = FieldSpec {
        name: "email".into(),
        field_type: "string".into(),
        optional: false,
        unique: true,
    };
    let age = FieldSpec {
        name: "age".into(),
        field_type: "int".into(),
        optional: true,
        unique: false,
    };
    let expected = "CREATE TABLE IF NOT EXISTS \"User\" (id TEXT PRIMARY KEY NOT NULL, \"email\" TEXT NOT NULL UNIQUE, \"age\" INTEGER)";

    assert_eq!(create_table_sql("User", &[email, age]), expected);
}
1559
/// Double quotes inside table and column names are doubled in the
/// generated statement.
#[test]
fn create_table_sql_escapes_identifiers() {
    let tricky_column = FieldSpec {
        name: "col\"x".into(),
        field_type: "string".into(),
        optional: false,
        unique: false,
    };
    let sql = create_table_sql("my\"table", &[tricky_column]);

    let has_table = sql.contains("\"my\"\"table\"");
    let has_column = sql.contains("\"col\"\"x\"");
    assert!(has_table);
    assert!(has_column);
}
1572
/// A unique index renders as `CREATE UNIQUE INDEX` with the
/// `<entity>_<index>` name.
#[test]
fn create_index_sql_unique() {
    let expected = "CREATE UNIQUE INDEX IF NOT EXISTS \"User_by_email\" ON \"User\" (\"email\")";
    let actual = create_index_sql("User", "by_email", &["email".into()], true);
    assert_eq!(actual, expected);
}
1581
/// A non-unique index renders as a plain `CREATE INDEX` with the
/// `<entity>_<index>` name.
#[test]
fn create_index_sql_non_unique() {
    let expected = "CREATE INDEX IF NOT EXISTS \"Todo_by_user\" ON \"Todo\" (\"userId\")";
    let actual = create_index_sql("Todo", "by_user", &["userId".into()], false);
    assert_eq!(actual, expected);
}
1590
/// Adding an optional, non-unique string column yields a bare
/// `ADD COLUMN ... TEXT` with no modifiers.
#[test]
fn add_column_sql_basic() {
    let bio = FieldSpec {
        name: "bio".into(),
        field_type: "string".into(),
        optional: true,
        unique: false,
    };
    let expected = "ALTER TABLE \"User\" ADD COLUMN \"bio\" TEXT";
    assert_eq!(add_column_sql("User", &bio), expected);
}
1602
/// Planning straight from the manifest yields a `CreateEntity` for both
/// entities and an `AddIndex` for `Todo.by_user`.
#[test]
fn plan_from_manifest() {
    let plan = PostgresAdapter.plan_schema(&test_manifest()).unwrap();
    let ops = &plan.operations;

    let creates_entity = |wanted: &str| {
        ops.iter().any(|op| {
            matches!(op, SchemaOperation::CreateEntity { name, .. } if name == wanted)
        })
    };
    let adds_todo_index = ops.iter().any(|op| {
        matches!(
            op,
            SchemaOperation::AddIndex { entity, name, .. }
                if entity == "Todo" && name == "by_user"
        )
    });

    assert!(creates_entity("User"));
    assert!(creates_entity("Todo"));
    assert!(adds_todo_index);
}
1623
/// The plan for the test manifest lowers to SQL with both tables first
/// and at least one index statement.
#[test]
fn plan_to_sql_produces_statements() {
    let adapter = PostgresAdapter;
    let manifest = test_manifest();
    let plan = adapter.plan_schema(&manifest).unwrap();
    let stmts = plan_to_sql(&plan).unwrap();

    // `test_manifest()` declares two entities (User, Todo) and a single
    // index, Todo.by_user. Expect exactly two CREATE TABLE statements;
    // the index count is only asserted as a lower bound.
    let create_tables = stmts
        .iter()
        .filter(|s| s.starts_with("CREATE TABLE"))
        .count();
    let create_indexes = stmts
        .iter()
        .filter(|s| s.starts_with("CREATE INDEX") || s.starts_with("CREATE UNIQUE INDEX"))
        .count();
    assert_eq!(create_tables, 2);
    assert!(create_indexes >= 1);

    // Tables must come before any index that refers to them.
    assert!(stmts[0].starts_with("CREATE TABLE"));
    assert!(stmts[1].starts_with("CREATE TABLE"));
}
1648
/// A plan containing `RemoveEntity` is refused with `PG_OP_UNSUPPORTED`.
#[test]
fn plan_to_sql_rejects_unsupported() {
    let drop_user = SchemaOperation::RemoveEntity {
        name: "User".into(),
    };
    let plan = SchemaPlan {
        operations: vec![drop_user],
    };

    let err = plan_to_sql(&plan).unwrap_err();
    assert_eq!(err.code, "PG_OP_UNSUPPORTED");
}
1660
/// `apply_schema` on the plain adapter reports `APPLY_NOT_IMPLEMENTED`,
/// even for a no-op plan.
#[test]
fn apply_not_implemented() {
    let noop_plan = SchemaPlan {
        operations: vec![SchemaOperation::Noop],
    };

    let err = PostgresAdapter.apply_schema(&noop_plan).unwrap_err();
    assert_eq!(err.code, "APPLY_NOT_IMPLEMENTED");
}
1671
/// Mixed-case table and column names appear double-quoted so Postgres
/// keeps their case, and `datetime` maps to `TIMESTAMPTZ`.
#[test]
fn sql_uses_quoted_identifiers() {
    let created_at = FieldSpec {
        name: "createdAt".into(),
        field_type: "datetime".into(),
        optional: false,
        unique: false,
    };
    let sql = create_table_sql("User", &[created_at]);

    for fragment in ["\"User\"", "\"createdAt\"", "TIMESTAMPTZ"] {
        assert!(sql.contains(fragment));
    }
}
1686
1687    // -- Introspection SQL tests --
1688
/// Smoke test: the introspection SQL constants exist and contain the
/// fragments the adapter relies on.
#[test]
fn introspect_sql_constants_are_valid() {
    // Table listing reads information_schema and mentions the `_pylon_` prefix.
    let tables_sql = INTROSPECT_TABLES_SQL;
    assert!(tables_sql.contains("information_schema.tables"));
    assert!(tables_sql.contains("_pylon_"));

    // Column and index lookups take a `$1` bind parameter.
    assert!(INTROSPECT_COLUMNS_SQL.contains("$1"));
    assert!(INTROSPECT_INDEXES_SQL.contains("$1"));
}
1697
1698    // -- Plan from snapshot tests --
1699
/// Against an empty database snapshot, the plan creates both entities
/// and the `Todo.by_user` index.
#[test]
fn plan_from_empty_snapshot_creates_all() {
    let empty = crate::SchemaSnapshot { tables: vec![] };
    let plan = plan_from_snapshot(&empty, &test_manifest());
    let ops = &plan.operations;

    let creates_entity = |wanted: &str| {
        ops.iter().any(|op| {
            matches!(op, SchemaOperation::CreateEntity { name, .. } if name == wanted)
        })
    };
    let adds_todo_index = ops.iter().any(|op| {
        matches!(
            op,
            SchemaOperation::AddIndex { entity, name, .. }
                if entity == "Todo" && name == "by_user"
        )
    });

    assert!(creates_entity("User"));
    assert!(creates_entity("Todo"));
    assert!(adds_todo_index);
}
1719
/// When the snapshot already has every table, column and index from the
/// manifest, the resulting plan is empty.
#[test]
fn plan_from_full_snapshot_is_noop() {
    // NOT NULL column that is not the primary key.
    fn col(name: &'static str, column_type: &'static str) -> crate::ColumnSnapshot {
        crate::ColumnSnapshot {
            name: name.into(),
            column_type: column_type.into(),
            notnull: true,
            primary_key: false,
        }
    }
    // The `id TEXT` primary-key column every table starts with.
    fn id_col() -> crate::ColumnSnapshot {
        crate::ColumnSnapshot {
            name: "id".into(),
            column_type: "TEXT".into(),
            notnull: true,
            primary_key: true,
        }
    }

    let user = crate::TableSnapshot {
        name: "User".into(),
        columns: vec![
            id_col(),
            col("email", "TEXT"),
            col("displayName", "TEXT"),
            col("createdAt", "TIMESTAMPTZ"),
        ],
        indexes: vec![],
    };
    let todo = crate::TableSnapshot {
        name: "Todo".into(),
        columns: vec![
            id_col(),
            col("title", "TEXT"),
            col("done", "BOOLEAN"),
            col("userId", "TEXT"),
            col("createdAt", "TIMESTAMPTZ"),
        ],
        indexes: vec![crate::IndexSnapshot {
            name: "Todo_by_user".into(),
            columns: vec!["userId".into()],
            unique: false,
        }],
    };
    let snapshot = crate::SchemaSnapshot {
        tables: vec![user, todo],
    };

    let plan = plan_from_snapshot(&snapshot, &test_manifest());
    assert!(plan.is_empty());
}
1800
/// When `User` lacks two manifest columns in the snapshot, the plan
/// contains exactly two `AddField` operations.
#[test]
fn plan_detects_missing_column_in_snapshot() {
    // NOT NULL column that is not the primary key.
    fn col(name: &'static str, column_type: &'static str) -> crate::ColumnSnapshot {
        crate::ColumnSnapshot {
            name: name.into(),
            column_type: column_type.into(),
            notnull: true,
            primary_key: false,
        }
    }
    // The `id TEXT` primary-key column every table starts with.
    fn id_col() -> crate::ColumnSnapshot {
        crate::ColumnSnapshot {
            name: "id".into(),
            column_type: "TEXT".into(),
            notnull: true,
            primary_key: true,
        }
    }

    let user = crate::TableSnapshot {
        name: "User".into(),
        columns: vec![
            id_col(),
            col("email", "TEXT"),
            // missing displayName and createdAt
        ],
        indexes: vec![],
    };
    let todo = crate::TableSnapshot {
        name: "Todo".into(),
        columns: vec![
            id_col(),
            col("title", "TEXT"),
            col("done", "BOOLEAN"),
            col("userId", "TEXT"),
            col("createdAt", "TIMESTAMPTZ"),
        ],
        indexes: vec![crate::IndexSnapshot {
            name: "Todo_by_user".into(),
            columns: vec!["userId".into()],
            unique: false,
        }],
    };
    let snapshot = crate::SchemaSnapshot {
        tables: vec![user, todo],
    };

    let plan = plan_from_snapshot(&snapshot, &test_manifest());

    let added_fields = plan
        .operations
        .iter()
        .filter(|op| matches!(op, SchemaOperation::AddField { .. }))
        .count();
    assert_eq!(added_fields, 2); // displayName + createdAt
}
1876
1877    // -- CRUD helper tests (no live database required) --
1878
/// Each JSON value kind stringifies as expected: strings unquoted,
/// numbers and booleans as literals, null as the empty string, and
/// arrays/objects as compact JSON.
#[test]
fn json_value_to_string_handles_all_types() {
    let cases = [
        (serde_json::Value::String("hello".into()), "hello"),
        (serde_json::json!(42), "42"),
        (serde_json::json!(1.5), "1.5"),
        (serde_json::Value::Bool(true), "true"),
        (serde_json::Value::Bool(false), "false"),
        (serde_json::Value::Null, ""),
        // Arrays and objects get their JSON representation.
        (serde_json::json!([1, 2, 3]), "[1,2,3]"),
        (serde_json::json!({"a": 1}), "{\"a\":1}"),
    ];

    for (value, expected) in &cases {
        assert_eq!(json_value_to_string(value), *expected);
    }
}
1903
/// A generated id is non-empty and made only of ASCII hex digits.
#[test]
fn generate_id_returns_hex_string() {
    let id = generate_id();
    let is_empty = id.is_empty();
    // A non-ASCII byte is never a hex digit, so checking bytes is
    // equivalent to checking chars.
    let all_hex = id.bytes().all(|byte| byte.is_ascii_hexdigit());

    assert!(!is_empty);
    assert!(all_hex);
}
1911
/// Two consecutive calls never hand back the same id.
#[test]
fn generate_id_is_unique_across_calls() {
    let (first, second) = (generate_id(), generate_id());
    assert_ne!(first, second);
}
1918
/// 1000 ids generated back-to-back are already in lexicographic order,
/// all the same width, and free of duplicates.
#[test]
fn generate_id_is_lex_sortable() {
    let generated: Vec<String> = (0..1000).map(|_| generate_id()).collect();

    // Generation order must equal sorted order; cursor pagination relies
    // on this.
    let mut expected_order = generated.clone();
    expected_order.sort();
    assert_eq!(generated, expected_order, "generate_id must be lex-monotonic");

    // Lexicographic comparison is only meaningful at a fixed width.
    let width = generated[0].len();
    assert!(generated.iter().all(|id| id.len() == width));

    // The list is sorted, so removing adjacent duplicates removes all of them.
    let mut distinct = generated;
    distinct.dedup();
    assert_eq!(distinct.len(), 1000, "no collisions in a tight loop");
}
1937
/// An insert for a two-field object targets the quoted table, binds
/// three placeholders, and puts a non-empty text id first.
#[test]
fn build_insert_sql_simple() {
    let payload = serde_json::json!({
        "email": "alice@example.com",
        "displayName": "Alice"
    });
    let (sql, values) = build_insert_sql("User", &payload).unwrap();

    assert!(sql.starts_with("INSERT INTO \"User\""));
    for fragment in ["id", "$1", "$2", "$3"] {
        assert!(sql.contains(fragment));
    }

    // First value is the generated ID — JsonParam::Text variant.
    if let JsonParam::Text(id) = &values[0] {
        assert!(!id.is_empty());
    } else {
        panic!("expected Text id param, got {:?}", &values[0]);
    }
    assert_eq!(values.len(), 3); // id + 2 fields
}
1958
/// Each JSON value kind is carried through as the matching `JsonParam`
/// variant rather than being stringified.
#[test]
fn build_insert_sql_preserves_json_types() {
    let payload = serde_json::json!({
        "n": 42,
        "f": 1.5,
        "b": true,
        "s": "hi",
        "z": null,
    });
    let (_sql, values) = build_insert_sql("T", &payload).unwrap();

    // values[0] is the id; the rest follow BTreeMap key order
    // ("b", "f", "n", "s", "z").
    let fields = &values[1..];
    assert!(matches!(fields[0], JsonParam::Bool(true)));
    assert!(matches!(fields[1], JsonParam::Float(_)));
    assert!(matches!(fields[2], JsonParam::Int(42)));
    assert!(matches!(fields[3], JsonParam::Text(_)));
    assert!(matches!(fields[4], JsonParam::Null));
}
1977
/// Both the table name and the column name appear double-quoted in the
/// INSERT statement.
#[test]
fn build_insert_sql_quotes_column_names() {
    let payload = serde_json::json!({"createdAt": "2026-01-01"});
    let (sql, _values) = build_insert_sql("Todo", &payload).unwrap();

    for quoted in ["\"createdAt\"", "\"Todo\""] {
        assert!(sql.contains(quoted));
    }
}
1985
/// A JSON string payload (not an object) is rejected with
/// `PG_INVALID_DATA`.
#[test]
fn build_insert_sql_rejects_non_object() {
    let not_an_object = serde_json::json!("not an object");
    match build_insert_sql("User", &not_an_object) {
        Err(err) => assert_eq!(err.code, "PG_INVALID_DATA"),
        Ok(_) => panic!("non-object insert payload must be rejected"),
    }
}
1993
/// An update for a two-field object targets the quoted table, filters on
/// `id = $1`, and binds the row id first, followed by the field values.
#[test]
fn build_update_sql_simple() {
    let changes = serde_json::json!({
        "displayName": "Bob",
        "email": "bob@example.com"
    });
    let (sql, values) = build_update_sql("User", "abc123", &changes).unwrap();

    assert!(sql.starts_with("UPDATE \"User\" SET"));
    for fragment in ["WHERE id = $1", "$2", "$3"] {
        assert!(sql.contains(fragment));
    }

    // The row id is always the first bound parameter.
    if let JsonParam::Text(id) = &values[0] {
        assert_eq!(id, "abc123");
    } else {
        panic!("expected Text id param, got {:?}", &values[0]);
    }
    assert_eq!(values.len(), 3); // id + 2 fields
}
2012
/// The SET clause quotes the column name and binds its value as `$2`.
#[test]
fn build_update_sql_quotes_column_names() {
    let changes = serde_json::json!({"displayName": "Carol"});
    let (sql, _values) = build_update_sql("User", "id1", &changes).unwrap();

    let assignment = "\"displayName\" = $2";
    assert!(sql.contains(assignment));
}
2019
/// A JSON number payload (not an object) is rejected with
/// `PG_INVALID_DATA`.
#[test]
fn build_update_sql_rejects_non_object() {
    let not_an_object = serde_json::json!(42);
    match build_update_sql("User", "id1", &not_an_object) {
        Err(err) => assert_eq!(err.code, "PG_INVALID_DATA"),
        Ok(_) => panic!("non-object update payload must be rejected"),
    }
}
2027
/// An empty object is rejected with `PG_INVALID_DATA` and a message that
/// asks for at least one field.
#[test]
fn build_update_sql_rejects_empty_object() {
    let no_fields = serde_json::json!({});
    match build_update_sql("User", "id1", &no_fields) {
        Err(err) => {
            assert_eq!(err.code, "PG_INVALID_DATA");
            assert!(err.message.contains("at least one field"));
        }
        Ok(_) => panic!("empty update payload must be rejected"),
    }
}
2035}