Skip to main content

pylon_storage/
postgres.rs

1use crate::{FieldSpec, SchemaOperation, SchemaPlan, StorageAdapter, StorageError};
2use pylon_kernel::AppManifest;
3
4// ---------------------------------------------------------------------------
5// Type mapping: manifest field types -> PostgreSQL column types
6//
7//   string    -> TEXT
8//   int       -> INTEGER
9//   float     -> DOUBLE PRECISION
10//   bool      -> BOOLEAN
11//   datetime  -> TIMESTAMPTZ
12//   richtext  -> TEXT
13//   id(...)   -> TEXT
14// ---------------------------------------------------------------------------
15
/// Translate a manifest field type name into the PostgreSQL column type used
/// in generated DDL.
///
/// Any name without an explicit mapping — which includes every `id(...)`
/// reference type — is stored as `TEXT`.
fn pg_column_type(field_type: &str) -> &'static str {
    // Explicit manifest-type -> column-type pairs; anything else is TEXT.
    const KNOWN_TYPES: [(&str, &str); 6] = [
        ("string", "TEXT"),
        ("int", "INTEGER"),
        ("float", "DOUBLE PRECISION"),
        ("bool", "BOOLEAN"),
        ("datetime", "TIMESTAMPTZ"),
        ("richtext", "TEXT"),
    ];

    KNOWN_TYPES
        .iter()
        .find(|(manifest_name, _)| *manifest_name == field_type)
        .map(|(_, column_type)| *column_type)
        .unwrap_or("TEXT")
}
28
29// ---------------------------------------------------------------------------
30// Identifier quoting
31// ---------------------------------------------------------------------------
32
/// Wrap `name` in double quotes so it can be used as a SQL identifier.
///
/// Every embedded `"` is doubled, following the PostgreSQL convention that
/// `"foo""bar"` denotes the identifier `foo"bar`.
fn quote_ident(name: &str) -> String {
    // +2 for the surrounding quotes; embedded quotes may grow it further.
    let mut quoted = String::with_capacity(name.len() + 2);
    quoted.push('"');
    for ch in name.chars() {
        if ch == '"' {
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
39
40// ---------------------------------------------------------------------------
41// SQL generation
42// ---------------------------------------------------------------------------
43
44/// Generate a Postgres CREATE TABLE statement.
45pub fn create_table_sql(entity_name: &str, fields: &[FieldSpec]) -> String {
46    let mut columns = vec!["id TEXT PRIMARY KEY NOT NULL".to_string()];
47
48    for field in fields {
49        let col_type = pg_column_type(&field.field_type);
50        let not_null = if field.optional { "" } else { " NOT NULL" };
51        let unique = if field.unique { " UNIQUE" } else { "" };
52        columns.push(format!(
53            "{} {}{}{}",
54            quote_ident(&field.name),
55            col_type,
56            not_null,
57            unique
58        ));
59    }
60
61    format!(
62        "CREATE TABLE IF NOT EXISTS {} ({})",
63        quote_ident(entity_name),
64        columns.join(", ")
65    )
66}
67
68/// Generate a Postgres ALTER TABLE ADD COLUMN statement.
69/// NOT NULL is omitted on ADD COLUMN to avoid requiring DEFAULT values.
70/// Required-ness is tracked in the manifest; enforcement deferred.
71pub fn add_column_sql(entity_name: &str, field: &FieldSpec) -> String {
72    let col_type = pg_column_type(&field.field_type);
73    let unique = if field.unique { " UNIQUE" } else { "" };
74    format!(
75        "ALTER TABLE {} ADD COLUMN {} {}{}",
76        quote_ident(entity_name),
77        quote_ident(&field.name),
78        col_type,
79        unique
80    )
81}
82
83/// Generate a Postgres CREATE INDEX statement.
84pub fn create_index_sql(
85    entity_name: &str,
86    index_name: &str,
87    fields: &[String],
88    unique: bool,
89) -> String {
90    let unique_str = if unique { "UNIQUE " } else { "" };
91    let full_index_name = format!("{}_{}", entity_name, index_name);
92    let quoted_fields: Vec<String> = fields.iter().map(|f| quote_ident(f)).collect();
93    format!(
94        "CREATE {}INDEX IF NOT EXISTS {} ON {} ({})",
95        unique_str,
96        quote_ident(&full_index_name),
97        quote_ident(entity_name),
98        quoted_fields.join(", ")
99    )
100}
101
102// ---------------------------------------------------------------------------
103// PostgresAdapter — planning-only adapter
104// ---------------------------------------------------------------------------
105
106/// A Postgres storage adapter. Currently supports planning only.
107/// No live connection — SQL generation and planning from manifest.
108pub struct PostgresAdapter;
109
110impl StorageAdapter for PostgresAdapter {
111    fn plan_schema(&self, target: &AppManifest) -> Result<SchemaPlan, StorageError> {
112        // Plan from empty baseline.
113        let mut operations = Vec::new();
114
115        for entity in &target.entities {
116            let fields: Vec<FieldSpec> = entity
117                .fields
118                .iter()
119                .map(|f| FieldSpec {
120                    name: f.name.clone(),
121                    field_type: f.field_type.clone(),
122                    optional: f.optional,
123                    unique: f.unique,
124                })
125                .collect();
126
127            operations.push(SchemaOperation::CreateEntity {
128                name: entity.name.clone(),
129                fields,
130            });
131
132            for index in &entity.indexes {
133                operations.push(SchemaOperation::AddIndex {
134                    entity: entity.name.clone(),
135                    name: index.name.clone(),
136                    fields: index.fields.clone(),
137                    unique: index.unique,
138                });
139            }
140        }
141
142        if operations.is_empty() {
143            operations.push(SchemaOperation::Noop);
144        }
145
146        Ok(SchemaPlan { operations })
147    }
148
149    // apply_schema intentionally not implemented — uses default trait error.
150}
151
152/// Generate all SQL statements for a plan, in order.
153/// Useful for dry-run preview of what Postgres DDL would be executed.
154pub fn plan_to_sql(plan: &SchemaPlan) -> Result<Vec<String>, StorageError> {
155    let mut statements = Vec::new();
156
157    for op in &plan.operations {
158        match op {
159            SchemaOperation::CreateEntity { name, fields } => {
160                statements.push(create_table_sql(name, fields));
161            }
162            SchemaOperation::AddField { entity, field } => {
163                statements.push(add_column_sql(entity, field));
164            }
165            SchemaOperation::AlterField {
166                entity,
167                previous,
168                target,
169            } => {
170                // Only nullable transitions today. SET / DROP NOT NULL is
171                // safe on a populated table when going from required →
172                // optional (existing rows already satisfy NOT NULL); the
173                // reverse direction (optional → required) succeeds only
174                // if every row has a non-null value, which the planner
175                // has no way to know — Postgres will fail the migration
176                // if it can't and the operator gets a clear error from
177                // the apply step.
178                if previous.optional && !target.optional {
179                    statements.push(format!(
180                        "ALTER TABLE {} ALTER COLUMN {} SET NOT NULL",
181                        quote_ident(entity),
182                        quote_ident(&target.name)
183                    ));
184                } else if !previous.optional && target.optional {
185                    statements.push(format!(
186                        "ALTER TABLE {} ALTER COLUMN {} DROP NOT NULL",
187                        quote_ident(entity),
188                        quote_ident(&target.name)
189                    ));
190                }
191                // Nothing emitted when neither nullable nor type changed
192                // — falls through silently. AlterField with no actual
193                // shape change shouldn't happen in practice (the planner
194                // only emits it on real drift), but guard against
195                // emitting empty SQL just in case.
196            }
197            SchemaOperation::AddIndex {
198                entity,
199                name,
200                fields,
201                unique,
202            } => {
203                statements.push(create_index_sql(entity, name, fields, *unique));
204            }
205            SchemaOperation::Noop => {}
206            other => {
207                return Err(StorageError {
208                    code: "PG_OP_UNSUPPORTED".into(),
209                    message: format!("Operation not supported by Postgres adapter: {other:?}"),
210                });
211            }
212        }
213    }
214
215    Ok(statements)
216}
217
218// ---------------------------------------------------------------------------
219// Introspection SQL helpers
220//
221// These generate the SQL queries that a live Postgres connection would run
222// to read the current schema. No connection required — just SQL strings.
223// ---------------------------------------------------------------------------
224
/// SQL to list user tables in the public schema.
///
/// Returns one `table_name` column, ordered by name, restricted to base
/// tables and excluding tables whose name starts with the literal prefix
/// `_pylon_`.
///
/// `_` is a single-character wildcard in `LIKE`, so the prefix is written
/// with an explicit `ESCAPE` character. Without it the pattern would also
/// exclude unrelated tables such as `xpylonyfoo`.
pub const INTROSPECT_TABLES_SQL: &str = "\
    SELECT table_name \
    FROM information_schema.tables \
    WHERE table_schema = 'public' \
      AND table_type = 'BASE TABLE' \
      AND table_name NOT LIKE '!_pylon!_%' ESCAPE '!' \
    ORDER BY table_name";
233
/// SQL to list columns for a given table.
/// Use with parameter: table_name.
///
/// Takes the table name as `$1` and returns, in ordinal position order, the
/// columns `column_name`, `data_type`, `is_nullable` and `is_pk`. `is_pk` is
/// a count of matching PRIMARY KEY constraints, so any value above zero means
/// the column is part of the primary key.
///
/// NOTE(review): the `is_pk` subquery matches constraints by table name,
/// column name and constraint name only — it does not filter on schema, so a
/// same-named table in another schema could contribute to the count.
pub const INTROSPECT_COLUMNS_SQL: &str = "\
    SELECT column_name, data_type, is_nullable, \
           (SELECT COUNT(*) FROM information_schema.table_constraints tc \
            JOIN information_schema.key_column_usage kcu \
              ON tc.constraint_name = kcu.constraint_name \
            WHERE tc.table_name = c.table_name \
              AND kcu.column_name = c.column_name \
              AND tc.constraint_type = 'PRIMARY KEY') as is_pk \
    FROM information_schema.columns c \
    WHERE table_schema = 'public' AND table_name = $1 \
    ORDER BY ordinal_position";
247
/// SQL to list indexes for a given table.
/// Use with parameter: table_name.
///
/// Takes the table name as `$1` and returns one row per index of that table
/// in the `public` schema, ordered by index name, with the columns
/// `index_name`, `is_unique` and `columns`. `columns` is an array of the
/// indexed column names, ordered by their position in the index key. The
/// primary-key index is excluded (`NOT ix.indisprimary`).
pub const INTROSPECT_INDEXES_SQL: &str = "\
    SELECT i.relname as index_name, \
           ix.indisunique as is_unique, \
           array_agg(a.attname ORDER BY array_position(ix.indkey, a.attnum)) as columns \
    FROM pg_index ix \
    JOIN pg_class t ON t.oid = ix.indrelid \
    JOIN pg_class i ON i.oid = ix.indexrelid \
    JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(ix.indkey) \
    JOIN pg_namespace n ON n.oid = t.relnamespace \
    WHERE n.nspname = 'public' \
      AND t.relname = $1 \
      AND NOT ix.indisprimary \
    GROUP BY i.relname, ix.indisunique \
    ORDER BY i.relname";
264
/// Plan from a snapshot (reuses the shared plan_from_snapshot).
/// This allows Postgres to plan incrementally once introspection data is available.
///
/// Pure delegation: `snapshot` and `target` are forwarded unchanged to
/// `crate::plan_from_snapshot` and its result is returned as-is.
pub fn plan_from_snapshot(snapshot: &crate::SchemaSnapshot, target: &AppManifest) -> SchemaPlan {
    crate::plan_from_snapshot(snapshot, target)
}
270
271// ---------------------------------------------------------------------------
272// CRUD SQL generation helpers (used by live adapter, testable without a DB)
273// ---------------------------------------------------------------------------
274
/// Mint a fixed-width, lexicographically sortable unique ID.
///
/// Layout: 32 lowercase hex digits holding the nanoseconds since the Unix
/// epoch (zero-padded), then 8 hex digits from a per-process counter. The
/// fixed 40-character width makes string comparison agree with numeric
/// comparison, which is what cursor pagination relies on. The counter
/// separates IDs minted within the same nanosecond and keeps them in
/// creation order, until the 32-bit counter wraps around.
///
/// If the system clock reads earlier than the Unix epoch, the timestamp
/// part is all zeros.
pub fn generate_id() -> String {
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    // Shared by every caller in this process.
    static COUNTER: AtomicU32 = AtomicU32::new(0);

    let ts = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos(),
        // Clock is before the epoch: fall back to zero.
        Err(_) => 0,
    };
    let seq = COUNTER.fetch_add(1, Ordering::Relaxed);

    format!("{ts:032x}{seq:08x}")
}
294
295/// Convert a JSON value to its string representation for use as a SQL parameter.
296///
297/// Kept for back-compat with callers that need a textual fallback (e.g.
298/// human-readable logs). New code should bind through [`JsonParam`] so
299/// integers/booleans/nulls reach Postgres in their typed form instead of
300/// collapsing to TEXT, which the driver can't coerce into INTEGER /
301/// BOOLEAN / TIMESTAMPTZ columns and which silently turns JSON `null`
302/// into an empty string for FK columns.
303pub fn json_value_to_string(val: &serde_json::Value) -> String {
304    match val {
305        serde_json::Value::String(s) => s.clone(),
306        serde_json::Value::Number(n) => n.to_string(),
307        serde_json::Value::Bool(b) => b.to_string(),
308        serde_json::Value::Null => String::new(),
309        other => other.to_string(),
310    }
311}
312
/// A typed wrapper around a JSON scalar that implements
/// [`postgres::types::ToSql`] for INSERT/UPDATE parameters.
///
/// The previous implementation passed every value as `String`, which broke
/// non-text columns (the postgres driver can't bind a string literal into
/// an INTEGER / BOOLEAN / TIMESTAMPTZ slot) and silently turned JSON
/// `null` into the empty string for nullable FKs (so `unlink` left
/// dangling `""` references instead of NULL). `JsonParam` carries the
/// JSON variant tag through to `to_sql` so the driver can pick the
/// correct binary representation per column type.
///
/// JSON arrays/objects collapse to their JSON-string form (TEXT) — the
/// runtime layer doesn't currently model array/object columns at the
/// manifest level on Postgres, so anything that lands here came from
/// caller-supplied prose that's expected to fit into a TEXT column.
#[derive(Debug, Clone, PartialEq)]
pub enum JsonParam {
    /// JSON `null`; bound as SQL NULL.
    Null,
    /// A JSON string, or the JSON text of an array/object.
    Text(String),
    /// A JSON number that fits in `i64`.
    Int(i64),
    /// A JSON number that does not fit in `i64` but is representable as `f64`.
    Float(f64),
    /// A JSON boolean.
    Bool(bool),
}
336
337impl JsonParam {
338    /// Lift a `serde_json::Value` into the typed parameter form. Numbers
339    /// that fit `i64` go through as Int; everything else goes through as
340    /// Float to preserve fractional / large-magnitude values.
341    pub fn from_json(val: &serde_json::Value) -> Self {
342        match val {
343            serde_json::Value::Null => JsonParam::Null,
344            serde_json::Value::String(s) => JsonParam::Text(s.clone()),
345            serde_json::Value::Bool(b) => JsonParam::Bool(*b),
346            serde_json::Value::Number(n) => {
347                if let Some(i) = n.as_i64() {
348                    JsonParam::Int(i)
349                } else if let Some(f) = n.as_f64() {
350                    JsonParam::Float(f)
351                } else {
352                    JsonParam::Text(n.to_string())
353                }
354            }
355            other => JsonParam::Text(other.to_string()),
356        }
357    }
358}
359
#[cfg(feature = "postgres-live")]
impl postgres::types::ToSql for JsonParam {
    /// Encode this value in the binary wire format of the column type `ty`.
    ///
    /// The encoding is chosen from the (variant, column type) pair:
    ///
    /// * `Null` binds as SQL NULL for any type.
    /// * `Bool` binds to BOOL.
    /// * `Int` binds to INT2 / INT4 / INT8 / FLOAT4 / FLOAT8.
    /// * `Float` binds to FLOAT4 / FLOAT8, and to INT4 / INT8 with the
    ///   fractional part truncated toward zero.
    /// * `Text` binds to the text types, and to TIMESTAMPTZ / TIMESTAMP /
    ///   DATE by parsing the string as RFC 3339.
    /// * Any other pairing renders the value as text and writes those bytes,
    ///   except for fixed-width binary column types, which are rejected.
    ///
    /// # Errors
    ///
    /// * an integer or float that does not fit the target integer column
    ///   (including NaN / infinite floats);
    /// * a string that is not valid RFC 3339 for a date/time column;
    /// * a value with no valid encoding for a fixed-width binary column
    ///   (for example text into INTEGER, or an integer into BOOLEAN).
    fn to_sql(
        &self,
        ty: &postgres::types::Type,
        out: &mut bytes::BytesMut,
    ) -> Result<postgres::types::IsNull, Box<dyn std::error::Error + Sync + Send>> {
        use postgres::types::Type;

        // Null binds as SQL NULL regardless of the column's declared
        // type — the postgres driver treats `IsNull::Yes` as a null
        // value of the requested type.
        if matches!(self, JsonParam::Null) {
            return Ok(postgres::types::IsNull::Yes);
        }

        // Match each JsonParam variant against the COLUMN's declared
        // type so the binary encoding actually fits the target slot.
        // Postgres rejects "binary data of wrong size" if you bind an
        // `i64` (BIGINT, 8 bytes) into an INTEGER (4 bytes).
        match (self, ty) {
            (JsonParam::Bool(b), &Type::BOOL) => b.to_sql(ty, out),

            // Narrowing goes through `try_from`: an `as` cast would wrap an
            // out-of-range value and store a different number without any
            // error.
            (JsonParam::Int(n), &Type::INT2) => i16::try_from(*n)
                .map_err(|_| format!("integer {n} is out of range for a {ty} column"))?
                .to_sql(ty, out),
            (JsonParam::Int(n), &Type::INT4) => i32::try_from(*n)
                .map_err(|_| format!("integer {n} is out of range for a {ty} column"))?
                .to_sql(ty, out),
            (JsonParam::Int(n), &Type::INT8) => n.to_sql(ty, out),
            (JsonParam::Int(n), &Type::FLOAT4) => (*n as f32).to_sql(ty, out),
            (JsonParam::Int(n), &Type::FLOAT8) => (*n as f64).to_sql(ty, out),

            (JsonParam::Float(f), &Type::FLOAT4) => (*f as f32).to_sql(ty, out),
            (JsonParam::Float(f), &Type::FLOAT8) => f.to_sql(ty, out),
            (JsonParam::Float(f), &Type::INT4) => {
                // Truncate toward zero (as before), but refuse values an
                // `as` cast would saturate or map to 0 (NaN). The negated
                // comparison makes NaN fail the check.
                let whole = f.trunc();
                if !(whole >= f64::from(i32::MIN) && whole <= f64::from(i32::MAX)) {
                    return Err(format!("float {f} is out of range for a {ty} column").into());
                }
                (whole as i32).to_sql(ty, out)
            }
            (JsonParam::Float(f), &Type::INT8) => {
                let whole = f.trunc();
                // `i64::MIN` is exactly representable as f64 (-2^63); its
                // negation (2^63) is the first value above `i64::MAX`.
                let upper_exclusive = -(i64::MIN as f64);
                if !(whole >= -upper_exclusive && whole < upper_exclusive) {
                    return Err(format!("float {f} is out of range for a {ty} column").into());
                }
                (whole as i64).to_sql(ty, out)
            }

            (JsonParam::Text(s), &Type::TEXT)
            | (JsonParam::Text(s), &Type::VARCHAR)
            | (JsonParam::Text(s), &Type::BPCHAR)
            | (JsonParam::Text(s), &Type::NAME) => s.to_sql(ty, out),
            (JsonParam::Text(s), &Type::TIMESTAMPTZ) => {
                // The runtime models datetimes as ISO 8601 strings
                // (`pylon_kernel::util::now_iso` shape, plus
                // user-supplied RFC 3339). Postgres's TIMESTAMPTZ
                // binary wire format is `i64` microseconds since
                // 2000-01-01 UTC — NOT the bytes of an ISO string.
                // Binding the raw ASCII made Postgres reject the
                // parameter with "incorrect binary data format in bind
                // parameter N". Parse via chrono and let the postgres
                // crate's `with-chrono-0_4` ToSql impl emit the proper
                // binary format.
                let dt = chrono::DateTime::parse_from_rfc3339(s)
                    .map_err(|e| format!("invalid TIMESTAMPTZ string {s:?}: {e}"))?
                    .with_timezone(&chrono::Utc);
                dt.to_sql(ty, out)
            }
            (JsonParam::Text(s), &Type::TIMESTAMP) => {
                // TIMESTAMP (no timezone) — same conversion shape but
                // bind as NaiveDateTime so chrono picks the right
                // binary encoding for the column.
                let dt = chrono::DateTime::parse_from_rfc3339(s)
                    .map_err(|e| format!("invalid TIMESTAMP string {s:?}: {e}"))?
                    .with_timezone(&chrono::Utc)
                    .naive_utc();
                dt.to_sql(ty, out)
            }
            (JsonParam::Text(s), &Type::DATE) => {
                let dt = chrono::DateTime::parse_from_rfc3339(s)
                    .map_err(|e| format!("invalid DATE string {s:?}: {e}"))?
                    .with_timezone(&chrono::Utc)
                    .date_naive();
                dt.to_sql(ty, out)
            }

            // Cross-type fallback: render as text and write those bytes.
            (other, _) => {
                // Fixed-width binary types cannot take raw text bytes. Most
                // of the time Postgres rejects them as malformed, but when
                // the text length happens to equal the type's width (a
                // 4-character string into INTEGER, "0" into BOOLEAN, ...)
                // the bytes are accepted and stored as a garbage value.
                // Fail here instead of corrupting data.
                let fixed_width_binary = matches!(
                    ty,
                    &Type::BOOL
                        | &Type::INT2
                        | &Type::INT4
                        | &Type::INT8
                        | &Type::FLOAT4
                        | &Type::FLOAT8
                        | &Type::DATE
                        | &Type::TIME
                        | &Type::TIMESTAMP
                        | &Type::TIMESTAMPTZ
                        | &Type::UUID
                );
                if fixed_width_binary {
                    let kind = match other {
                        JsonParam::Bool(_) => "boolean",
                        JsonParam::Int(_) => "integer",
                        JsonParam::Float(_) => "float",
                        JsonParam::Text(_) => "text",
                        JsonParam::Null => unreachable!(),
                    };
                    return Err(format!("cannot bind a JSON {kind} value to a {ty} column").into());
                }

                let s = match other {
                    JsonParam::Bool(b) => b.to_string(),
                    JsonParam::Int(n) => n.to_string(),
                    JsonParam::Float(f) => f.to_string(),
                    JsonParam::Text(s) => s.clone(),
                    JsonParam::Null => unreachable!(),
                };
                s.to_sql(ty, out)
            }
        }
    }

    /// Accepts every column type; the per-type decision is made in `to_sql`.
    fn accepts(_ty: &postgres::types::Type) -> bool {
        // Which encodings are valid depends on the runtime variant, which
        // this associated function cannot see. `to_sql` therefore performs
        // the (variant, type) dispatch itself and returns an error for
        // pairings it cannot encode.
        true
    }

    postgres::types::to_sql_checked!();
}
463
464/// Build an INSERT SQL statement and collect typed parameter values.
465/// Returns `(sql, params)` where `params[0]` is the generated ID
466/// (always `JsonParam::Text`). Subsequent params carry the JSON-typed
467/// value so the postgres driver can bind them to typed columns
468/// (INTEGER / BOOLEAN / TIMESTAMPTZ / TEXT) and so JSON `null` reaches
469/// the database as SQL NULL — the previous string-collapsing path stored
470/// `""` for nullable FKs and broke any non-text column.
471pub fn build_insert_sql(
472    entity: &str,
473    data: &serde_json::Value,
474) -> Result<(String, Vec<JsonParam>), StorageError> {
475    let id = generate_id();
476    let obj = data.as_object().ok_or_else(|| StorageError {
477        code: "PG_INVALID_DATA".into(),
478        message: "Insert data must be a JSON object".into(),
479    })?;
480
481    let mut col_names = vec!["id".to_string()];
482    let mut placeholders = vec!["$1".to_string()];
483    let mut values: Vec<JsonParam> = vec![JsonParam::Text(id)];
484
485    for (i, (key, val)) in obj.iter().enumerate() {
486        col_names.push(quote_ident(key));
487        placeholders.push(format!("${}", i + 2));
488        values.push(JsonParam::from_json(val));
489    }
490
491    let sql = format!(
492        "INSERT INTO {} ({}) VALUES ({})",
493        quote_ident(entity),
494        col_names.join(", "),
495        placeholders.join(", ")
496    );
497
498    Ok((sql, values))
499}
500
501/// Build an UPDATE SQL statement and collect typed parameter values.
502/// Returns `(sql, params)` where `params[0]` is the row ID.
503pub fn build_update_sql(
504    entity: &str,
505    id: &str,
506    data: &serde_json::Value,
507) -> Result<(String, Vec<JsonParam>), StorageError> {
508    let obj = data.as_object().ok_or_else(|| StorageError {
509        code: "PG_INVALID_DATA".into(),
510        message: "Update data must be a JSON object".into(),
511    })?;
512
513    if obj.is_empty() {
514        return Err(StorageError {
515            code: "PG_INVALID_DATA".into(),
516            message: "Update data must contain at least one field".into(),
517        });
518    }
519
520    let mut set_clauses = Vec::new();
521    let mut values: Vec<JsonParam> = vec![JsonParam::Text(id.to_string())];
522
523    for (i, (key, val)) in obj.iter().enumerate() {
524        set_clauses.push(format!("{} = ${}", quote_ident(key), i + 2));
525        values.push(JsonParam::from_json(val));
526    }
527
528    let sql = format!(
529        "UPDATE {} SET {} WHERE id = $1",
530        quote_ident(entity),
531        set_clauses.join(", ")
532    );
533
534    Ok((sql, values))
535}
536
/// Borrow each `JsonParam` as the `ToSql` trait object the postgres driver
/// expects, so insert/update/transact call sites don't each repeat the
/// conversion.
#[cfg(feature = "postgres-live")]
fn as_pg_params(values: &[JsonParam]) -> Vec<&(dyn postgres::types::ToSql + Sync)> {
    let mut params: Vec<&(dyn postgres::types::ToSql + Sync)> = Vec::with_capacity(values.len());
    for value in values {
        params.push(value);
    }
    params
}
547
548// ---------------------------------------------------------------------------
549// Live Postgres adapter (requires "postgres-live" feature)
550// ---------------------------------------------------------------------------
551
552#[cfg(feature = "postgres-live")]
553pub mod live {
554    use super::*;
555    use crate::{
556        ColumnSnapshot, IndexSnapshot, SchemaSnapshot, StorageAdapter, StorageError, TableSnapshot,
557    };
558
    /// A live Postgres adapter with a real database connection.
    ///
    /// Wraps a single blocking `postgres::Client`; methods that talk to the
    /// database take `&mut self`.
    pub struct LivePostgresAdapter {
        // The underlying database connection.
        client: postgres::Client,
    }
563
564    impl LivePostgresAdapter {
565        /// Connect to a Postgres database.
566        pub fn connect(url: &str) -> Result<Self, StorageError> {
567            let client =
568                postgres::Client::connect(url, postgres::NoTls).map_err(|e| StorageError {
569                    code: "PG_CONNECT_FAILED".into(),
570                    message: format!("Failed to connect to Postgres: {e}"),
571                })?;
572            Ok(Self { client })
573        }
574
575        /// Read the current schema from the live database.
576        pub fn read_schema(&mut self) -> Result<SchemaSnapshot, StorageError> {
577            let table_rows = self
578                .client
579                .query(INTROSPECT_TABLES_SQL, &[])
580                .map_err(pg_err)?;
581
582            let mut tables = Vec::new();
583            for row in &table_rows {
584                let table_name: String = row.get(0);
585                let columns = self.read_columns(&table_name)?;
586                let indexes = self.read_indexes(&table_name)?;
587                tables.push(TableSnapshot {
588                    name: table_name,
589                    columns,
590                    indexes,
591                });
592            }
593
594            Ok(SchemaSnapshot { tables })
595        }
596
597        fn read_columns(&mut self, table: &str) -> Result<Vec<ColumnSnapshot>, StorageError> {
598            let rows = self
599                .client
600                .query(INTROSPECT_COLUMNS_SQL, &[&table])
601                .map_err(pg_err)?;
602
603            let mut columns = Vec::new();
604            for row in &rows {
605                let name: String = row.get(0);
606                let data_type: String = row.get(1);
607                let is_nullable: String = row.get(2);
608                let is_pk: i64 = row.get(3);
609                columns.push(ColumnSnapshot {
610                    name,
611                    column_type: data_type,
612                    notnull: is_nullable == "NO",
613                    primary_key: is_pk > 0,
614                });
615            }
616            Ok(columns)
617        }
618
619        fn read_indexes(&mut self, table: &str) -> Result<Vec<IndexSnapshot>, StorageError> {
620            let rows = self
621                .client
622                .query(INTROSPECT_INDEXES_SQL, &[&table])
623                .map_err(pg_err)?;
624
625            let mut indexes = Vec::new();
626            for row in &rows {
627                let name: String = row.get(0);
628                let unique: bool = row.get(1);
629                let columns: Vec<String> = row.get(2);
630                indexes.push(IndexSnapshot {
631                    name,
632                    columns,
633                    unique,
634                });
635            }
636            Ok(indexes)
637        }
638
639        /// Plan from live database state.
640        pub fn plan_from_live(&mut self, target: &AppManifest) -> Result<SchemaPlan, StorageError> {
641            let snapshot = self.read_schema()?;
642            Ok(crate::plan_from_snapshot(&snapshot, target))
643        }
644    }
645
646    impl StorageAdapter for LivePostgresAdapter {
647        fn plan_schema(&self, _target: &AppManifest) -> Result<SchemaPlan, StorageError> {
648            Err(StorageError {
649                code: "PG_PLAN_NEEDS_MUTABLE".into(),
650                message: "Use plan_from_live() instead for live Postgres planning".into(),
651            })
652        }
653
654        fn apply_schema(&self, _plan: &SchemaPlan) -> Result<(), StorageError> {
655            Err(StorageError {
656                code: "PG_APPLY_USE_METHOD".into(),
657                message: "Use apply_plan() instead of the trait method for live Postgres".into(),
658            })
659        }
660    }
661
662    impl LivePostgresAdapter {
663        /// Apply a schema plan to the live database.
664        pub fn apply_plan(&mut self, plan: &SchemaPlan) -> Result<(), StorageError> {
665            let statements = plan_to_sql(plan)?;
666            for sql in &statements {
667                self.client.execute(sql.as_str(), &[]).map_err(pg_err)?;
668            }
669            Ok(())
670        }
671
        /// Execute a raw SQL statement against the live database. Used by
        /// integration tests for setup/teardown (DROP TABLE, TRUNCATE) —
        /// production code should go through `apply_plan` so changes are
        /// represented in the migration history. Returns the number of
        /// rows affected.
        ///
        /// `sql` is sent to the driver verbatim with no bound parameters, so
        /// it must never be built from untrusted input.
        pub fn exec_raw(&mut self, sql: &str) -> Result<u64, StorageError> {
            self.client.execute(sql, &[]).map_err(pg_err)
        }
680
681        /// Insert a row. Returns the generated ID.
682        pub fn insert(
683            &mut self,
684            entity: &str,
685            data: &serde_json::Value,
686        ) -> Result<String, StorageError> {
687            let (sql, values) = build_insert_sql(entity, data)?;
688            // The first param is always the generated ID — extract it before
689            // we hand `values` off to the postgres driver as borrowed slices.
690            let id = match &values[0] {
691                JsonParam::Text(s) => s.clone(),
692                _ => {
693                    return Err(StorageError {
694                        code: "PG_INTERNAL".into(),
695                        message: "build_insert_sql produced non-text id param".into(),
696                    });
697                }
698            };
699            let params = as_pg_params(&values);
700            self.client.execute(sql.as_str(), &params).map_err(pg_err)?;
701            Ok(id)
702        }
703
704        /// Get a row by ID.
705        pub fn get_by_id(
706            &mut self,
707            entity: &str,
708            id: &str,
709        ) -> Result<Option<serde_json::Value>, StorageError> {
710            let sql = format!("SELECT * FROM {} WHERE id = $1", quote_ident(entity));
711            let rows = self.client.query(sql.as_str(), &[&id]).map_err(pg_err)?;
712
713            match rows.first() {
714                Some(row) => Ok(Some(row_to_json(row))),
715                None => Ok(None),
716            }
717        }
718
719        /// List all rows from an entity.
720        pub fn list(&mut self, entity: &str) -> Result<Vec<serde_json::Value>, StorageError> {
721            let sql = format!("SELECT * FROM {}", quote_ident(entity));
722            let rows = self.client.query(sql.as_str(), &[]).map_err(pg_err)?;
723
724            Ok(rows.iter().map(row_to_json).collect())
725        }
726
727        /// Cursor-paginated list. `after` is the last `id` from the previous
728        /// page; the result contains rows with `id > after` (lex order),
729        /// limited to `limit`. Used for sync push/pull.
730        pub fn list_after(
731            &mut self,
732            entity: &str,
733            after: Option<&str>,
734            limit: usize,
735        ) -> Result<Vec<serde_json::Value>, StorageError> {
736            // Cap limit at a sensible upper bound so a malicious client can't
737            // stream the whole table by passing limit=u64::MAX.
738            let capped: i64 = limit.min(10_000) as i64;
739            let sql = match after {
740                Some(_) => format!(
741                    "SELECT * FROM {} WHERE id > $1 ORDER BY id ASC LIMIT $2",
742                    quote_ident(entity)
743                ),
744                None => format!(
745                    "SELECT * FROM {} ORDER BY id ASC LIMIT $1",
746                    quote_ident(entity)
747                ),
748            };
749            let rows = match after {
750                Some(cursor) => self
751                    .client
752                    .query(sql.as_str(), &[&cursor, &capped])
753                    .map_err(pg_err)?,
754                None => self
755                    .client
756                    .query(sql.as_str(), &[&capped])
757                    .map_err(pg_err)?,
758            };
759            Ok(rows.iter().map(row_to_json).collect())
760        }
761
762        /// Update a row by ID. Returns true if the row was found and updated.
763        pub fn update(
764            &mut self,
765            entity: &str,
766            id: &str,
767            data: &serde_json::Value,
768        ) -> Result<bool, StorageError> {
769            let (sql, values) = build_update_sql(entity, id, data)?;
770            let params = as_pg_params(&values);
771            let rows_affected = self.client.execute(sql.as_str(), &params).map_err(pg_err)?;
772            Ok(rows_affected > 0)
773        }
774
775        /// Delete a row by ID. Returns true if the row was found and deleted.
776        pub fn delete(&mut self, entity: &str, id: &str) -> Result<bool, StorageError> {
777            let sql = format!("DELETE FROM {} WHERE id = $1", quote_ident(entity));
778            let rows_affected = self.client.execute(sql.as_str(), &[&id]).map_err(pg_err)?;
779            Ok(rows_affected > 0)
780        }
781
782        /// Look up a row by `field = value`. Caller must validate `field`
783        /// against the manifest before calling — we still `quote_ident` it
784        /// but won't catch a typo against the entity definition.
785        pub fn lookup_field(
786            &mut self,
787            entity: &str,
788            field: &str,
789            value: &str,
790        ) -> Result<Option<serde_json::Value>, StorageError> {
791            let sql = format!(
792                "SELECT * FROM {} WHERE {} = $1 LIMIT 1",
793                quote_ident(entity),
794                quote_ident(field),
795            );
796            let rows = self.client.query(sql.as_str(), &[&value]).map_err(pg_err)?;
797            Ok(rows.first().map(row_to_json))
798        }
799
800        /// Push a `query_filtered` filter down to a real Postgres `WHERE`.
801        ///
802        /// Supported operators (parity with the SQLite path):
803        /// - Equality (`field: value`)
804        /// - `$not`: emits `field != value`
805        /// - `$gt` / `$gte` / `$lt` / `$lte`
806        /// - `$like`: emits `field LIKE value` (use `%`/`_` wildcards in
807        ///   the value; case-sensitive — pass `$ilike` for case-insensitive
808        ///   if/when the SQLite side adds it)
809        /// - `$in: [..]`: emits `field IN ($1, $2, ...)`
810        ///
811        /// Top-level meta operators: `$order`, `$limit`, `$offset`.
812        ///
813        /// `$search` (FTS5 on SQLite) is NOT supported here — Postgres
814        /// would need a tsvector column or a generic ILIKE OR-fold across
815        /// every text field, neither of which is wired up yet. Returns
816        /// `SEARCH_NOT_SUPPORTED` so callers can branch instead of
817        /// receiving silently-broad results.
818        ///
819        /// Anything else is silently ignored (matches the in-memory fallback's
820        /// permissive behavior). Field names are validated against `valid_columns`
821        /// to prevent SQL injection — pass the entity's column set.
822        pub fn query_filtered(
823            &mut self,
824            entity: &str,
825            filter: &serde_json::Value,
826            valid_columns: &[String],
827        ) -> Result<Vec<serde_json::Value>, StorageError> {
828            let empty = serde_json::Map::new();
829            let obj = filter.as_object().unwrap_or(&empty);
830
831            let validate = |col: &str| -> Result<(), StorageError> {
832                if col == "id" || valid_columns.iter().any(|c| c == col) {
833                    Ok(())
834                } else {
835                    Err(StorageError {
836                        code: "UNKNOWN_COLUMN".into(),
837                        message: format!("Unknown column \"{col}\" on entity \"{entity}\""),
838                    })
839                }
840            };
841
842            let mut where_clauses: Vec<String> = Vec::new();
843            let mut order_clause = String::new();
844            let mut limit_clause = String::new();
845            let mut offset_clause = String::new();
846            // Collect (col, op, value) so placeholder numbers can be assigned
847            // in a single materialization pass after the parse loop. Values
848            // are now JsonParam (typed) instead of String — see `value_to_pg`.
849            let mut planned: Vec<(String, String, JsonParam)> = Vec::new();
850
851            for (key, val) in obj {
852                match key.as_str() {
853                    "$search" => {
854                        // FTS5 (SQLite) has no portable equivalent in
855                        // Postgres without an explicit tsvector column.
856                        // Return a clear error rather than silently
857                        // ignoring the operator — callers that hit this
858                        // need to either branch on backend or define a
859                        // tsvector + GIN index in their schema.
860                        return Err(StorageError {
861                            code: "SEARCH_NOT_SUPPORTED".into(),
862                            message: "$search is SQLite-FTS5-only; use a Postgres tsvector column with the storage adapter's full-text path"
863                                .into(),
864                        });
865                    }
866                    "$order" => {
867                        if let Some(ord) = val.as_object() {
868                            let mut parts = Vec::new();
869                            for (col, dir) in ord {
870                                validate(col)?;
871                                let d = match dir.as_str().unwrap_or("asc") {
872                                    "desc" | "DESC" => "DESC",
873                                    _ => "ASC",
874                                };
875                                parts.push(format!("{} {d}", quote_ident(col)));
876                            }
877                            if !parts.is_empty() {
878                                order_clause = format!(" ORDER BY {}", parts.join(", "));
879                            }
880                        }
881                    }
882                    "$limit" => {
883                        if let Some(n) = val.as_u64() {
884                            limit_clause = format!(" LIMIT {}", n);
885                        }
886                    }
887                    "$offset" => {
888                        if let Some(n) = val.as_u64() {
889                            offset_clause = format!(" OFFSET {}", n);
890                        }
891                    }
892                    field => {
893                        validate(field)?;
894                        match val {
895                            serde_json::Value::Object(ops) => {
896                                for (op, v) in ops {
897                                    match op.as_str() {
898                                        "$not" => planned.push((
899                                            field.into(),
900                                            "!=".into(),
901                                            value_to_pg(v),
902                                        )),
903                                        "$gt" => {
904                                            planned.push((field.into(), ">".into(), value_to_pg(v)))
905                                        }
906                                        "$gte" => planned.push((
907                                            field.into(),
908                                            ">=".into(),
909                                            value_to_pg(v),
910                                        )),
911                                        "$lt" => {
912                                            planned.push((field.into(), "<".into(), value_to_pg(v)))
913                                        }
914                                        "$lte" => planned.push((
915                                            field.into(),
916                                            "<=".into(),
917                                            value_to_pg(v),
918                                        )),
919                                        "$like" => {
920                                            // Wrap in `%...%` to match the
921                                            // SQLite path's substring
922                                            // semantics. Pre-fix divergence:
923                                            // SQLite wrapped, PG forwarded
924                                            // literally — `{name: {$like: "ann"}}`
925                                            // matched "Joanne" on SQLite but
926                                            // nothing on PG. Caller-supplied
927                                            // wildcards inside the value still
928                                            // work (`%j_n%` etc.) because we
929                                            // only add wraps, never strip.
930                                            let raw = match v {
931                                                serde_json::Value::String(s) => s.clone(),
932                                                other => other.to_string(),
933                                            };
934                                            planned.push((
935                                                field.into(),
936                                                "LIKE".into(),
937                                                JsonParam::Text(format!("%{raw}%")),
938                                            ));
939                                        }
940                                        "$in" => {
941                                            if let Some(arr) = v.as_array() {
942                                                if arr.is_empty() {
943                                                    // `field IN ()` is invalid
944                                                    // SQL on PG (and on SQLite
945                                                    // too, technically — its
946                                                    // path also short-circuits).
947                                                    // An empty $in matches
948                                                    // nothing; emit a guaranteed-
949                                                    // false predicate so the
950                                                    // parser doesn't choke and
951                                                    // the result set comes back
952                                                    // empty.
953                                                    where_clauses.push("FALSE".into());
954                                                } else {
955                                                    let placeholders: Vec<String> = (0..arr.len())
956                                                        .map(|i| {
957                                                            format!("${}", planned.len() + 1 + i)
958                                                        })
959                                                        .collect();
960                                                    where_clauses.push(format!(
961                                                        "{} IN ({})",
962                                                        quote_ident(field),
963                                                        placeholders.join(", "),
964                                                    ));
965                                                    for x in arr {
966                                                        planned.push((
967                                                            format!("__inline_{}", planned.len()),
968                                                            "__INLINE__".into(),
969                                                            value_to_pg(x),
970                                                        ));
971                                                    }
972                                                }
973                                            }
974                                        }
975                                        _ => {}
976                                    }
977                                }
978                            }
979                            _ => planned.push((field.into(), "=".into(), value_to_pg(val))),
980                        }
981                    }
982                }
983            }
984
985            // Materialize planned -> SQL + params.
986            let mut params: Vec<JsonParam> = Vec::with_capacity(planned.len());
987            for (field, op, v) in &planned {
988                if op == "__INLINE__" {
989                    // Already emitted via the IN-clause path; just push the value.
990                    params.push(v.clone());
991                } else {
992                    let placeholder = format!("${}", params.len() + 1);
993                    where_clauses.push(format!("{} {} {}", quote_ident(field), op, placeholder));
994                    params.push(v.clone());
995                }
996            }
997
998            let where_sql = if where_clauses.is_empty() {
999                String::new()
1000            } else {
1001                format!(" WHERE {}", where_clauses.join(" AND "))
1002            };
1003            // Default deterministic order when the caller didn't pass
1004            // `$order` — matches the SQLite path. Without this,
1005            // identical queries return rows in different orders across
1006            // backends, which makes paginated APIs flaky.
1007            let final_order = if order_clause.is_empty() {
1008                format!(" ORDER BY {}", quote_ident("id"))
1009            } else {
1010                order_clause
1011            };
1012            let sql = format!(
1013                "SELECT * FROM {}{}{}{}{}",
1014                quote_ident(entity),
1015                where_sql,
1016                final_order,
1017                limit_clause,
1018                offset_clause,
1019            );
1020
1021            let pg_params = as_pg_params(&params);
1022            let rows = self
1023                .client
1024                .query(sql.as_str(), &pg_params)
1025                .map_err(pg_err)?;
1026            Ok(rows.iter().map(row_to_json).collect())
1027        }
1028
1029        /// Run a `DataStore::aggregate` spec against Postgres. Mirrors the
1030        /// SQLite path in `pylon-runtime` — supports `count`, `sum`, `avg`,
1031        /// `min`, `max`, `countDistinct`, `groupBy` (plain field names or
1032        /// `{field, bucket: hour|day|week|month|year}` for date bucketing
1033        /// via `date_trunc`), and a flat-equality `where` filter.
1034        ///
1035        /// Spec format (same JSON shape used by the SQLite path):
1036        /// ```json
1037        /// { "count": "*",
1038        ///   "sum": ["amount"],
1039        ///   "groupBy": [{"field": "createdAt", "bucket": "day"}],
1040        ///   "where": {"status": "paid"} }
1041        /// ```
1042        ///
1043        /// `valid_columns` is used to validate every field name before it's
1044        /// quoted into SQL — same pattern as `query_filtered`. Caller (the
1045        /// `DataStore` impl in this crate) supplies the entity's column set
1046        /// from the manifest.
1047        pub fn aggregate(
1048            &mut self,
1049            entity: &str,
1050            spec: &serde_json::Value,
1051            valid_columns: &[String],
1052        ) -> Result<serde_json::Value, StorageError> {
1053            let obj = spec.as_object().ok_or_else(|| StorageError {
1054                code: "INVALID_QUERY".into(),
1055                message: "aggregate spec must be a JSON object".into(),
1056            })?;
1057
1058            let validate = |col: &str| -> Result<(), StorageError> {
1059                if col == "id" || valid_columns.iter().any(|c| c == col) {
1060                    Ok(())
1061                } else {
1062                    Err(StorageError {
1063                        code: "UNKNOWN_COLUMN".into(),
1064                        message: format!("Unknown column \"{col}\" on entity \"{entity}\""),
1065                    })
1066                }
1067            };
1068
1069            let mut select_parts: Vec<String> = Vec::new();
1070            let mut result_fields: Vec<String> = Vec::new();
1071
1072            if let Some(count) = obj.get("count") {
1073                match count {
1074                    serde_json::Value::String(s) if s == "*" => {
1075                        select_parts.push("COUNT(*) AS count".into());
1076                        result_fields.push("count".into());
1077                    }
1078                    serde_json::Value::String(field) => {
1079                        validate(field)?;
1080                        let alias = format!("count_{field}");
1081                        select_parts.push(format!(
1082                            "COUNT({}) AS {}",
1083                            quote_ident(field),
1084                            quote_ident(&alias),
1085                        ));
1086                        result_fields.push(alias);
1087                    }
1088                    _ => {}
1089                }
1090            }
1091
1092            for (fn_name, prefix) in [
1093                ("sum", "sum_"),
1094                ("avg", "avg_"),
1095                ("min", "min_"),
1096                ("max", "max_"),
1097            ] {
1098                if let Some(fields) = obj.get(fn_name).and_then(|v| v.as_array()) {
1099                    for field in fields {
1100                        if let Some(f) = field.as_str() {
1101                            validate(f)?;
1102                            let alias = format!("{prefix}{f}");
1103                            let sql_fn = fn_name.to_uppercase();
1104                            select_parts.push(format!(
1105                                "{}({}) AS {}",
1106                                sql_fn,
1107                                quote_ident(f),
1108                                quote_ident(&alias),
1109                            ));
1110                            result_fields.push(alias);
1111                        }
1112                    }
1113                }
1114            }
1115
1116            if let Some(fields) = obj.get("countDistinct").and_then(|v| v.as_array()) {
1117                for field in fields {
1118                    if let Some(f) = field.as_str() {
1119                        validate(f)?;
1120                        let alias = format!("count_distinct_{f}");
1121                        select_parts.push(format!(
1122                            "COUNT(DISTINCT {}) AS {}",
1123                            quote_ident(f),
1124                            quote_ident(&alias),
1125                        ));
1126                        result_fields.push(alias);
1127                    }
1128                }
1129            }
1130
1131            // groupBy: column name or { field, bucket } — same vocabulary as
1132            // the SQLite path. Buckets translate to Postgres `date_trunc`
1133            // (SQLite uses `strftime`); both collapse rows to the bucket
1134            // boundary identically.
1135            let mut group_by: Vec<String> = Vec::new();
1136            let mut group_select: Vec<String> = Vec::new();
1137            let mut group_field_names: Vec<String> = Vec::new();
1138            if let Some(groups) = obj.get("groupBy").and_then(|v| v.as_array()) {
1139                for g in groups {
1140                    if let Some(f) = g.as_str() {
1141                        validate(f)?;
1142                        let q = quote_ident(f);
1143                        group_by.push(q.clone());
1144                        group_select.push(q);
1145                        group_field_names.push(f.to_string());
1146                    } else if let Some(spec) = g.as_object() {
1147                        let field =
1148                            spec.get("field").and_then(|v| v.as_str()).ok_or_else(|| {
1149                                StorageError {
1150                                    code: "INVALID_QUERY".into(),
1151                                    message: "groupBy object spec requires `field`".into(),
1152                                }
1153                            })?;
1154                        validate(field)?;
1155                        let bucket = spec.get("bucket").and_then(|v| v.as_str()).unwrap_or("day");
1156                        let trunc_unit = match bucket {
1157                            "hour" | "day" | "week" | "month" | "year" => bucket,
1158                            _ => {
1159                                return Err(StorageError {
1160                                    code: "INVALID_QUERY".into(),
1161                                    message: format!(
1162                                        "bucket must be one of hour/day/week/month/year, got {bucket}"
1163                                    ),
1164                                });
1165                            }
1166                        };
1167                        let alias = format!("{field}_{bucket}");
1168                        let expr = format!("date_trunc('{}', {})", trunc_unit, quote_ident(field),);
1169                        group_by.push(expr.clone());
1170                        group_select.push(format!("{} AS {}", expr, quote_ident(&alias)));
1171                        group_field_names.push(alias);
1172                    }
1173                }
1174            }
1175
1176            let mut full_select = group_select.clone();
1177            full_select.extend(select_parts.iter().cloned());
1178            if full_select.is_empty() {
1179                return Err(StorageError {
1180                    code: "INVALID_QUERY".into(),
1181                    message: "aggregate spec must include count/sum/avg/min/max/groupBy".into(),
1182                });
1183            }
1184
1185            let mut where_clauses: Vec<String> = Vec::new();
1186            let mut params: Vec<JsonParam> = Vec::new();
1187            if let Some(w) = obj.get("where").and_then(|v| v.as_object()) {
1188                for (k, v) in w {
1189                    validate(k)?;
1190                    let placeholder = format!("${}", params.len() + 1);
1191                    where_clauses.push(format!("{} = {}", quote_ident(k), placeholder));
1192                    params.push(value_to_pg(v));
1193                }
1194            }
1195            let where_sql = if where_clauses.is_empty() {
1196                String::new()
1197            } else {
1198                format!(" WHERE {}", where_clauses.join(" AND "))
1199            };
1200            let group_sql = if group_by.is_empty() {
1201                String::new()
1202            } else {
1203                format!(" GROUP BY {}", group_by.join(", "))
1204            };
1205
1206            let sql = format!(
1207                "SELECT {} FROM {}{}{}",
1208                full_select.join(", "),
1209                quote_ident(entity),
1210                where_sql,
1211                group_sql,
1212            );
1213
1214            let pg_params = as_pg_params(&params);
1215            let rows = self
1216                .client
1217                .query(sql.as_str(), &pg_params)
1218                .map_err(pg_err)?;
1219
1220            let column_names: Vec<String> = group_field_names
1221                .iter()
1222                .chain(result_fields.iter())
1223                .cloned()
1224                .collect();
1225
1226            let mut out: Vec<serde_json::Value> = Vec::with_capacity(rows.len());
1227            for row in &rows {
1228                let row_json = row_to_json(row);
1229                if let serde_json::Value::Object(map) = &row_json {
1230                    let mut filtered = serde_json::Map::new();
1231                    for name in &column_names {
1232                        if let Some(v) = map.get(name) {
1233                            filtered.insert(name.clone(), v.clone());
1234                        }
1235                    }
1236                    out.push(serde_json::Value::Object(filtered));
1237                } else {
1238                    out.push(row_json);
1239                }
1240            }
1241            Ok(serde_json::json!({ "rows": out }))
1242        }
1243    }
1244
1245    /// Atomic operation describing a single mutation inside [`LivePostgresAdapter::transact`].
1246    pub enum TxOp<'a> {
1247        Insert {
1248            entity: &'a str,
1249            data: &'a serde_json::Value,
1250        },
1251        Update {
1252            entity: &'a str,
1253            id: &'a str,
1254            data: &'a serde_json::Value,
1255        },
1256        Delete {
1257            entity: &'a str,
1258            id: &'a str,
1259        },
1260    }
1261
/// Result of a single op inside a transaction.
///
/// Comparable by value so callers and tests can check a whole batch
/// outcome with `==` / `assert_eq!`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TxResult {
    /// An insert ran; carries the id of the inserted row.
    Inserted(String),
    /// An update ran; `true` when at least one row was affected.
    Updated(bool),
    /// A delete ran; `true` when at least one row was affected.
    Deleted(bool),
}
1269
1270    impl LivePostgresAdapter {
1271        /// Run `ops` inside a single Postgres transaction. Either all of them
1272        /// commit together or none of them do — there is no partial state on
1273        /// failure. The ROLLBACK happens implicitly when the `Transaction`
1274        /// guard is dropped without `commit()` being called.
1275        pub fn transact(&mut self, ops: &[TxOp<'_>]) -> Result<Vec<TxResult>, StorageError> {
1276            let mut tx = self.client.transaction().map_err(pg_err)?;
1277            let mut results: Vec<TxResult> = Vec::with_capacity(ops.len());
1278
1279            for op in ops {
1280                match op {
1281                    TxOp::Insert { entity, data } => {
1282                        let (sql, values) = build_insert_sql(entity, data)?;
1283                        let id = match &values[0] {
1284                            JsonParam::Text(s) => s.clone(),
1285                            _ => {
1286                                return Err(StorageError {
1287                                    code: "PG_INTERNAL".into(),
1288                                    message: "build_insert_sql produced non-text id param".into(),
1289                                });
1290                            }
1291                        };
1292                        let params = as_pg_params(&values);
1293                        tx.execute(sql.as_str(), &params).map_err(pg_err)?;
1294                        results.push(TxResult::Inserted(id));
1295                    }
1296                    TxOp::Update { entity, id, data } => {
1297                        let (sql, values) = build_update_sql(entity, id, data)?;
1298                        let params = as_pg_params(&values);
1299                        let n = tx.execute(sql.as_str(), &params).map_err(pg_err)?;
1300                        results.push(TxResult::Updated(n > 0));
1301                    }
1302                    TxOp::Delete { entity, id } => {
1303                        let sql = format!("DELETE FROM {} WHERE id = $1", quote_ident(entity));
1304                        let n = tx.execute(sql.as_str(), &[id]).map_err(pg_err)?;
1305                        results.push(TxResult::Deleted(n > 0));
1306                    }
1307                }
1308            }
1309
1310            tx.commit().map_err(pg_err)?;
1311            Ok(results)
1312        }
1313    }
1314
1315    /// Lift a JSON value into a typed Postgres parameter. The previous
1316    /// implementation collapsed everything to `String`, which silently
1317    /// stringified ints/bools and turned JSON `null` into `""` for
1318    /// nullable columns. Forwarding through `JsonParam` keeps the column
1319    /// type honest and lets callers `unlink` (set FK to NULL) cleanly.
1320    fn value_to_pg(v: &serde_json::Value) -> JsonParam {
1321        JsonParam::from_json(v)
1322    }
1323
1324    fn row_to_json(row: &postgres::Row) -> serde_json::Value {
1325        use postgres::types::Type;
1326        let mut obj = serde_json::Map::new();
1327        for (i, col) in row.columns().iter().enumerate() {
1328            let name = col.name().to_string();
1329
1330            // Use `try_get` everywhere — `Row::get` panics on decode mismatch,
1331            // and a panic in a query handler poisons the connection mutex,
1332            // taking down all subsequent reads on this datastore. Anything
1333            // that fails to decode becomes Null with a one-shot warning.
1334            //
1335            // Timestamps and the catch-all path explicitly DON'T request
1336            // `String` — the postgres crate uses binary protocol by default
1337            // and there's no `FromSql<String>` impl for TIMESTAMPTZ etc. We
1338            // ask for `Vec<u8>` and lossy-stringify, which works for all
1339            // text-shaped columns in either protocol.
1340            let value: serde_json::Value = match *col.type_() {
1341                Type::BOOL => try_get_or_null::<Option<bool>>(row, i)
1342                    .flatten()
1343                    .map(serde_json::Value::Bool)
1344                    .unwrap_or(serde_json::Value::Null),
1345                Type::INT2 => try_get_or_null::<Option<i16>>(row, i)
1346                    .flatten()
1347                    .map(|v| serde_json::Value::Number(v.into()))
1348                    .unwrap_or(serde_json::Value::Null),
1349                Type::INT4 => try_get_or_null::<Option<i32>>(row, i)
1350                    .flatten()
1351                    .map(|v| serde_json::Value::Number(v.into()))
1352                    .unwrap_or(serde_json::Value::Null),
1353                Type::INT8 => try_get_or_null::<Option<i64>>(row, i)
1354                    .flatten()
1355                    .map(|v| serde_json::Value::Number(v.into()))
1356                    .unwrap_or(serde_json::Value::Null),
1357                Type::FLOAT4 => try_get_or_null::<Option<f32>>(row, i)
1358                    .flatten()
1359                    .and_then(|v| serde_json::Number::from_f64(v as f64))
1360                    .map(serde_json::Value::Number)
1361                    .unwrap_or(serde_json::Value::Null),
1362                Type::FLOAT8 => try_get_or_null::<Option<f64>>(row, i)
1363                    .flatten()
1364                    .and_then(serde_json::Number::from_f64)
1365                    .map(serde_json::Value::Number)
1366                    .unwrap_or(serde_json::Value::Null),
1367                Type::JSON | Type::JSONB => try_get_or_null::<Option<serde_json::Value>>(row, i)
1368                    .flatten()
1369                    .unwrap_or(serde_json::Value::Null),
1370                Type::BYTEA => try_get_or_null::<Option<Vec<u8>>>(row, i)
1371                    .flatten()
1372                    .map(|b| serde_json::Value::String(b64(&b)))
1373                    .unwrap_or(serde_json::Value::Null),
1374                Type::TEXT | Type::VARCHAR | Type::BPCHAR | Type::NAME | Type::UNKNOWN => {
1375                    try_get_or_null::<Option<String>>(row, i)
1376                        .flatten()
1377                        .map(serde_json::Value::String)
1378                        .unwrap_or(serde_json::Value::Null)
1379                }
1380                Type::TIMESTAMPTZ => {
1381                    // Decode via chrono::DateTime<Utc> (postgres's
1382                    // `with-chrono-0_4` feature provides FromSql) and
1383                    // re-format as ISO 8601 — the shape pylon's clients
1384                    // expect (matches `pylon_kernel::util::now_iso`,
1385                    // so timestamps round-trip with the same surface
1386                    // across SQLite + PG).
1387                    try_get_or_null::<Option<chrono::DateTime<chrono::Utc>>>(row, i)
1388                        .flatten()
1389                        .map(|dt| {
1390                            serde_json::Value::String(dt.format("%Y-%m-%dT%H:%M:%SZ").to_string())
1391                        })
1392                        .unwrap_or(serde_json::Value::Null)
1393                }
1394                Type::TIMESTAMP => try_get_or_null::<Option<chrono::NaiveDateTime>>(row, i)
1395                    .flatten()
1396                    .map(|dt| {
1397                        serde_json::Value::String(dt.format("%Y-%m-%dT%H:%M:%SZ").to_string())
1398                    })
1399                    .unwrap_or(serde_json::Value::Null),
1400                Type::DATE => try_get_or_null::<Option<chrono::NaiveDate>>(row, i)
1401                    .flatten()
1402                    .map(|d| serde_json::Value::String(d.format("%Y-%m-%d").to_string()))
1403                    .unwrap_or(serde_json::Value::Null),
1404                _ => {
1405                    // Last resort: ask Postgres to render anything else as
1406                    // text via a stringifying decode through Vec<u8>. If even
1407                    // that fails (rare — Postgres types not implementing the
1408                    // text format), fall through to Null with a warning.
1409                    match row.try_get::<_, Option<String>>(i) {
1410                        Ok(Some(s)) => serde_json::Value::String(s),
1411                        Ok(None) => serde_json::Value::Null,
1412                        Err(_) => match row.try_get::<_, Option<Vec<u8>>>(i) {
1413                            Ok(Some(bytes)) => serde_json::Value::String(
1414                                String::from_utf8_lossy(&bytes).into_owned(),
1415                            ),
1416                            _ => serde_json::Value::Null,
1417                        },
1418                    }
1419                }
1420            };
1421            obj.insert(name, value);
1422        }
1423        serde_json::Value::Object(obj)
1424    }
1425
1426    fn try_get_or_null<'a, T>(row: &'a postgres::Row, i: usize) -> Option<T>
1427    where
1428        T: postgres::types::FromSql<'a>,
1429    {
1430        match row.try_get::<_, T>(i) {
1431            Ok(v) => Some(v),
1432            Err(e) => {
1433                tracing::warn!(
1434                    "[postgres] decode failed for column {} ({}): {e}",
1435                    i,
1436                    row.columns()[i].name()
1437                );
1438                None
1439            }
1440        }
1441    }
1442
1443    /// Minimal base64 encoder so we don't need another dependency just for
1444    /// the BYTEA column edge case.
1445    fn b64(bytes: &[u8]) -> String {
1446        const TABLE: &[u8; 64] =
1447            b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
1448        let mut out = String::with_capacity((bytes.len() + 2) / 3 * 4);
1449        let chunks = bytes.chunks(3);
1450        for chunk in chunks {
1451            let b = [
1452                chunk.first().copied().unwrap_or(0),
1453                chunk.get(1).copied().unwrap_or(0),
1454                chunk.get(2).copied().unwrap_or(0),
1455            ];
1456            out.push(TABLE[(b[0] >> 2) as usize] as char);
1457            out.push(TABLE[((b[0] & 0x03) << 4 | b[1] >> 4) as usize] as char);
1458            if chunk.len() > 1 {
1459                out.push(TABLE[((b[1] & 0x0F) << 2 | b[2] >> 6) as usize] as char);
1460            } else {
1461                out.push('=');
1462            }
1463            if chunk.len() > 2 {
1464                out.push(TABLE[(b[2] & 0x3F) as usize] as char);
1465            } else {
1466                out.push('=');
1467            }
1468        }
1469        out
1470    }
1471
1472    fn pg_err(e: postgres::Error) -> StorageError {
1473        // postgres::Error's Display is intentionally short ("db error",
1474        // "connection error" etc.) — the actual SQLSTATE / detail lives
1475        // on the source chain. Walk the chain so the final message has
1476        // enough signal to debug a failed insert/update without
1477        // attaching a debugger.
1478        use std::error::Error;
1479        let mut detail = format!("{e}");
1480        let mut src: Option<&dyn Error> = e.source();
1481        while let Some(s) = src {
1482            detail.push_str(": ");
1483            detail.push_str(&format!("{s}"));
1484            src = s.source();
1485        }
1486        StorageError {
1487            code: "PG_QUERY_FAILED".into(),
1488            message: format!("Postgres query failed: {detail}"),
1489        }
1490    }
1491}
1492
1493// ---------------------------------------------------------------------------
1494// Tests
1495// ---------------------------------------------------------------------------
1496
1497#[cfg(test)]
1498mod tests {
1499    use super::*;
1500
1501    /// Hand-rolled fixture that matches the snapshots in the tests
1502    /// below. Decoupled from any example's `pylon.manifest.json` so
1503    /// changing an example schema doesn't bleed into adapter tests.
1504    fn test_manifest() -> AppManifest {
1505        use pylon_kernel::{ManifestEntity, ManifestField, ManifestIndex};
1506        let f = |name: &str, ty: &str, opt: bool, uniq: bool| ManifestField {
1507            name: name.into(),
1508            field_type: ty.into(),
1509            optional: opt,
1510            unique: uniq,
1511            crdt: None,
1512        };
1513        AppManifest {
1514            manifest_version: 1,
1515            name: "test".into(),
1516            version: "0.0.0".into(),
1517            entities: vec![
1518                ManifestEntity {
1519                    name: "User".into(),
1520                    fields: vec![
1521                        f("email", "string", false, true),
1522                        f("displayName", "string", false, false),
1523                        f("createdAt", "datetime", false, false),
1524                    ],
1525                    indexes: vec![],
1526                    relations: vec![],
1527                    search: None,
1528                    crdt: true,
1529                },
1530                ManifestEntity {
1531                    name: "Todo".into(),
1532                    fields: vec![
1533                        f("title", "string", false, false),
1534                        f("done", "bool", false, false),
1535                        f("userId", "id(User)", false, false),
1536                        f("createdAt", "datetime", false, false),
1537                    ],
1538                    indexes: vec![ManifestIndex {
1539                        name: "by_user".into(),
1540                        fields: vec!["userId".into()],
1541                        unique: false,
1542                    }],
1543                    relations: vec![],
1544                    search: None,
1545                    crdt: true,
1546                },
1547            ],
1548            queries: vec![],
1549            actions: vec![],
1550            policies: vec![],
1551            routes: vec![],
1552            auth: Default::default(),
1553        }
1554    }
1555
1556    #[test]
1557    fn pg_type_mapping() {
1558        assert_eq!(pg_column_type("string"), "TEXT");
1559        assert_eq!(pg_column_type("int"), "INTEGER");
1560        assert_eq!(pg_column_type("float"), "DOUBLE PRECISION");
1561        assert_eq!(pg_column_type("bool"), "BOOLEAN");
1562        assert_eq!(pg_column_type("datetime"), "TIMESTAMPTZ");
1563        assert_eq!(pg_column_type("richtext"), "TEXT");
1564        assert_eq!(pg_column_type("id(User)"), "TEXT");
1565    }
1566
1567    #[test]
1568    fn quote_ident_simple() {
1569        assert_eq!(quote_ident("User"), "\"User\"");
1570        assert_eq!(quote_ident("email"), "\"email\"");
1571    }
1572
1573    #[test]
1574    fn quote_ident_escapes_embedded_double_quotes() {
1575        assert_eq!(quote_ident("col\"name"), "\"col\"\"name\"");
1576        assert_eq!(quote_ident("a\"b\"c"), "\"a\"\"b\"\"c\"");
1577    }
1578
1579    #[test]
1580    fn create_table_sql_basic() {
1581        let fields = vec![
1582            FieldSpec {
1583                name: "email".into(),
1584                field_type: "string".into(),
1585                optional: false,
1586                unique: true,
1587            },
1588            FieldSpec {
1589                name: "age".into(),
1590                field_type: "int".into(),
1591                optional: true,
1592                unique: false,
1593            },
1594        ];
1595        let sql = create_table_sql("User", &fields);
1596        assert_eq!(
1597            sql,
1598            "CREATE TABLE IF NOT EXISTS \"User\" (id TEXT PRIMARY KEY NOT NULL, \"email\" TEXT NOT NULL UNIQUE, \"age\" INTEGER)"
1599        );
1600    }
1601
1602    #[test]
1603    fn create_table_sql_escapes_identifiers() {
1604        let fields = vec![FieldSpec {
1605            name: "col\"x".into(),
1606            field_type: "string".into(),
1607            optional: false,
1608            unique: false,
1609        }];
1610        let sql = create_table_sql("my\"table", &fields);
1611        assert!(sql.contains("\"my\"\"table\""));
1612        assert!(sql.contains("\"col\"\"x\""));
1613    }
1614
1615    #[test]
1616    fn create_index_sql_unique() {
1617        let sql = create_index_sql("User", "by_email", &["email".into()], true);
1618        assert_eq!(
1619            sql,
1620            "CREATE UNIQUE INDEX IF NOT EXISTS \"User_by_email\" ON \"User\" (\"email\")"
1621        );
1622    }
1623
1624    #[test]
1625    fn create_index_sql_non_unique() {
1626        let sql = create_index_sql("Todo", "by_user", &["userId".into()], false);
1627        assert_eq!(
1628            sql,
1629            "CREATE INDEX IF NOT EXISTS \"Todo_by_user\" ON \"Todo\" (\"userId\")"
1630        );
1631    }
1632
1633    #[test]
1634    fn add_column_sql_basic() {
1635        let field = FieldSpec {
1636            name: "bio".into(),
1637            field_type: "string".into(),
1638            optional: true,
1639            unique: false,
1640        };
1641        let sql = add_column_sql("User", &field);
1642        assert_eq!(sql, "ALTER TABLE \"User\" ADD COLUMN \"bio\" TEXT");
1643    }
1644
1645    #[test]
1646    fn plan_from_manifest() {
1647        let adapter = PostgresAdapter;
1648        let manifest = test_manifest();
1649        let plan = adapter.plan_schema(&manifest).unwrap();
1650
1651        // Should have CreateEntity for User and Todo, plus AddIndex for by_user.
1652        assert!(plan.operations.iter().any(|op| matches!(
1653            op,
1654            SchemaOperation::CreateEntity { name, .. } if name == "User"
1655        )));
1656        assert!(plan.operations.iter().any(|op| matches!(
1657            op,
1658            SchemaOperation::CreateEntity { name, .. } if name == "Todo"
1659        )));
1660        assert!(plan.operations.iter().any(|op| matches!(
1661            op,
1662            SchemaOperation::AddIndex { entity, name, .. } if entity == "Todo" && name == "by_user"
1663        )));
1664    }
1665
1666    #[test]
1667    fn plan_to_sql_produces_statements() {
1668        let adapter = PostgresAdapter;
1669        let manifest = test_manifest();
1670        let plan = adapter.plan_schema(&manifest).unwrap();
1671        let stmts = plan_to_sql(&plan).unwrap();
1672
1673        // 2 CREATE TABLE (User, Todo) + 1 CREATE INDEX for Todo.by_user
1674        // + 1 CREATE INDEX for Todo.by_user_done. The Todo manifest also
1675        // declares a unique by_email index on User which lands as part of
1676        // the table. Final count: 2 tables + 2 indexes.
1677        let create_tables = stmts
1678            .iter()
1679            .filter(|s| s.starts_with("CREATE TABLE"))
1680            .count();
1681        let create_indexes = stmts
1682            .iter()
1683            .filter(|s| s.starts_with("CREATE INDEX") || s.starts_with("CREATE UNIQUE INDEX"))
1684            .count();
1685        assert_eq!(create_tables, 2);
1686        assert!(create_indexes >= 1);
1687        assert!(stmts[0].starts_with("CREATE TABLE"));
1688        assert!(stmts[1].starts_with("CREATE TABLE"));
1689    }
1690
1691    #[test]
1692    fn plan_to_sql_rejects_unsupported() {
1693        let plan = SchemaPlan {
1694            operations: vec![SchemaOperation::RemoveEntity {
1695                name: "User".into(),
1696            }],
1697        };
1698        let result = plan_to_sql(&plan);
1699        assert!(result.is_err());
1700        assert_eq!(result.unwrap_err().code, "PG_OP_UNSUPPORTED");
1701    }
1702
1703    #[test]
1704    fn apply_not_implemented() {
1705        let adapter = PostgresAdapter;
1706        let plan = SchemaPlan {
1707            operations: vec![SchemaOperation::Noop],
1708        };
1709        let result = adapter.apply_schema(&plan);
1710        assert!(result.is_err());
1711        assert_eq!(result.unwrap_err().code, "APPLY_NOT_IMPLEMENTED");
1712    }
1713
1714    #[test]
1715    fn sql_uses_quoted_identifiers() {
1716        let fields = vec![FieldSpec {
1717            name: "createdAt".into(),
1718            field_type: "datetime".into(),
1719            optional: false,
1720            unique: false,
1721        }];
1722        let sql = create_table_sql("User", &fields);
1723        // Postgres identifiers should be quoted for case-sensitivity.
1724        assert!(sql.contains("\"User\""));
1725        assert!(sql.contains("\"createdAt\""));
1726        assert!(sql.contains("TIMESTAMPTZ"));
1727    }
1728
1729    // -- Introspection SQL tests --
1730
1731    #[test]
1732    fn introspect_sql_constants_are_valid() {
1733        // Sanity checks that the SQL strings exist and look reasonable.
1734        assert!(INTROSPECT_TABLES_SQL.contains("information_schema.tables"));
1735        assert!(INTROSPECT_COLUMNS_SQL.contains("$1"));
1736        assert!(INTROSPECT_INDEXES_SQL.contains("$1"));
1737        assert!(INTROSPECT_TABLES_SQL.contains("_pylon_"));
1738    }
1739
1740    // -- Plan from snapshot tests --
1741
1742    #[test]
1743    fn plan_from_empty_snapshot_creates_all() {
1744        let snapshot = crate::SchemaSnapshot { tables: vec![] };
1745        let manifest = test_manifest();
1746        let plan = plan_from_snapshot(&snapshot, &manifest);
1747
1748        assert!(plan.operations.iter().any(|op| matches!(
1749            op,
1750            SchemaOperation::CreateEntity { name, .. } if name == "User"
1751        )));
1752        assert!(plan.operations.iter().any(|op| matches!(
1753            op,
1754            SchemaOperation::CreateEntity { name, .. } if name == "Todo"
1755        )));
1756        assert!(plan.operations.iter().any(|op| matches!(
1757            op,
1758            SchemaOperation::AddIndex { entity, name, .. } if entity == "Todo" && name == "by_user"
1759        )));
1760    }
1761
1762    #[test]
1763    fn plan_from_full_snapshot_is_noop() {
1764        let snapshot = crate::SchemaSnapshot {
1765            tables: vec![
1766                crate::TableSnapshot {
1767                    name: "User".into(),
1768                    columns: vec![
1769                        crate::ColumnSnapshot {
1770                            name: "id".into(),
1771                            column_type: "TEXT".into(),
1772                            notnull: true,
1773                            primary_key: true,
1774                        },
1775                        crate::ColumnSnapshot {
1776                            name: "email".into(),
1777                            column_type: "TEXT".into(),
1778                            notnull: true,
1779                            primary_key: false,
1780                        },
1781                        crate::ColumnSnapshot {
1782                            name: "displayName".into(),
1783                            column_type: "TEXT".into(),
1784                            notnull: true,
1785                            primary_key: false,
1786                        },
1787                        crate::ColumnSnapshot {
1788                            name: "createdAt".into(),
1789                            column_type: "TIMESTAMPTZ".into(),
1790                            notnull: true,
1791                            primary_key: false,
1792                        },
1793                    ],
1794                    indexes: vec![],
1795                },
1796                crate::TableSnapshot {
1797                    name: "Todo".into(),
1798                    columns: vec![
1799                        crate::ColumnSnapshot {
1800                            name: "id".into(),
1801                            column_type: "TEXT".into(),
1802                            notnull: true,
1803                            primary_key: true,
1804                        },
1805                        crate::ColumnSnapshot {
1806                            name: "title".into(),
1807                            column_type: "TEXT".into(),
1808                            notnull: true,
1809                            primary_key: false,
1810                        },
1811                        crate::ColumnSnapshot {
1812                            name: "done".into(),
1813                            column_type: "BOOLEAN".into(),
1814                            notnull: true,
1815                            primary_key: false,
1816                        },
1817                        crate::ColumnSnapshot {
1818                            name: "userId".into(),
1819                            column_type: "TEXT".into(),
1820                            notnull: true,
1821                            primary_key: false,
1822                        },
1823                        crate::ColumnSnapshot {
1824                            name: "createdAt".into(),
1825                            column_type: "TIMESTAMPTZ".into(),
1826                            notnull: true,
1827                            primary_key: false,
1828                        },
1829                    ],
1830                    indexes: vec![crate::IndexSnapshot {
1831                        name: "Todo_by_user".into(),
1832                        columns: vec!["userId".into()],
1833                        unique: false,
1834                    }],
1835                },
1836            ],
1837        };
1838        let manifest = test_manifest();
1839        let plan = plan_from_snapshot(&snapshot, &manifest);
1840        assert!(plan.is_empty());
1841    }
1842
1843    #[test]
1844    fn plan_detects_missing_column_in_snapshot() {
1845        let snapshot = crate::SchemaSnapshot {
1846            tables: vec![
1847                crate::TableSnapshot {
1848                    name: "User".into(),
1849                    columns: vec![
1850                        crate::ColumnSnapshot {
1851                            name: "id".into(),
1852                            column_type: "TEXT".into(),
1853                            notnull: true,
1854                            primary_key: true,
1855                        },
1856                        crate::ColumnSnapshot {
1857                            name: "email".into(),
1858                            column_type: "TEXT".into(),
1859                            notnull: true,
1860                            primary_key: false,
1861                        },
1862                        // missing displayName and createdAt
1863                    ],
1864                    indexes: vec![],
1865                },
1866                crate::TableSnapshot {
1867                    name: "Todo".into(),
1868                    columns: vec![
1869                        crate::ColumnSnapshot {
1870                            name: "id".into(),
1871                            column_type: "TEXT".into(),
1872                            notnull: true,
1873                            primary_key: true,
1874                        },
1875                        crate::ColumnSnapshot {
1876                            name: "title".into(),
1877                            column_type: "TEXT".into(),
1878                            notnull: true,
1879                            primary_key: false,
1880                        },
1881                        crate::ColumnSnapshot {
1882                            name: "done".into(),
1883                            column_type: "BOOLEAN".into(),
1884                            notnull: true,
1885                            primary_key: false,
1886                        },
1887                        crate::ColumnSnapshot {
1888                            name: "userId".into(),
1889                            column_type: "TEXT".into(),
1890                            notnull: true,
1891                            primary_key: false,
1892                        },
1893                        crate::ColumnSnapshot {
1894                            name: "createdAt".into(),
1895                            column_type: "TIMESTAMPTZ".into(),
1896                            notnull: true,
1897                            primary_key: false,
1898                        },
1899                    ],
1900                    indexes: vec![crate::IndexSnapshot {
1901                        name: "Todo_by_user".into(),
1902                        columns: vec!["userId".into()],
1903                        unique: false,
1904                    }],
1905                },
1906            ],
1907        };
1908        let manifest = test_manifest();
1909        let plan = plan_from_snapshot(&snapshot, &manifest);
1910
1911        let add_fields: Vec<_> = plan
1912            .operations
1913            .iter()
1914            .filter(|op| matches!(op, SchemaOperation::AddField { .. }))
1915            .collect();
1916        assert_eq!(add_fields.len(), 2); // displayName + createdAt
1917    }
1918
1919    // -- CRUD helper tests (no live database required) --
1920
1921    #[test]
1922    fn json_value_to_string_handles_all_types() {
1923        assert_eq!(
1924            json_value_to_string(&serde_json::Value::String("hello".into())),
1925            "hello"
1926        );
1927        assert_eq!(json_value_to_string(&serde_json::json!(42)), "42");
1928        assert_eq!(json_value_to_string(&serde_json::json!(1.5)), "1.5");
1929        assert_eq!(json_value_to_string(&serde_json::Value::Bool(true)), "true");
1930        assert_eq!(
1931            json_value_to_string(&serde_json::Value::Bool(false)),
1932            "false"
1933        );
1934        assert_eq!(json_value_to_string(&serde_json::Value::Null), "");
1935        // Arrays and objects get their JSON representation.
1936        assert_eq!(
1937            json_value_to_string(&serde_json::json!([1, 2, 3])),
1938            "[1,2,3]"
1939        );
1940        assert_eq!(
1941            json_value_to_string(&serde_json::json!({"a": 1})),
1942            "{\"a\":1}"
1943        );
1944    }
1945
1946    #[test]
1947    fn generate_id_returns_hex_string() {
1948        let id = generate_id();
1949        assert!(!id.is_empty());
1950        // Must be valid hex characters.
1951        assert!(id.chars().all(|c| c.is_ascii_hexdigit()));
1952    }
1953
1954    #[test]
1955    fn generate_id_is_unique_across_calls() {
1956        let id1 = generate_id();
1957        let id2 = generate_id();
1958        assert_ne!(id1, id2);
1959    }
1960
1961    #[test]
1962    fn generate_id_is_lex_sortable() {
1963        // 1000 IDs back-to-back must come out in monotonically increasing
1964        // lexicographic order. This is what makes cursor pagination correct.
1965        let mut ids: Vec<String> = (0..1000).map(|_| generate_id()).collect();
1966        let sorted = {
1967            let mut s = ids.clone();
1968            s.sort();
1969            s
1970        };
1971        assert_eq!(ids, sorted, "generate_id must be lex-monotonic");
1972        // And every id must be the same width (otherwise lex comparison is
1973        // wrong at width boundaries).
1974        let len0 = ids[0].len();
1975        assert!(ids.iter().all(|id| id.len() == len0));
1976        ids.dedup();
1977        assert_eq!(ids.len(), 1000, "no collisions in a tight loop");
1978    }
1979
1980    #[test]
1981    fn build_insert_sql_simple() {
1982        let data = serde_json::json!({
1983            "email": "alice@example.com",
1984            "displayName": "Alice"
1985        });
1986        let (sql, values) = build_insert_sql("User", &data).unwrap();
1987
1988        assert!(sql.starts_with("INSERT INTO \"User\""));
1989        assert!(sql.contains("id"));
1990        assert!(sql.contains("$1"));
1991        assert!(sql.contains("$2"));
1992        assert!(sql.contains("$3"));
1993        // First value is the generated ID — JsonParam::Text variant.
1994        match &values[0] {
1995            JsonParam::Text(s) => assert!(!s.is_empty()),
1996            other => panic!("expected Text id param, got {other:?}"),
1997        }
1998        assert_eq!(values.len(), 3); // id + 2 fields
1999    }
2000
2001    #[test]
2002    fn build_insert_sql_preserves_json_types() {
2003        let data = serde_json::json!({
2004            "n": 42,
2005            "f": 1.5,
2006            "b": true,
2007            "s": "hi",
2008            "z": null,
2009        });
2010        let (_sql, values) = build_insert_sql("T", &data).unwrap();
2011        // values[0] is the id; remaining are in BTreeMap order ("b","f","n","s","z").
2012        let kinds: Vec<&JsonParam> = values.iter().skip(1).collect();
2013        assert!(matches!(kinds[0], JsonParam::Bool(true)));
2014        assert!(matches!(kinds[1], JsonParam::Float(_)));
2015        assert!(matches!(kinds[2], JsonParam::Int(42)));
2016        assert!(matches!(kinds[3], JsonParam::Text(_)));
2017        assert!(matches!(kinds[4], JsonParam::Null));
2018    }
2019
2020    #[test]
2021    fn build_insert_sql_quotes_column_names() {
2022        let data = serde_json::json!({"createdAt": "2026-01-01"});
2023        let (sql, _) = build_insert_sql("Todo", &data).unwrap();
2024        assert!(sql.contains("\"createdAt\""));
2025        assert!(sql.contains("\"Todo\""));
2026    }
2027
2028    #[test]
2029    fn build_insert_sql_rejects_non_object() {
2030        let data = serde_json::json!("not an object");
2031        let result = build_insert_sql("User", &data);
2032        assert!(result.is_err());
2033        assert_eq!(result.unwrap_err().code, "PG_INVALID_DATA");
2034    }
2035
2036    #[test]
2037    fn build_update_sql_simple() {
2038        let data = serde_json::json!({
2039            "displayName": "Bob",
2040            "email": "bob@example.com"
2041        });
2042        let (sql, values) = build_update_sql("User", "abc123", &data).unwrap();
2043
2044        assert!(sql.starts_with("UPDATE \"User\" SET"));
2045        assert!(sql.contains("WHERE id = $1"));
2046        assert!(sql.contains("$2"));
2047        assert!(sql.contains("$3"));
2048        match &values[0] {
2049            JsonParam::Text(s) => assert_eq!(s, "abc123"),
2050            other => panic!("expected Text id param, got {other:?}"),
2051        }
2052        assert_eq!(values.len(), 3); // id + 2 fields
2053    }
2054
2055    #[test]
2056    fn build_update_sql_quotes_column_names() {
2057        let data = serde_json::json!({"displayName": "Carol"});
2058        let (sql, _) = build_update_sql("User", "id1", &data).unwrap();
2059        assert!(sql.contains("\"displayName\" = $2"));
2060    }
2061
2062    #[test]
2063    fn build_update_sql_rejects_non_object() {
2064        let data = serde_json::json!(42);
2065        let result = build_update_sql("User", "id1", &data);
2066        assert!(result.is_err());
2067        assert_eq!(result.unwrap_err().code, "PG_INVALID_DATA");
2068    }
2069
2070    #[test]
2071    fn build_update_sql_rejects_empty_object() {
2072        let data = serde_json::json!({});
2073        let err = build_update_sql("User", "id1", &data).unwrap_err();
2074        assert_eq!(err.code, "PG_INVALID_DATA");
2075        assert!(err.message.contains("at least one field"));
2076    }
2077}