Skip to main content

pylon_storage/
postgres.rs

1use crate::{FieldSpec, SchemaOperation, SchemaPlan, StorageAdapter, StorageError};
2use pylon_kernel::AppManifest;
3
4// ---------------------------------------------------------------------------
5// Type mapping: manifest field types -> PostgreSQL column types
6//
7//   string    -> TEXT
8//   int       -> INTEGER
9//   float     -> DOUBLE PRECISION
10//   bool      -> BOOLEAN
11//   datetime  -> TIMESTAMPTZ
12//   richtext  -> TEXT
13//   id(...)   -> TEXT
14// ---------------------------------------------------------------------------
15
/// Map a manifest field type to the PostgreSQL column type used in DDL.
///
/// Only the types that need a non-`TEXT` column are listed explicitly;
/// `string`, `richtext`, `id(...)` references and any unrecognised type
/// all resolve to `TEXT`.
fn pg_column_type(field_type: &str) -> &'static str {
    match field_type {
        "int" => "INTEGER",
        "float" => "DOUBLE PRECISION",
        "bool" => "BOOLEAN",
        "datetime" => "TIMESTAMPTZ",
        // string, richtext, id(...) and everything else.
        _ => "TEXT",
    }
}
28
29// ---------------------------------------------------------------------------
30// Identifier quoting
31// ---------------------------------------------------------------------------
32
/// Wrap `name` in double quotes so it can be used as a SQL identifier.
///
/// Any double-quote inside `name` is doubled, which is how PostgreSQL
/// spells a literal quote within a quoted identifier: `"foo""bar"` is the
/// identifier `foo"bar`.
pub(crate) fn quote_ident(name: &str) -> String {
    let mut quoted = String::with_capacity(name.len() + 2);
    quoted.push('"');
    for ch in name.chars() {
        if ch == '"' {
            // Emit the escape quote before the quote itself.
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
39
/// Public wrapper around the crate-private [`quote_ident`] for sibling
/// modules (e.g. `pg_tx_store`) that need to build SQL strings against
/// the same identifier-quoting rules.
///
/// The crate-private function is left untouched so existing call sites
/// inside this module keep working. Only compiled with the
/// `postgres-live` feature.
///
/// Returns `name` wrapped in double quotes with embedded quotes doubled.
#[cfg(feature = "postgres-live")]
pub fn quote_ident_pub(name: &str) -> String {
    quote_ident(name)
}
48
/// Public wrapper around [`live::row_to_json`] for sibling modules.
///
/// Used by `pg_tx_store` so transactional reads return rows in the same
/// JSON shape as the non-transactional path. Only compiled with the
/// `postgres-live` feature.
#[cfg(feature = "postgres-live")]
pub fn row_to_json_pub(row: &postgres::Row) -> serde_json::Value {
    // Pure delegation: the conversion logic lives in the `live` module.
    live::row_to_json(row)
}
56
/// Public wrapper around the filter builder so the in-tx PgTxStore can
/// reuse the same operator surface ($eq, $like, $in, $order, $limit,
/// $offset, $not, $gt, $gte, $lt, $lte) as the non-tx path.
///
/// Delegates to `live::LivePostgresAdapter::build_query_filtered_sql`
/// with the arguments unchanged. Only compiled with the `postgres-live`
/// feature.
///
/// Returns `(sql, params)` ready to feed to either `client.query` or
/// `transaction.query`.
///
/// # Errors
///
/// Propagates whatever [`StorageError`] the underlying builder returns.
#[cfg(feature = "postgres-live")]
pub fn build_query_filtered_sql_pub(
    entity: &str,
    filter: &serde_json::Value,
    valid_columns: &[String],
) -> Result<(String, Vec<JsonParam>), StorageError> {
    live::LivePostgresAdapter::build_query_filtered_sql(entity, filter, valid_columns)
}
70
/// Public wrapper around the aggregate builder.
///
/// Delegates to `live::LivePostgresAdapter::build_aggregate_sql` with the
/// arguments unchanged. Only compiled with the `postgres-live` feature.
///
/// Returns `(sql, params, column_names)` — the `column_names` list drives
/// the per-row projection in `aggregate_rows_to_json_pub`.
///
/// # Errors
///
/// Propagates whatever [`StorageError`] the underlying builder returns.
#[cfg(feature = "postgres-live")]
pub fn build_aggregate_sql_pub(
    entity: &str,
    spec: &serde_json::Value,
    valid_columns: &[String],
) -> Result<(String, Vec<JsonParam>, Vec<String>), StorageError> {
    live::LivePostgresAdapter::build_aggregate_sql(entity, spec, valid_columns)
}
82
/// Public wrapper around the post-processing helper that converts raw
/// aggregate `Row`s into the `{ rows: [{...}] }` JSON shape.
///
/// `column_names` is passed straight through to
/// `live::aggregate_rows_to_json`; it is expected to be the list returned
/// by `build_aggregate_sql_pub`. Only compiled with the `postgres-live`
/// feature.
#[cfg(feature = "postgres-live")]
pub fn aggregate_rows_to_json_pub(
    rows: &[postgres::Row],
    column_names: &[String],
) -> serde_json::Value {
    live::aggregate_rows_to_json(rows, column_names)
}
92
93// ---------------------------------------------------------------------------
94// SQL generation
95// ---------------------------------------------------------------------------
96
97/// Generate a Postgres CREATE TABLE statement.
98pub fn create_table_sql(entity_name: &str, fields: &[FieldSpec]) -> String {
99    let mut columns = vec!["id TEXT PRIMARY KEY NOT NULL".to_string()];
100
101    for field in fields {
102        let col_type = pg_column_type(&field.field_type);
103        let not_null = if field.optional { "" } else { " NOT NULL" };
104        let unique = if field.unique { " UNIQUE" } else { "" };
105        columns.push(format!(
106            "{} {}{}{}",
107            quote_ident(&field.name),
108            col_type,
109            not_null,
110            unique
111        ));
112    }
113
114    format!(
115        "CREATE TABLE IF NOT EXISTS {} ({})",
116        quote_ident(entity_name),
117        columns.join(", ")
118    )
119}
120
121/// Generate a Postgres ALTER TABLE ADD COLUMN statement.
122/// NOT NULL is omitted on ADD COLUMN to avoid requiring DEFAULT values.
123/// Required-ness is tracked in the manifest; enforcement deferred.
124pub fn add_column_sql(entity_name: &str, field: &FieldSpec) -> String {
125    let col_type = pg_column_type(&field.field_type);
126    let unique = if field.unique { " UNIQUE" } else { "" };
127    format!(
128        "ALTER TABLE {} ADD COLUMN {} {}{}",
129        quote_ident(entity_name),
130        quote_ident(&field.name),
131        col_type,
132        unique
133    )
134}
135
136/// Generate a Postgres CREATE INDEX statement.
137pub fn create_index_sql(
138    entity_name: &str,
139    index_name: &str,
140    fields: &[String],
141    unique: bool,
142) -> String {
143    let unique_str = if unique { "UNIQUE " } else { "" };
144    let full_index_name = format!("{}_{}", entity_name, index_name);
145    let quoted_fields: Vec<String> = fields.iter().map(|f| quote_ident(f)).collect();
146    format!(
147        "CREATE {}INDEX IF NOT EXISTS {} ON {} ({})",
148        unique_str,
149        quote_ident(&full_index_name),
150        quote_ident(entity_name),
151        quoted_fields.join(", ")
152    )
153}
154
155// ---------------------------------------------------------------------------
156// PostgresAdapter — planning-only adapter
157// ---------------------------------------------------------------------------
158
159/// A Postgres storage adapter. Currently supports planning only.
160/// No live connection — SQL generation and planning from manifest.
161pub struct PostgresAdapter;
162
163impl StorageAdapter for PostgresAdapter {
164    fn plan_schema(&self, target: &AppManifest) -> Result<SchemaPlan, StorageError> {
165        // Plan from empty baseline.
166        let mut operations = Vec::new();
167
168        for entity in &target.entities {
169            let fields: Vec<FieldSpec> = entity
170                .fields
171                .iter()
172                .map(|f| FieldSpec {
173                    name: f.name.clone(),
174                    field_type: f.field_type.clone(),
175                    optional: f.optional,
176                    unique: f.unique,
177                })
178                .collect();
179
180            operations.push(SchemaOperation::CreateEntity {
181                name: entity.name.clone(),
182                fields,
183            });
184
185            for index in &entity.indexes {
186                operations.push(SchemaOperation::AddIndex {
187                    entity: entity.name.clone(),
188                    name: index.name.clone(),
189                    fields: index.fields.clone(),
190                    unique: index.unique,
191                });
192            }
193        }
194
195        if operations.is_empty() {
196            operations.push(SchemaOperation::Noop);
197        }
198
199        Ok(SchemaPlan { operations })
200    }
201
202    // apply_schema intentionally not implemented — uses default trait error.
203}
204
/// Generate all SQL statements for a plan, in order.
/// Useful for dry-run preview of what Postgres DDL would be executed.
///
/// Each operation maps to zero or more statements:
/// - `CreateEntity` → one `CREATE TABLE` (via [`create_table_sql`])
/// - `AddField` → one `ALTER TABLE ... ADD COLUMN` (via [`add_column_sql`])
/// - `AlterField` → `SET NOT NULL` / `DROP NOT NULL` when the field's
///   optional flag changed, otherwise nothing
/// - `AddIndex` → one `CREATE INDEX` (via [`create_index_sql`])
/// - `CreateSearchIndex` → statements from `pg_search` (feature-gated)
/// - `RemoveSearchIndex` → `DROP TABLE` + `DROP INDEX` for the FTS objects
/// - `Noop` → nothing
///
/// # Errors
///
/// - `PG_SEARCH_FEATURE_OFF` when a `CreateSearchIndex` operation is
///   encountered and the crate was built without `postgres-live`.
/// - `PG_OP_UNSUPPORTED` for any operation variant not listed above.
///
/// In both cases the function returns immediately; statements collected
/// so far are discarded.
pub fn plan_to_sql(plan: &SchemaPlan) -> Result<Vec<String>, StorageError> {
    let mut statements = Vec::new();

    for op in &plan.operations {
        match op {
            SchemaOperation::CreateEntity { name, fields } => {
                statements.push(create_table_sql(name, fields));
            }
            SchemaOperation::AddField { entity, field } => {
                statements.push(add_column_sql(entity, field));
            }
            SchemaOperation::AlterField {
                entity,
                previous,
                target,
            } => {
                // Only nullable transitions today. SET / DROP NOT NULL is
                // safe on a populated table when going from required →
                // optional (existing rows already satisfy NOT NULL); the
                // reverse direction (optional → required) succeeds only
                // if every row has a non-null value, which the planner
                // has no way to know — Postgres will fail the migration
                // if it can't and the operator gets a clear error from
                // the apply step.
                if previous.optional && !target.optional {
                    statements.push(format!(
                        "ALTER TABLE {} ALTER COLUMN {} SET NOT NULL",
                        quote_ident(entity),
                        quote_ident(&target.name)
                    ));
                } else if !previous.optional && target.optional {
                    statements.push(format!(
                        "ALTER TABLE {} ALTER COLUMN {} DROP NOT NULL",
                        quote_ident(entity),
                        quote_ident(&target.name)
                    ));
                }
                // Nothing is emitted when the optional flag is unchanged.
                // Only `optional` is compared here, so an AlterField whose
                // sole difference is the field type or uniqueness also
                // produces no SQL and falls through silently rather than
                // emitting an empty statement.
            }
            SchemaOperation::AddIndex {
                entity,
                name,
                fields,
                unique,
            } => {
                statements.push(create_index_sql(entity, name, fields, *unique));
            }
            SchemaOperation::CreateSearchIndex { entity, config } => {
                // Search DDL lives in `pg_search`, which this function can
                // only call when `postgres-live` is enabled.
                #[cfg(feature = "postgres-live")]
                {
                    statements.extend(crate::pg_search::create_search_index_sql(entity, config));
                }
                #[cfg(not(feature = "postgres-live"))]
                {
                    // Touch the bindings so the feature-off build does not
                    // warn about unused variables.
                    let _ = (entity, config);
                    return Err(StorageError {
                        code: "PG_SEARCH_FEATURE_OFF".into(),
                        message: "CreateSearchIndex requires the `postgres-live` feature".into(),
                    });
                }
            }
            SchemaOperation::RemoveSearchIndex { entity } => {
                // Without the original config we don't know which
                // facet/sort indexes were created. Drop the FTS table
                // and the GIN index by their fixed names; per-field
                // facet/sort indexes are dropped automatically by the
                // entity DROP path. Operators removing search via
                // schema diff will see leftover indexes only if the
                // entity table itself still exists — at which point
                // the next CREATE INDEX IF NOT EXISTS catches up.
                //
                // quote_ident on the synthetic table/index names so a
                // malicious entity name with embedded `"` can't break
                // out of the identifier.
                statements.push(format!(
                    "DROP TABLE IF EXISTS {} CASCADE",
                    quote_ident(&format!("_fts_{entity}"))
                ));
                statements.push(format!(
                    "DROP INDEX IF EXISTS {}",
                    quote_ident(&format!("{entity}_fts_gin"))
                ));
            }
            SchemaOperation::Noop => {}
            // Any other variant has no Postgres translation here; fail the
            // whole render rather than silently skipping part of the plan.
            other => {
                return Err(StorageError {
                    code: "PG_OP_UNSUPPORTED".into(),
                    message: format!("Operation not supported by Postgres adapter: {other:?}"),
                });
            }
        }
    }

    Ok(statements)
}
306
307// ---------------------------------------------------------------------------
308// Introspection SQL helpers
309//
310// These generate the SQL queries that a live Postgres connection would run
311// to read the current schema. No connection required — just SQL strings.
312// ---------------------------------------------------------------------------
313
/// SQL to list user tables in the public schema.
///
/// Returns one `table_name` column, ordered by name, restricted to base
/// tables and excluding internal tables whose name starts with the
/// literal prefix `_pylon_`.
///
/// `_` is a single-character wildcard in `LIKE`, so the prefix is written
/// with an explicit `ESCAPE '!'` clause. Without it the pattern would
/// also exclude unrelated tables such as `xpylonyfoo`. `!` is used as the
/// escape character rather than a backslash so the query does not depend
/// on the server's `standard_conforming_strings` setting.
pub const INTROSPECT_TABLES_SQL: &str = "\
    SELECT table_name \
    FROM information_schema.tables \
    WHERE table_schema = 'public' \
      AND table_type = 'BASE TABLE' \
      AND table_name NOT LIKE '!_pylon!_%' ESCAPE '!' \
    ORDER BY table_name";
322
/// SQL to list columns for a given table.
/// Use with parameter: table_name.
///
/// Returns `column_name`, `data_type`, `is_nullable` and `is_pk` for each
/// column of the named table in the `public` schema, in ordinal order.
/// `is_pk` is the number of PRIMARY KEY constraints covering the column
/// (0 or 1 in practice).
///
/// The `is_pk` subquery matches constraints to key columns on schema and
/// table as well as constraint name, and is correlated to the outer
/// column's schema. PostgreSQL constraint names are only unique per
/// table, so joining on the name alone could pair a primary key with key
/// columns from a different table or schema and report a false positive.
pub const INTROSPECT_COLUMNS_SQL: &str = "\
    SELECT column_name, data_type, is_nullable, \
           (SELECT COUNT(*) FROM information_schema.table_constraints tc \
            JOIN information_schema.key_column_usage kcu \
              ON tc.constraint_name = kcu.constraint_name \
             AND tc.constraint_schema = kcu.constraint_schema \
             AND tc.table_schema = kcu.table_schema \
             AND tc.table_name = kcu.table_name \
            WHERE tc.table_schema = c.table_schema \
              AND tc.table_name = c.table_name \
              AND kcu.column_name = c.column_name \
              AND tc.constraint_type = 'PRIMARY KEY') as is_pk \
    FROM information_schema.columns c \
    WHERE table_schema = 'public' AND table_name = $1 \
    ORDER BY ordinal_position";
336
/// SQL to list indexes for a given table.
/// Use with parameter: table_name.
///
/// Reads the `pg_index` / `pg_class` / `pg_attribute` / `pg_namespace`
/// catalogs and returns one row per index on the named `public` table,
/// ordered by index name:
/// - `index_name` — the index's relation name
/// - `is_unique` — `pg_index.indisunique`
/// - `columns` — the indexed column names, aggregated in the order they
///   appear in `pg_index.indkey`
///
/// Primary-key indexes are excluded (`NOT ix.indisprimary`).
pub const INTROSPECT_INDEXES_SQL: &str = "\
    SELECT i.relname as index_name, \
           ix.indisunique as is_unique, \
           array_agg(a.attname ORDER BY array_position(ix.indkey, a.attnum)) as columns \
    FROM pg_index ix \
    JOIN pg_class t ON t.oid = ix.indrelid \
    JOIN pg_class i ON i.oid = ix.indexrelid \
    JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(ix.indkey) \
    JOIN pg_namespace n ON n.oid = t.relnamespace \
    WHERE n.nspname = 'public' \
      AND t.relname = $1 \
      AND NOT ix.indisprimary \
    GROUP BY i.relname, ix.indisunique \
    ORDER BY i.relname";
353
/// Plan from a snapshot (reuses the shared plan_from_snapshot).
/// This allows Postgres to plan incrementally once introspection data is available.
///
/// Pure delegation to `crate::plan_from_snapshot`; no Postgres-specific
/// logic is applied to `snapshot` or `target` here.
pub fn plan_from_snapshot(snapshot: &crate::SchemaSnapshot, target: &AppManifest) -> SchemaPlan {
    crate::plan_from_snapshot(snapshot, target)
}
359
360// ---------------------------------------------------------------------------
361// CRUD SQL generation helpers (used by live adapter, testable without a DB)
362// ---------------------------------------------------------------------------
363
/// Mint a fixed-width, lexicographically sortable row id.
///
/// The id is 40 lowercase hex characters: the current time since the Unix
/// epoch in nanoseconds, zero-padded to 32 digits, followed by an 8-digit
/// per-process sequence number. The sequence number separates ids minted
/// within the same nanosecond and keeps them in creation order. Because
/// the width never varies, comparing ids as strings matches comparing
/// them by creation time, which is what cursor pagination relies on.
///
/// A system clock set before the Unix epoch is treated as time zero.
pub fn generate_id() -> String {
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static COUNTER: AtomicU32 = AtomicU32::new(0);

    let nanos = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => 0,
    };
    let sequence = COUNTER.fetch_add(1, Ordering::Relaxed);

    format!("{:032x}{:08x}", nanos, sequence)
}
383
384/// Convert a JSON value to its string representation for use as a SQL parameter.
385///
386/// Kept for back-compat with callers that need a textual fallback (e.g.
387/// human-readable logs). New code should bind through [`JsonParam`] so
388/// integers/booleans/nulls reach Postgres in their typed form instead of
389/// collapsing to TEXT, which the driver can't coerce into INTEGER /
390/// BOOLEAN / TIMESTAMPTZ columns and which silently turns JSON `null`
391/// into an empty string for FK columns.
392pub fn json_value_to_string(val: &serde_json::Value) -> String {
393    match val {
394        serde_json::Value::String(s) => s.clone(),
395        serde_json::Value::Number(n) => n.to_string(),
396        serde_json::Value::Bool(b) => b.to_string(),
397        serde_json::Value::Null => String::new(),
398        other => other.to_string(),
399    }
400}
401
402/// A typed wrapper around a JSON scalar that implements
403/// [`postgres::types::ToSql`] for INSERT/UPDATE parameters.
404///
405/// The previous implementation passed every value as `String`, which broke
406/// non-text columns (the postgres driver can't bind a string literal into
407/// an INTEGER / BOOLEAN / TIMESTAMPTZ slot) and silently turned JSON
408/// `null` into the empty string for nullable FKs (so `unlink` left
409/// dangling `""` references instead of NULL). `JsonParam` carries the
410/// JSON variant tag through to `to_sql` so the driver can pick the
411/// correct binary representation per column type.
412///
413/// JSON arrays/objects collapse to their JSON-string form (TEXT) — the
414/// runtime layer doesn't currently model array/object columns at the
415/// manifest level on Postgres, so anything that lands here came from
416/// caller-supplied prose that's expected to fit into a TEXT column.
417#[derive(Debug, Clone, PartialEq)]
418pub enum JsonParam {
419    Null,
420    Text(String),
421    Int(i64),
422    Float(f64),
423    Bool(bool),
424}
425
426impl JsonParam {
427    /// Lift a `serde_json::Value` into the typed parameter form. Numbers
428    /// that fit `i64` go through as Int; everything else goes through as
429    /// Float to preserve fractional / large-magnitude values.
430    pub fn from_json(val: &serde_json::Value) -> Self {
431        match val {
432            serde_json::Value::Null => JsonParam::Null,
433            serde_json::Value::String(s) => JsonParam::Text(s.clone()),
434            serde_json::Value::Bool(b) => JsonParam::Bool(*b),
435            serde_json::Value::Number(n) => {
436                if let Some(i) = n.as_i64() {
437                    JsonParam::Int(i)
438                } else if let Some(f) = n.as_f64() {
439                    JsonParam::Float(f)
440                } else {
441                    JsonParam::Text(n.to_string())
442                }
443            }
444            other => JsonParam::Text(other.to_string()),
445        }
446    }
447}
448
#[cfg(feature = "postgres-live")]
impl postgres::types::ToSql for JsonParam {
    /// Encode this parameter for a column of type `ty`.
    ///
    /// The encoding is chosen from the (variant, column type) pair so the
    /// bytes written actually fit the target slot:
    /// - `Null` binds as SQL NULL for any column type.
    /// - `Bool` binds to BOOL.
    /// - `Int` binds to INT2/INT4/INT8 (range-checked) and FLOAT4/FLOAT8.
    /// - `Float` binds to FLOAT4/FLOAT8 and, truncated toward zero and
    ///   range-checked, to INT2/INT4/INT8.
    /// - `Text` binds to the text types, and is parsed as RFC 3339 for
    ///   TIMESTAMPTZ / TIMESTAMP / DATE.
    /// - Any other pairing falls back to the value's textual rendering.
    ///
    /// # Errors
    ///
    /// Returns an error when an integer or float does not fit the target
    /// integer column, when a float bound to an integer column is NaN or
    /// infinite, or when a string bound to a date/time column is not
    /// valid RFC 3339. Narrowing is checked rather than done with `as`,
    /// which would silently wrap or saturate and store a different value
    /// than the caller supplied.
    fn to_sql(
        &self,
        ty: &postgres::types::Type,
        out: &mut bytes::BytesMut,
    ) -> Result<postgres::types::IsNull, Box<dyn std::error::Error + Sync + Send>> {
        use postgres::types::Type;

        /// Truncate `f` toward zero into an `i64`, rejecting NaN,
        /// infinities and magnitudes outside the `i64` range.
        fn float_to_i64(
            f: f64,
            pg_type: &str,
        ) -> Result<i64, Box<dyn std::error::Error + Sync + Send>> {
            // 2^63 — `i64::MAX` itself is not exactly representable in
            // f64, so the upper bound is exclusive.
            const LIMIT: f64 = 9_223_372_036_854_775_808.0;
            if f.is_finite() && f >= -LIMIT && f < LIMIT {
                Ok(f.trunc() as i64)
            } else {
                Err(format!("float {f} out of range for {pg_type}").into())
            }
        }

        // Null binds as SQL NULL regardless of the column's declared
        // type — `IsNull::Yes` tells the driver to send a null value.
        if matches!(self, JsonParam::Null) {
            return Ok(postgres::types::IsNull::Yes);
        }

        match (self, ty) {
            (JsonParam::Bool(b), &Type::BOOL) => b.to_sql(ty, out),

            // Integers: narrow with a range check so an oversized value
            // is reported instead of being wrapped.
            (JsonParam::Int(n), &Type::INT2) => i16::try_from(*n)
                .map_err(|_| format!("integer {n} out of range for INT2"))?
                .to_sql(ty, out),
            (JsonParam::Int(n), &Type::INT4) => i32::try_from(*n)
                .map_err(|_| format!("integer {n} out of range for INT4"))?
                .to_sql(ty, out),
            (JsonParam::Int(n), &Type::INT8) => n.to_sql(ty, out),
            // Integer into a float column: may lose precision for very
            // large magnitudes, which is inherent to the column type.
            (JsonParam::Int(n), &Type::FLOAT4) => (*n as f32).to_sql(ty, out),
            (JsonParam::Int(n), &Type::FLOAT8) => (*n as f64).to_sql(ty, out),

            (JsonParam::Float(f), &Type::FLOAT4) => (*f as f32).to_sql(ty, out),
            (JsonParam::Float(f), &Type::FLOAT8) => f.to_sql(ty, out),
            // Float into an integer column: the fractional part is
            // dropped (truncation toward zero); NaN, infinities and
            // out-of-range values are rejected.
            (JsonParam::Float(f), &Type::INT2) => i16::try_from(float_to_i64(*f, "INT2")?)
                .map_err(|_| format!("float {f} out of range for INT2"))?
                .to_sql(ty, out),
            (JsonParam::Float(f), &Type::INT4) => i32::try_from(float_to_i64(*f, "INT4")?)
                .map_err(|_| format!("float {f} out of range for INT4"))?
                .to_sql(ty, out),
            (JsonParam::Float(f), &Type::INT8) => float_to_i64(*f, "INT8")?.to_sql(ty, out),

            (JsonParam::Text(s), &Type::TEXT)
            | (JsonParam::Text(s), &Type::VARCHAR)
            | (JsonParam::Text(s), &Type::BPCHAR)
            | (JsonParam::Text(s), &Type::NAME) => s.to_sql(ty, out),
            (JsonParam::Text(s), &Type::TIMESTAMPTZ) => {
                // Datetimes arrive as RFC 3339 strings. The column's
                // binary format is not the ASCII text, so parse with
                // chrono and let its ToSql impl produce the encoding.
                let dt = chrono::DateTime::parse_from_rfc3339(s)
                    .map_err(|e| format!("invalid TIMESTAMPTZ string {s:?}: {e}"))?
                    .with_timezone(&chrono::Utc);
                dt.to_sql(ty, out)
            }
            (JsonParam::Text(s), &Type::TIMESTAMP) => {
                // TIMESTAMP (no timezone): normalise to UTC, then bind as
                // a NaiveDateTime so chrono picks the matching encoding.
                let dt = chrono::DateTime::parse_from_rfc3339(s)
                    .map_err(|e| format!("invalid TIMESTAMP string {s:?}: {e}"))?
                    .with_timezone(&chrono::Utc)
                    .naive_utc();
                dt.to_sql(ty, out)
            }
            (JsonParam::Text(s), &Type::DATE) => {
                // DATE: the calendar date of the instant in UTC.
                let dt = chrono::DateTime::parse_from_rfc3339(s)
                    .map_err(|e| format!("invalid DATE string {s:?}: {e}"))?
                    .with_timezone(&chrono::Utc)
                    .date_naive();
                dt.to_sql(ty, out)
            }

            // Cross-type fallback: bind the value's textual rendering.
            // No local type check happens here — presumably the server
            // rejects the bind when the column's binary format is not
            // text-compatible (NOTE(review): confirm against the driver).
            (other, _) => {
                let s = match other {
                    JsonParam::Bool(b) => b.to_string(),
                    JsonParam::Int(n) => n.to_string(),
                    JsonParam::Float(f) => f.to_string(),
                    JsonParam::Text(s) => s.clone(),
                    JsonParam::Null => unreachable!(),
                };
                s.to_sql(ty, out)
            }
        }
    }

    /// Accept every column type; the per-variant decision is made inside
    /// `to_sql`, which dispatches on the actual `(variant, type)` pair.
    fn accepts(_ty: &postgres::types::Type) -> bool {
        true
    }

    postgres::types::to_sql_checked!();
}
552
553/// Build an INSERT SQL statement and collect typed parameter values.
554/// Returns `(sql, params)` where `params[0]` is the generated ID
555/// (always `JsonParam::Text`). Subsequent params carry the JSON-typed
556/// value so the postgres driver can bind them to typed columns
557/// (INTEGER / BOOLEAN / TIMESTAMPTZ / TEXT) and so JSON `null` reaches
558/// the database as SQL NULL — the previous string-collapsing path stored
559/// `""` for nullable FKs and broke any non-text column.
560pub fn build_insert_sql(
561    entity: &str,
562    data: &serde_json::Value,
563) -> Result<(String, Vec<JsonParam>), StorageError> {
564    let obj = data.as_object().ok_or_else(|| StorageError {
565        code: "PG_INVALID_DATA".into(),
566        message: "Insert data must be a JSON object".into(),
567    })?;
568
569    // Honor caller-supplied `id` (used by the CRDT path that needs to
570    // share the row id with the LoroDoc snapshot key it just wrote).
571    // Fall back to a fresh ULID-shaped id when absent. A non-string
572    // `id` value is rejected explicitly — silently regenerating would
573    // mask schema bugs (e.g. a caller passing an int id) and let the
574    // CRDT snapshot key drift from the materialized row id.
575    let id = match obj.get("id") {
576        None | Some(serde_json::Value::Null) => generate_id(),
577        Some(serde_json::Value::String(s)) => s.clone(),
578        Some(other) => {
579            return Err(StorageError {
580                code: "PG_INVALID_ID".into(),
581                message: format!(
582                    "Insert data carried a non-string `id` value: {other}. Pylon row ids \
583                     are always strings (40-char hex). Drop the `id` field to let the \
584                     server generate one, or supply a string."
585                ),
586            });
587        }
588    };
589
590    let mut col_names = vec!["id".to_string()];
591    let mut placeholders = vec!["$1".to_string()];
592    let mut values: Vec<JsonParam> = vec![JsonParam::Text(id)];
593
594    let mut i = 0usize;
595    for (key, val) in obj {
596        if key == "id" {
597            // Already emitted via the synthetic first column; skipping
598            // here avoids `INSERT ... (id, id, ...)` which Postgres
599            // rejects with `duplicate key value`.
600            continue;
601        }
602        col_names.push(quote_ident(key));
603        placeholders.push(format!("${}", i + 2));
604        values.push(JsonParam::from_json(val));
605        i += 1;
606    }
607
608    let sql = format!(
609        "INSERT INTO {} ({}) VALUES ({})",
610        quote_ident(entity),
611        col_names.join(", "),
612        placeholders.join(", ")
613    );
614
615    Ok((sql, values))
616}
617
618/// Build an UPDATE SQL statement and collect typed parameter values.
619/// Returns `(sql, params)` where `params[0]` is the row ID.
620pub fn build_update_sql(
621    entity: &str,
622    id: &str,
623    data: &serde_json::Value,
624) -> Result<(String, Vec<JsonParam>), StorageError> {
625    let obj = data.as_object().ok_or_else(|| StorageError {
626        code: "PG_INVALID_DATA".into(),
627        message: "Update data must be a JSON object".into(),
628    })?;
629
630    if obj.is_empty() {
631        return Err(StorageError {
632            code: "PG_INVALID_DATA".into(),
633            message: "Update data must contain at least one field".into(),
634        });
635    }
636
637    let mut set_clauses = Vec::new();
638    let mut values: Vec<JsonParam> = vec![JsonParam::Text(id.to_string())];
639
640    let mut i = 0usize;
641    for (key, val) in obj {
642        if key == "id" {
643            // Reject primary-key mutation. Letting `id` into the SET
644            // clause lets a client move a row out from under its CRDT
645            // sidecar (which is keyed by the original row_id) and its
646            // FTS shadow row (FK-bound to the original id). The SQLite
647            // path silently drops `id` here too — keep the same
648            // shape, but errors so the caller sees the bug.
649            return Err(StorageError {
650                code: "PG_INVALID_UPDATE".into(),
651                message: "Updating the `id` column is not allowed — Pylon row ids are immutable; \
652                     drop the field from the patch."
653                    .into(),
654            });
655        }
656        set_clauses.push(format!("{} = ${}", quote_ident(key), i + 2));
657        values.push(JsonParam::from_json(val));
658        i += 1;
659    }
660
661    if set_clauses.is_empty() {
662        return Err(StorageError {
663            code: "PG_INVALID_DATA".into(),
664            message: "Update data must contain at least one non-id field".into(),
665        });
666    }
667
668    let sql = format!(
669        "UPDATE {} SET {} WHERE id = $1",
670        quote_ident(entity),
671        set_clauses.join(", ")
672    );
673
674    Ok((sql, values))
675}
676
/// Borrow a slice of [`JsonParam`] as the trait-object list the postgres
/// driver expects for query parameters.
///
/// Shared by the insert/update/transact call sites so the conversion is
/// not repeated at each one.
#[cfg(feature = "postgres-live")]
fn as_pg_params(values: &[JsonParam]) -> Vec<&(dyn postgres::types::ToSql + Sync)> {
    let mut params: Vec<&(dyn postgres::types::ToSql + Sync)> = Vec::with_capacity(values.len());
    for value in values {
        params.push(value);
    }
    params
}
687
688// ---------------------------------------------------------------------------
689// Live Postgres adapter (requires "postgres-live" feature)
690// ---------------------------------------------------------------------------
691
692#[cfg(feature = "postgres-live")]
693pub mod live {
694    use super::*;
695    use crate::{
696        ColumnSnapshot, IndexSnapshot, SchemaSnapshot, StorageAdapter, StorageError, TableSnapshot,
697    };
698
699    /// Connect to Postgres with the same TLS handling the entity
700    /// store uses. Strips libpq-only URL params (`sslmode=verify-full`,
701    /// `sslrootcert=system`) and uses rustls + native CA store when
702    /// the URL asks for TLS. Used by the auxiliary auth backends
703    /// (sessions, OAuth state, magic codes, accounts, API keys, orgs)
704    /// so they all behave identically to the entity store on managed
705    /// providers (Fly Postgres, PlanetScale, Neon, Supabase).
706    pub fn connect_pg(url: &str) -> Result<postgres::Client, String> {
707        let (cleaned, ssl) = parse_pg_url_ssl(url);
708        if ssl.use_tls {
709            let mut roots = rustls::RootCertStore::empty();
710            let native_certs = rustls_native_certs::load_native_certs();
711            for cert in native_certs.certs {
712                let _ = roots.add(cert);
713            }
714            if !native_certs.errors.is_empty() {
715                tracing::warn!(
716                    "[pg] rustls native cert load reported {} non-fatal errors",
717                    native_certs.errors.len()
718                );
719            }
720            let config = rustls::ClientConfig::builder()
721                .with_root_certificates(roots)
722                .with_no_client_auth();
723            let tls = tokio_postgres_rustls::MakeRustlsConnect::new(config);
724            postgres::Client::connect(&cleaned, tls).map_err(|e| format!("PG connect: {e}"))
725        } else {
726            postgres::Client::connect(&cleaned, postgres::NoTls)
727                .map_err(|e| format!("PG connect: {e}"))
728        }
729    }
730
731    /// SSL parsing result for `parse_pg_url_ssl`. We need both the
732    /// cleaned-up URL (libpq-only params stripped) and the boolean
733    /// "should we use TLS for this connection" so the connect path
734    /// can pick between `NoTls` and `MakeTlsConnector`.
735    pub struct PgUrlSsl {
736        pub use_tls: bool,
737    }
738
739    /// Pre-process a Postgres URL: strip libpq-specific query params
740    /// the Rust `postgres` crate's URL parser doesn't accept, and
741    /// figure out whether TLS should be enabled for the connection.
742    ///
743    /// Recognizes:
744    ///   - sslmode=disable / prefer / allow → no TLS
745    ///   - sslmode=require / verify-ca / verify-full → TLS
746    ///   - sslrootcert=system → TLS, use OS CA store (handled by
747    ///     `native-tls` automatically; we just record the intent)
748    ///   - sslrootcert=<path> → TLS, but the path is currently
749    ///     ignored — `native-tls` reads system roots only. Logged
750    ///     as a one-time warning so operators see the gap.
751    ///
752    /// Anything we don't understand is dropped from the URL silently
753    /// (defense against future libpq additions confusing the parser).
754    pub fn parse_pg_url_ssl(url: &str) -> (String, PgUrlSsl) {
755        let (base, query) = match url.find('?') {
756            Some(idx) => (&url[..idx], &url[idx + 1..]),
757            None => return (url.to_string(), PgUrlSsl { use_tls: false }),
758        };
759
760        let mut use_tls = false;
761        let mut kept: Vec<String> = Vec::new();
762        for pair in query.split('&') {
763            if pair.is_empty() {
764                continue;
765            }
766            let (k, v) = match pair.find('=') {
767                Some(i) => (&pair[..i], &pair[i + 1..]),
768                None => (pair, ""),
769            };
770            match k {
771                "sslmode" => match v.to_ascii_lowercase().as_str() {
772                    "disable" | "allow" => {
773                        // Caller said "no TLS" — pass through to the
774                        // postgres crate, which knows `disable`.
775                        kept.push("sslmode=disable".to_string());
776                    }
777                    "prefer" => {
778                        // Postgres crate's default; let it through.
779                        kept.push("sslmode=prefer".to_string());
780                    }
781                    "require" | "verify-ca" | "verify-full" | "" => {
782                        // All the "use TLS" modes. Rust crate only
783                        // accepts `require`; verify-* would be
784                        // rejected. Normalize to `require` and let
785                        // our TLS connector handle cert verification.
786                        use_tls = true;
787                        kept.push("sslmode=require".to_string());
788                    }
789                    other => {
790                        tracing::warn!(
791                            "[pg] unknown sslmode='{other}' — defaulting to require + TLS"
792                        );
793                        use_tls = true;
794                        kept.push("sslmode=require".to_string());
795                    }
796                },
797                "sslrootcert" => {
798                    // Either "system" (use OS roots — that's what
799                    // native-tls does anyway) or a path (which we
800                    // can't honor without bringing in openssl). Log
801                    // and drop in both cases; TLS still happens.
802                    if v != "system" && !v.is_empty() {
803                        tracing::warn!(
804                            "[pg] sslrootcert={v} ignored — native-tls uses system roots"
805                        );
806                    }
807                    use_tls = true;
808                }
809                _ => {
810                    // Forward everything else (search_path, application_name,
811                    // connect_timeout, etc.) so libpq-style URLs that work
812                    // for the rest of the world keep working here.
813                    kept.push(pair.to_string());
814                }
815            }
816        }
817
818        let cleaned = if kept.is_empty() {
819            base.to_string()
820        } else {
821            format!("{}?{}", base, kept.join("&"))
822        };
823        (cleaned, PgUrlSsl { use_tls })
824    }
825
826    #[cfg(test)]
827    mod url_tests {
828        use super::parse_pg_url_ssl;
829
830        #[test]
831        fn strips_libpq_only_sslmode_verify_full() {
832            let (cleaned, ssl) =
833                parse_pg_url_ssl("postgres://u:p@h:5432/db?sslmode=verify-full&sslrootcert=system");
834            assert!(ssl.use_tls);
835            // verify-full normalized to require; sslrootcert dropped.
836            assert_eq!(cleaned, "postgres://u:p@h:5432/db?sslmode=require");
837        }
838
839        #[test]
840        fn passes_through_disable() {
841            let (cleaned, ssl) = parse_pg_url_ssl("postgres://h/db?sslmode=disable");
842            assert!(!ssl.use_tls);
843            assert_eq!(cleaned, "postgres://h/db?sslmode=disable");
844        }
845
846        #[test]
847        fn no_query_string_no_tls() {
848            let (cleaned, ssl) = parse_pg_url_ssl("postgres://h/db");
849            assert!(!ssl.use_tls);
850            assert_eq!(cleaned, "postgres://h/db");
851        }
852
853        #[test]
854        fn unknown_params_pass_through() {
855            let (cleaned, _) =
856                parse_pg_url_ssl("postgres://h/db?application_name=pylon&connect_timeout=5");
857            assert!(cleaned.contains("application_name=pylon"));
858            assert!(cleaned.contains("connect_timeout=5"));
859        }
860
861        #[test]
862        fn sslrootcert_alone_enables_tls() {
863            // Rare but valid — sslrootcert=system implies TLS even
864            // without an explicit sslmode.
865            let (cleaned, ssl) = parse_pg_url_ssl("postgres://h/db?sslrootcert=system");
866            assert!(ssl.use_tls);
867            // No sslmode synthesized — the postgres crate defaults
868            // to `prefer` which happily upgrades to our TLS connector.
869            assert_eq!(cleaned, "postgres://h/db");
870        }
871    }
872
    /// A live Postgres adapter with a real database connection.
    ///
    /// Wraps a single blocking `postgres::Client`; the methods that talk
    /// to the database take `&mut self` because they borrow that client
    /// mutably.
    pub struct LivePostgresAdapter {
        // The owned connection. Kept private; crate-internal callers go
        // through `client_mut`.
        client: postgres::Client,
    }
877
878    impl LivePostgresAdapter {
879        /// Borrow the underlying postgres client mutably. Used by
880        /// `PostgresDataStore::with_transaction` to start an
881        /// interactive transaction across multiple TS-function
882        /// `ctx.db` calls. `pub(crate)` because exposing raw
883        /// `&mut Client` outside pylon-storage would let callers
884        /// issue arbitrary SQL, bypassing the typed `DataStore`
885        /// surface that the rest of the framework relies on.
886        pub(crate) fn client_mut(&mut self) -> &mut postgres::Client {
887            &mut self.client
888        }
889
890        /// Connect to a Postgres database.
891        ///
892        /// Honors libpq-style URL params the Rust postgres crate doesn't
893        /// natively understand:
894        ///   - `sslmode=verify-full`, `verify-ca`, `require` → use TLS
895        ///     via rustls + the OS trust store. The Rust postgres crate
896        ///     only knows `disable`/`prefer`/`require` — it rejects the
897        ///     libpq extras with "invalid connection string".
898        ///   - `sslrootcert=system` → trust the OS CA store (rustls-
899        ///     native-certs reads it via SecurityFramework / SChannel /
900        ///     /etc/ssl). `sslrootcert=<path>` is not yet supported and
901        ///     falls back to system roots with a warning.
902        ///
903        /// Strips the libpq-only params from the URL before passing to
904        /// the postgres crate's parser, so it doesn't choke. Common
905        /// real-world example that was failing pre-fix: Fly Postgres
906        /// emits `?sslmode=verify-full&sslrootcert=system` URLs.
907        ///
908        /// Uses rustls (not native-tls/openssl) so binary builds
909        /// cross-compile cleanly on musl + arm64 without needing
910        /// OPENSSL_DIR. Pure Rust all the way down.
911        pub fn connect(url: &str) -> Result<Self, StorageError> {
912            let client = connect_pg(url).map_err(|e| StorageError {
913                code: "PG_CONNECT_FAILED".into(),
914                message: format!("Failed to connect to Postgres: {e}"),
915            })?;
916            Ok(Self { client })
917        }
918
919        /// Read the current schema from the live database.
920        pub fn read_schema(&mut self) -> Result<SchemaSnapshot, StorageError> {
921            let table_rows = self
922                .client
923                .query(INTROSPECT_TABLES_SQL, &[])
924                .map_err(pg_err)?;
925
926            let mut tables = Vec::new();
927            for row in &table_rows {
928                let table_name: String = row.get(0);
929                let columns = self.read_columns(&table_name)?;
930                let indexes = self.read_indexes(&table_name)?;
931                tables.push(TableSnapshot {
932                    name: table_name,
933                    columns,
934                    indexes,
935                });
936            }
937
938            Ok(SchemaSnapshot { tables })
939        }
940
941        fn read_columns(&mut self, table: &str) -> Result<Vec<ColumnSnapshot>, StorageError> {
942            let rows = self
943                .client
944                .query(INTROSPECT_COLUMNS_SQL, &[&table])
945                .map_err(pg_err)?;
946
947            let mut columns = Vec::new();
948            for row in &rows {
949                let name: String = row.get(0);
950                let data_type: String = row.get(1);
951                let is_nullable: String = row.get(2);
952                let is_pk: i64 = row.get(3);
953                columns.push(ColumnSnapshot {
954                    name,
955                    column_type: data_type,
956                    notnull: is_nullable == "NO",
957                    primary_key: is_pk > 0,
958                });
959            }
960            Ok(columns)
961        }
962
963        fn read_indexes(&mut self, table: &str) -> Result<Vec<IndexSnapshot>, StorageError> {
964            let rows = self
965                .client
966                .query(INTROSPECT_INDEXES_SQL, &[&table])
967                .map_err(pg_err)?;
968
969            let mut indexes = Vec::new();
970            for row in &rows {
971                let name: String = row.get(0);
972                let unique: bool = row.get(1);
973                let columns: Vec<String> = row.get(2);
974                indexes.push(IndexSnapshot {
975                    name,
976                    columns,
977                    unique,
978                });
979            }
980            Ok(indexes)
981        }
982
983        /// Plan from live database state.
984        pub fn plan_from_live(&mut self, target: &AppManifest) -> Result<SchemaPlan, StorageError> {
985            let snapshot = self.read_schema()?;
986            Ok(crate::plan_from_snapshot(&snapshot, target))
987        }
988    }
989
990    impl StorageAdapter for LivePostgresAdapter {
991        fn plan_schema(&self, _target: &AppManifest) -> Result<SchemaPlan, StorageError> {
992            Err(StorageError {
993                code: "PG_PLAN_NEEDS_MUTABLE".into(),
994                message: "Use plan_from_live() instead for live Postgres planning".into(),
995            })
996        }
997
998        fn apply_schema(&self, _plan: &SchemaPlan) -> Result<(), StorageError> {
999            Err(StorageError {
1000                code: "PG_APPLY_USE_METHOD".into(),
1001                message: "Use apply_plan() instead of the trait method for live Postgres".into(),
1002            })
1003        }
1004    }
1005
1006    impl LivePostgresAdapter {
1007        /// Apply a schema plan to the live database.
1008        pub fn apply_plan(&mut self, plan: &SchemaPlan) -> Result<(), StorageError> {
1009            let statements = plan_to_sql(plan)?;
1010            for sql in &statements {
1011                self.client.execute(sql.as_str(), &[]).map_err(pg_err)?;
1012            }
1013            Ok(())
1014        }
1015
1016        /// Execute a raw SQL statement against the live database. Used by
1017        /// integration tests for setup/teardown (DROP TABLE, TRUNCATE) —
1018        /// production code should go through `apply_plan` so changes are
1019        /// represented in the migration history. Returns the number of
1020        /// rows affected.
1021        pub fn exec_raw(&mut self, sql: &str) -> Result<u64, StorageError> {
1022            self.client.execute(sql, &[]).map_err(pg_err)
1023        }
1024
1025        /// Insert a row. Returns the generated ID.
1026        pub fn insert(
1027            &mut self,
1028            entity: &str,
1029            data: &serde_json::Value,
1030        ) -> Result<String, StorageError> {
1031            let (sql, values) = build_insert_sql(entity, data)?;
1032            // The first param is always the generated ID — extract it before
1033            // we hand `values` off to the postgres driver as borrowed slices.
1034            let id = match &values[0] {
1035                JsonParam::Text(s) => s.clone(),
1036                _ => {
1037                    return Err(StorageError {
1038                        code: "PG_INTERNAL".into(),
1039                        message: "build_insert_sql produced non-text id param".into(),
1040                    });
1041                }
1042            };
1043            let params = as_pg_params(&values);
1044            self.client.execute(sql.as_str(), &params).map_err(pg_err)?;
1045            Ok(id)
1046        }
1047
1048        /// Get a row by ID.
1049        pub fn get_by_id(
1050            &mut self,
1051            entity: &str,
1052            id: &str,
1053        ) -> Result<Option<serde_json::Value>, StorageError> {
1054            let sql = format!("SELECT * FROM {} WHERE id = $1", quote_ident(entity));
1055            let rows = self.client.query(sql.as_str(), &[&id]).map_err(pg_err)?;
1056
1057            match rows.first() {
1058                Some(row) => Ok(Some(row_to_json(row))),
1059                None => Ok(None),
1060            }
1061        }
1062
1063        /// List all rows from an entity.
1064        pub fn list(&mut self, entity: &str) -> Result<Vec<serde_json::Value>, StorageError> {
1065            let sql = format!("SELECT * FROM {}", quote_ident(entity));
1066            let rows = self.client.query(sql.as_str(), &[]).map_err(pg_err)?;
1067
1068            Ok(rows.iter().map(row_to_json).collect())
1069        }
1070
1071        /// Cursor-paginated list. `after` is the last `id` from the previous
1072        /// page; the result contains rows with `id > after` (lex order),
1073        /// limited to `limit`. Used for sync push/pull.
1074        pub fn list_after(
1075            &mut self,
1076            entity: &str,
1077            after: Option<&str>,
1078            limit: usize,
1079        ) -> Result<Vec<serde_json::Value>, StorageError> {
1080            // Cap limit at a sensible upper bound so a malicious client can't
1081            // stream the whole table by passing limit=u64::MAX.
1082            let capped: i64 = limit.min(10_000) as i64;
1083            let sql = match after {
1084                Some(_) => format!(
1085                    "SELECT * FROM {} WHERE id > $1 ORDER BY id ASC LIMIT $2",
1086                    quote_ident(entity)
1087                ),
1088                None => format!(
1089                    "SELECT * FROM {} ORDER BY id ASC LIMIT $1",
1090                    quote_ident(entity)
1091                ),
1092            };
1093            let rows = match after {
1094                Some(cursor) => self
1095                    .client
1096                    .query(sql.as_str(), &[&cursor, &capped])
1097                    .map_err(pg_err)?,
1098                None => self
1099                    .client
1100                    .query(sql.as_str(), &[&capped])
1101                    .map_err(pg_err)?,
1102            };
1103            Ok(rows.iter().map(row_to_json).collect())
1104        }
1105
1106        /// Update a row by ID. Returns true if the row was found and updated.
1107        pub fn update(
1108            &mut self,
1109            entity: &str,
1110            id: &str,
1111            data: &serde_json::Value,
1112        ) -> Result<bool, StorageError> {
1113            let (sql, values) = build_update_sql(entity, id, data)?;
1114            let params = as_pg_params(&values);
1115            let rows_affected = self.client.execute(sql.as_str(), &params).map_err(pg_err)?;
1116            Ok(rows_affected > 0)
1117        }
1118
1119        /// Delete a row by ID. Returns true if the row was found and deleted.
1120        pub fn delete(&mut self, entity: &str, id: &str) -> Result<bool, StorageError> {
1121            let sql = format!("DELETE FROM {} WHERE id = $1", quote_ident(entity));
1122            let rows_affected = self.client.execute(sql.as_str(), &[&id]).map_err(pg_err)?;
1123            Ok(rows_affected > 0)
1124        }
1125
1126        /// Look up a row by `field = value`. Caller must validate `field`
1127        /// against the manifest before calling — we still `quote_ident` it
1128        /// but won't catch a typo against the entity definition.
1129        pub fn lookup_field(
1130            &mut self,
1131            entity: &str,
1132            field: &str,
1133            value: &str,
1134        ) -> Result<Option<serde_json::Value>, StorageError> {
1135            let sql = format!(
1136                "SELECT * FROM {} WHERE {} = $1 LIMIT 1",
1137                quote_ident(entity),
1138                quote_ident(field),
1139            );
1140            let rows = self.client.query(sql.as_str(), &[&value]).map_err(pg_err)?;
1141            Ok(rows.first().map(row_to_json))
1142        }
1143
1144        /// Push a `query_filtered` filter down to a real Postgres `WHERE`.
1145        ///
1146        /// Supported operators (parity with the SQLite path):
1147        /// - Equality (`field: value`)
1148        /// - `$not`: emits `field != value`
1149        /// - `$gt` / `$gte` / `$lt` / `$lte`
1150        /// - `$like`: emits `field LIKE value` (use `%`/`_` wildcards in
1151        ///   the value; case-sensitive — pass `$ilike` for case-insensitive
1152        ///   if/when the SQLite side adds it)
1153        /// - `$in: [..]`: emits `field IN ($1, $2, ...)`
1154        ///
1155        /// Top-level meta operators: `$order`, `$limit`, `$offset`.
1156        ///
1157        /// `$search` (FTS5 on SQLite) is NOT supported here — Postgres
1158        /// would need a tsvector column or a generic ILIKE OR-fold across
1159        /// every text field, neither of which is wired up yet. Returns
1160        /// `SEARCH_NOT_SUPPORTED` so callers can branch instead of
1161        /// receiving silently-broad results.
1162        ///
1163        /// Anything else is silently ignored (matches the in-memory fallback's
1164        /// permissive behavior). Field names are validated against `valid_columns`
1165        /// to prevent SQL injection — pass the entity's column set.
1166        pub fn query_filtered(
1167            &mut self,
1168            entity: &str,
1169            filter: &serde_json::Value,
1170            valid_columns: &[String],
1171        ) -> Result<Vec<serde_json::Value>, StorageError> {
1172            let (sql, params) = Self::build_query_filtered_sql(entity, filter, valid_columns)?;
1173            let pg_params = as_pg_params(&params);
1174            let rows = self
1175                .client
1176                .query(sql.as_str(), &pg_params)
1177                .map_err(pg_err)?;
1178            Ok(rows.iter().map(row_to_json).collect())
1179        }
1180
1181        /// Build the `SELECT ... FROM entity ...` SQL + bound params for
1182        /// a `query_filtered` request. Pure: takes a manifest's column
1183        /// list, returns text. Both the live adapter and the in-tx
1184        /// `PgTxStore` call this so the operator surface ($eq, $like,
1185        /// $in, $order, $limit, $offset) stays identical regardless of
1186        /// where the query runs.
1187        pub(crate) fn build_query_filtered_sql(
1188            entity: &str,
1189            filter: &serde_json::Value,
1190            valid_columns: &[String],
1191        ) -> Result<(String, Vec<JsonParam>), StorageError> {
1192            let empty = serde_json::Map::new();
1193            let obj = filter.as_object().unwrap_or(&empty);
1194
1195            let validate = |col: &str| -> Result<(), StorageError> {
1196                if col == "id" || valid_columns.iter().any(|c| c == col) {
1197                    Ok(())
1198                } else {
1199                    Err(StorageError {
1200                        code: "UNKNOWN_COLUMN".into(),
1201                        message: format!("Unknown column \"{col}\" on entity \"{entity}\""),
1202                    })
1203                }
1204            };
1205
1206            let mut where_clauses: Vec<String> = Vec::new();
1207            let mut order_clause = String::new();
1208            let mut limit_clause = String::new();
1209            let mut offset_clause = String::new();
1210            // Collect (col, op, value) so placeholder numbers can be assigned
1211            // in a single materialization pass after the parse loop. Values
1212            // are now JsonParam (typed) instead of String — see `value_to_pg`.
1213            let mut planned: Vec<(String, String, JsonParam)> = Vec::new();
1214
1215            for (key, val) in obj {
1216                match key.as_str() {
1217                    "$search" => {
1218                        // PG full-text via the entity's `_fts_<entity>`
1219                        // shadow table: `id IN (SELECT entity_id FROM
1220                        // _fts_<E> WHERE tsv @@ plainto_tsquery(...))`.
1221                        // Mirrors the SQLite path that joins the FTS5
1222                        // virtual table; the join here is a subquery so
1223                        // it composes with arbitrary other predicates
1224                        // (`$gt`, `$in`, etc.) in the same WHERE.
1225                        let raw = match val {
1226                            serde_json::Value::String(s) => s.clone(),
1227                            other => other.to_string(),
1228                        };
1229                        // Bind as a normal text param so all the
1230                        // existing placeholder-numbering and binding
1231                        // logic applies. The SQL uses the placeholder
1232                        // inside `plainto_tsquery('english', $N)`.
1233                        // Positioning logic mirrors the `$in` arm: the
1234                        // value will land at `planned.len()+1` because
1235                        // the materialization pass below pushes one
1236                        // param per planned item in order.
1237                        let placeholder_n = planned.len() + 1;
1238                        where_clauses.push(format!(
1239                            "{}.id IN (SELECT entity_id FROM \"_fts_{entity}\" \
1240                                       WHERE tsv @@ plainto_tsquery('english', ${placeholder_n}))",
1241                            quote_ident(entity),
1242                        ));
1243                        // Reuse the IN-style sentinel so the
1244                        // materialization pass below pushes the param
1245                        // without re-emitting a where_clause for it.
1246                        planned.push((
1247                            format!("__search_{}", planned.len()),
1248                            "__INLINE__".into(),
1249                            JsonParam::Text(raw),
1250                        ));
1251                    }
1252                    "$order" => {
1253                        if let Some(ord) = val.as_object() {
1254                            let mut parts = Vec::new();
1255                            for (col, dir) in ord {
1256                                validate(col)?;
1257                                let d = match dir.as_str().unwrap_or("asc") {
1258                                    "desc" | "DESC" => "DESC",
1259                                    _ => "ASC",
1260                                };
1261                                parts.push(format!("{} {d}", quote_ident(col)));
1262                            }
1263                            if !parts.is_empty() {
1264                                order_clause = format!(" ORDER BY {}", parts.join(", "));
1265                            }
1266                        }
1267                    }
1268                    "$limit" => {
1269                        if let Some(n) = val.as_u64() {
1270                            limit_clause = format!(" LIMIT {}", n);
1271                        }
1272                    }
1273                    "$offset" => {
1274                        if let Some(n) = val.as_u64() {
1275                            offset_clause = format!(" OFFSET {}", n);
1276                        }
1277                    }
1278                    field => {
1279                        validate(field)?;
1280                        match val {
1281                            serde_json::Value::Object(ops) => {
1282                                for (op, v) in ops {
1283                                    match op.as_str() {
1284                                        "$not" => planned.push((
1285                                            field.into(),
1286                                            "!=".into(),
1287                                            value_to_pg(v),
1288                                        )),
1289                                        "$gt" => {
1290                                            planned.push((field.into(), ">".into(), value_to_pg(v)))
1291                                        }
1292                                        "$gte" => planned.push((
1293                                            field.into(),
1294                                            ">=".into(),
1295                                            value_to_pg(v),
1296                                        )),
1297                                        "$lt" => {
1298                                            planned.push((field.into(), "<".into(), value_to_pg(v)))
1299                                        }
1300                                        "$lte" => planned.push((
1301                                            field.into(),
1302                                            "<=".into(),
1303                                            value_to_pg(v),
1304                                        )),
1305                                        "$like" => {
1306                                            // Wrap in `%...%` to match the
1307                                            // SQLite path's substring
1308                                            // semantics. Pre-fix divergence:
1309                                            // SQLite wrapped, PG forwarded
1310                                            // literally — `{name: {$like: "ann"}}`
1311                                            // matched "Joanne" on SQLite but
1312                                            // nothing on PG. Caller-supplied
1313                                            // wildcards inside the value still
1314                                            // work (`%j_n%` etc.) because we
1315                                            // only add wraps, never strip.
1316                                            let raw = match v {
1317                                                serde_json::Value::String(s) => s.clone(),
1318                                                other => other.to_string(),
1319                                            };
1320                                            planned.push((
1321                                                field.into(),
1322                                                "LIKE".into(),
1323                                                JsonParam::Text(format!("%{raw}%")),
1324                                            ));
1325                                        }
1326                                        "$in" => {
1327                                            if let Some(arr) = v.as_array() {
1328                                                if arr.is_empty() {
1329                                                    // `field IN ()` is invalid
1330                                                    // SQL on PG (and on SQLite
1331                                                    // too, technically — its
1332                                                    // path also short-circuits).
1333                                                    // An empty $in matches
1334                                                    // nothing; emit a guaranteed-
1335                                                    // false predicate so the
1336                                                    // parser doesn't choke and
1337                                                    // the result set comes back
1338                                                    // empty.
1339                                                    where_clauses.push("FALSE".into());
1340                                                } else {
1341                                                    let placeholders: Vec<String> = (0..arr.len())
1342                                                        .map(|i| {
1343                                                            format!("${}", planned.len() + 1 + i)
1344                                                        })
1345                                                        .collect();
1346                                                    where_clauses.push(format!(
1347                                                        "{} IN ({})",
1348                                                        quote_ident(field),
1349                                                        placeholders.join(", "),
1350                                                    ));
1351                                                    for x in arr {
1352                                                        planned.push((
1353                                                            format!("__inline_{}", planned.len()),
1354                                                            "__INLINE__".into(),
1355                                                            value_to_pg(x),
1356                                                        ));
1357                                                    }
1358                                                }
1359                                            }
1360                                        }
1361                                        _ => {}
1362                                    }
1363                                }
1364                            }
1365                            _ => planned.push((field.into(), "=".into(), value_to_pg(val))),
1366                        }
1367                    }
1368                }
1369            }
1370
1371            // Materialize planned -> SQL + params.
1372            let mut params: Vec<JsonParam> = Vec::with_capacity(planned.len());
1373            for (field, op, v) in &planned {
1374                if op == "__INLINE__" {
1375                    // Already emitted via the IN-clause path; just push the value.
1376                    params.push(v.clone());
1377                } else {
1378                    let placeholder = format!("${}", params.len() + 1);
1379                    where_clauses.push(format!("{} {} {}", quote_ident(field), op, placeholder));
1380                    params.push(v.clone());
1381                }
1382            }
1383
1384            let where_sql = if where_clauses.is_empty() {
1385                String::new()
1386            } else {
1387                format!(" WHERE {}", where_clauses.join(" AND "))
1388            };
1389            // Default deterministic order when the caller didn't pass
1390            // `$order` — matches the SQLite path. Without this,
1391            // identical queries return rows in different orders across
1392            // backends, which makes paginated APIs flaky.
1393            let final_order = if order_clause.is_empty() {
1394                format!(" ORDER BY {}", quote_ident("id"))
1395            } else {
1396                order_clause
1397            };
1398            let sql = format!(
1399                "SELECT * FROM {}{}{}{}{}",
1400                quote_ident(entity),
1401                where_sql,
1402                final_order,
1403                limit_clause,
1404                offset_clause,
1405            );
1406
1407            Ok((sql, params))
1408        }
1409
1410        /// Run a `DataStore::aggregate` spec against Postgres. Mirrors the
1411        /// SQLite path in `pylon-runtime` — supports `count`, `sum`, `avg`,
1412        /// `min`, `max`, `countDistinct`, `groupBy` (plain field names or
1413        /// `{field, bucket: hour|day|week|month|year}` for date bucketing
1414        /// via `date_trunc`), and a flat-equality `where` filter.
1415        ///
1416        /// Spec format (same JSON shape used by the SQLite path):
1417        /// ```json
1418        /// { "count": "*",
1419        ///   "sum": ["amount"],
1420        ///   "groupBy": [{"field": "createdAt", "bucket": "day"}],
1421        ///   "where": {"status": "paid"} }
1422        /// ```
1423        ///
1424        /// `valid_columns` is used to validate every field name before it's
1425        /// quoted into SQL — same pattern as `query_filtered`. Caller (the
1426        /// `DataStore` impl in this crate) supplies the entity's column set
1427        /// from the manifest.
1428        pub fn aggregate(
1429            &mut self,
1430            entity: &str,
1431            spec: &serde_json::Value,
1432            valid_columns: &[String],
1433        ) -> Result<serde_json::Value, StorageError> {
1434            let (sql, params, column_names) =
1435                Self::build_aggregate_sql(entity, spec, valid_columns)?;
1436            let pg_params = as_pg_params(&params);
1437            let rows = self
1438                .client
1439                .query(sql.as_str(), &pg_params)
1440                .map_err(pg_err)?;
1441            Ok(aggregate_rows_to_json(&rows, &column_names))
1442        }
1443
1444        /// Build the aggregate `SELECT` SQL + bound params + the
1445        /// expected output column names. Pure: takes the entity's
1446        /// validated column list, returns text. Both the live adapter
1447        /// and the in-tx `PgTxStore` call this so spec parsing
1448        /// (validation, bucket vocabulary, where-clause translation)
1449        /// stays identical regardless of where the query runs.
1450        pub(crate) fn build_aggregate_sql(
1451            entity: &str,
1452            spec: &serde_json::Value,
1453            valid_columns: &[String],
1454        ) -> Result<(String, Vec<JsonParam>, Vec<String>), StorageError> {
1455            let obj = spec.as_object().ok_or_else(|| StorageError {
1456                code: "INVALID_QUERY".into(),
1457                message: "aggregate spec must be a JSON object".into(),
1458            })?;
1459
1460            let validate = |col: &str| -> Result<(), StorageError> {
1461                if col == "id" || valid_columns.iter().any(|c| c == col) {
1462                    Ok(())
1463                } else {
1464                    Err(StorageError {
1465                        code: "UNKNOWN_COLUMN".into(),
1466                        message: format!("Unknown column \"{col}\" on entity \"{entity}\""),
1467                    })
1468                }
1469            };
1470
1471            let mut select_parts: Vec<String> = Vec::new();
1472            let mut result_fields: Vec<String> = Vec::new();
1473
1474            if let Some(count) = obj.get("count") {
1475                match count {
1476                    serde_json::Value::String(s) if s == "*" => {
1477                        select_parts.push("COUNT(*) AS count".into());
1478                        result_fields.push("count".into());
1479                    }
1480                    serde_json::Value::String(field) => {
1481                        validate(field)?;
1482                        let alias = format!("count_{field}");
1483                        select_parts.push(format!(
1484                            "COUNT({}) AS {}",
1485                            quote_ident(field),
1486                            quote_ident(&alias),
1487                        ));
1488                        result_fields.push(alias);
1489                    }
1490                    _ => {}
1491                }
1492            }
1493
1494            for (fn_name, prefix) in [
1495                ("sum", "sum_"),
1496                ("avg", "avg_"),
1497                ("min", "min_"),
1498                ("max", "max_"),
1499            ] {
1500                if let Some(fields) = obj.get(fn_name).and_then(|v| v.as_array()) {
1501                    for field in fields {
1502                        if let Some(f) = field.as_str() {
1503                            validate(f)?;
1504                            let alias = format!("{prefix}{f}");
1505                            let sql_fn = fn_name.to_uppercase();
1506                            select_parts.push(format!(
1507                                "{}({}) AS {}",
1508                                sql_fn,
1509                                quote_ident(f),
1510                                quote_ident(&alias),
1511                            ));
1512                            result_fields.push(alias);
1513                        }
1514                    }
1515                }
1516            }
1517
1518            if let Some(fields) = obj.get("countDistinct").and_then(|v| v.as_array()) {
1519                for field in fields {
1520                    if let Some(f) = field.as_str() {
1521                        validate(f)?;
1522                        let alias = format!("count_distinct_{f}");
1523                        select_parts.push(format!(
1524                            "COUNT(DISTINCT {}) AS {}",
1525                            quote_ident(f),
1526                            quote_ident(&alias),
1527                        ));
1528                        result_fields.push(alias);
1529                    }
1530                }
1531            }
1532
1533            // groupBy: column name or { field, bucket } — same vocabulary as
1534            // the SQLite path. Buckets translate to Postgres `date_trunc`
1535            // (SQLite uses `strftime`); both collapse rows to the bucket
1536            // boundary identically.
1537            let mut group_by: Vec<String> = Vec::new();
1538            let mut group_select: Vec<String> = Vec::new();
1539            let mut group_field_names: Vec<String> = Vec::new();
1540            if let Some(groups) = obj.get("groupBy").and_then(|v| v.as_array()) {
1541                for g in groups {
1542                    if let Some(f) = g.as_str() {
1543                        validate(f)?;
1544                        let q = quote_ident(f);
1545                        group_by.push(q.clone());
1546                        group_select.push(q);
1547                        group_field_names.push(f.to_string());
1548                    } else if let Some(spec) = g.as_object() {
1549                        let field =
1550                            spec.get("field").and_then(|v| v.as_str()).ok_or_else(|| {
1551                                StorageError {
1552                                    code: "INVALID_QUERY".into(),
1553                                    message: "groupBy object spec requires `field`".into(),
1554                                }
1555                            })?;
1556                        validate(field)?;
1557                        let bucket = spec.get("bucket").and_then(|v| v.as_str()).unwrap_or("day");
1558                        let trunc_unit = match bucket {
1559                            "hour" | "day" | "week" | "month" | "year" => bucket,
1560                            _ => {
1561                                return Err(StorageError {
1562                                    code: "INVALID_QUERY".into(),
1563                                    message: format!(
1564                                        "bucket must be one of hour/day/week/month/year, got {bucket}"
1565                                    ),
1566                                });
1567                            }
1568                        };
1569                        let alias = format!("{field}_{bucket}");
1570                        let expr = format!("date_trunc('{}', {})", trunc_unit, quote_ident(field),);
1571                        group_by.push(expr.clone());
1572                        group_select.push(format!("{} AS {}", expr, quote_ident(&alias)));
1573                        group_field_names.push(alias);
1574                    }
1575                }
1576            }
1577
1578            let mut full_select = group_select.clone();
1579            full_select.extend(select_parts.iter().cloned());
1580            if full_select.is_empty() {
1581                return Err(StorageError {
1582                    code: "INVALID_QUERY".into(),
1583                    message: "aggregate spec must include count/sum/avg/min/max/groupBy".into(),
1584                });
1585            }
1586
1587            let mut where_clauses: Vec<String> = Vec::new();
1588            let mut params: Vec<JsonParam> = Vec::new();
1589            if let Some(w) = obj.get("where").and_then(|v| v.as_object()) {
1590                for (k, v) in w {
1591                    validate(k)?;
1592                    let placeholder = format!("${}", params.len() + 1);
1593                    where_clauses.push(format!("{} = {}", quote_ident(k), placeholder));
1594                    params.push(value_to_pg(v));
1595                }
1596            }
1597            let where_sql = if where_clauses.is_empty() {
1598                String::new()
1599            } else {
1600                format!(" WHERE {}", where_clauses.join(" AND "))
1601            };
1602            let group_sql = if group_by.is_empty() {
1603                String::new()
1604            } else {
1605                format!(" GROUP BY {}", group_by.join(", "))
1606            };
1607
1608            let sql = format!(
1609                "SELECT {} FROM {}{}{}",
1610                full_select.join(", "),
1611                quote_ident(entity),
1612                where_sql,
1613                group_sql,
1614            );
1615
1616            let column_names: Vec<String> = group_field_names
1617                .iter()
1618                .chain(result_fields.iter())
1619                .cloned()
1620                .collect();
1621
1622            Ok((sql, params, column_names))
1623        }
1624    }
1625
1626    /// Project rows from an aggregate `SELECT` into the
1627    /// `{ rows: [{...}] }` JSON shape both the SQLite path and the
1628    /// PG path return. Pure post-processing — works on rows produced
1629    /// from either `Client::query` or `Transaction::query`.
1630    pub fn aggregate_rows_to_json(
1631        rows: &[postgres::Row],
1632        column_names: &[String],
1633    ) -> serde_json::Value {
1634        let mut out: Vec<serde_json::Value> = Vec::with_capacity(rows.len());
1635        for row in rows {
1636            let row_json = row_to_json(row);
1637            if let serde_json::Value::Object(map) = &row_json {
1638                let mut filtered = serde_json::Map::new();
1639                for name in column_names {
1640                    if let Some(v) = map.get(name) {
1641                        filtered.insert(name.clone(), v.clone());
1642                    }
1643                }
1644                out.push(serde_json::Value::Object(filtered));
1645            } else {
1646                out.push(row_json);
1647            }
1648        }
1649        serde_json::json!({ "rows": out })
1650    }
1651
1652    /// Atomic operation describing a single mutation inside [`LivePostgresAdapter::transact`].
1653    pub enum TxOp<'a> {
1654        Insert {
1655            entity: &'a str,
1656            data: &'a serde_json::Value,
1657        },
1658        Update {
1659            entity: &'a str,
1660            id: &'a str,
1661            data: &'a serde_json::Value,
1662        },
1663        Delete {
1664            entity: &'a str,
1665            id: &'a str,
1666        },
1667    }
1668
    /// Result of a single op inside a transaction.
    ///
    /// `transact` pushes exactly one of these per input op, in order.
    #[derive(Debug, Clone)]
    pub enum TxResult {
        /// The id of the inserted row (the text value of the first insert parameter).
        Inserted(String),
        /// `true` when the UPDATE affected at least one row.
        Updated(bool),
        /// `true` when the DELETE affected at least one row.
        Deleted(bool),
    }
1676
1677    impl LivePostgresAdapter {
1678        /// Run `ops` inside a single Postgres transaction. Either all of them
1679        /// commit together or none of them do — there is no partial state on
1680        /// failure. The ROLLBACK happens implicitly when the `Transaction`
1681        /// guard is dropped without `commit()` being called.
1682        pub fn transact(&mut self, ops: &[TxOp<'_>]) -> Result<Vec<TxResult>, StorageError> {
1683            let mut tx = self.client.transaction().map_err(pg_err)?;
1684            let mut results: Vec<TxResult> = Vec::with_capacity(ops.len());
1685
1686            for op in ops {
1687                match op {
1688                    TxOp::Insert { entity, data } => {
1689                        let (sql, values) = build_insert_sql(entity, data)?;
1690                        let id = match &values[0] {
1691                            JsonParam::Text(s) => s.clone(),
1692                            _ => {
1693                                return Err(StorageError {
1694                                    code: "PG_INTERNAL".into(),
1695                                    message: "build_insert_sql produced non-text id param".into(),
1696                                });
1697                            }
1698                        };
1699                        let params = as_pg_params(&values);
1700                        tx.execute(sql.as_str(), &params).map_err(pg_err)?;
1701                        results.push(TxResult::Inserted(id));
1702                    }
1703                    TxOp::Update { entity, id, data } => {
1704                        let (sql, values) = build_update_sql(entity, id, data)?;
1705                        let params = as_pg_params(&values);
1706                        let n = tx.execute(sql.as_str(), &params).map_err(pg_err)?;
1707                        results.push(TxResult::Updated(n > 0));
1708                    }
1709                    TxOp::Delete { entity, id } => {
1710                        let sql = format!("DELETE FROM {} WHERE id = $1", quote_ident(entity));
1711                        let n = tx.execute(sql.as_str(), &[id]).map_err(pg_err)?;
1712                        results.push(TxResult::Deleted(n > 0));
1713                    }
1714                }
1715            }
1716
1717            tx.commit().map_err(pg_err)?;
1718            Ok(results)
1719        }
1720    }
1721
    /// Lift a JSON value into a typed Postgres parameter by delegating to
    /// `JsonParam::from_json`.
    ///
    /// Background from the original notes: the previous implementation
    /// collapsed everything to `String`, which silently stringified
    /// ints/bools and turned JSON `null` into `""` for nullable columns.
    /// Forwarding through `JsonParam` keeps the column type honest and
    /// lets callers `unlink` (set FK to NULL) cleanly.
    fn value_to_pg(v: &serde_json::Value) -> JsonParam {
        JsonParam::from_json(v)
    }
1730
    /// Convert one Postgres row into a JSON object keyed by column name.
    ///
    /// Each column is decoded according to its Postgres type; a SQL NULL
    /// or a value that cannot be decoded becomes JSON `null`. Columns
    /// sharing a name overwrite each other (last one wins) because the
    /// result is a map.
    pub(super) fn row_to_json(row: &postgres::Row) -> serde_json::Value {
        use postgres::types::Type;
        let mut obj = serde_json::Map::new();
        for (i, col) in row.columns().iter().enumerate() {
            let name = col.name().to_string();

            // Use `try_get` everywhere — `Row::get` panics on decode mismatch,
            // and a panic in a query handler poisons the connection mutex,
            // taking down all subsequent reads on this datastore. For the
            // typed arms below, anything that fails to decode becomes Null
            // and `try_get_or_null` logs a warning for that failure.
            //
            // Timestamp/date columns are decoded through chrono types rather
            // than `String`: the postgres crate uses binary protocol by
            // default and there's no `FromSql<String>` impl for TIMESTAMPTZ
            // etc. The catch-all arm tries `String` first and then falls
            // back to `Vec<u8>` with a lossy UTF-8 conversion.
            let value: serde_json::Value = match *col.type_() {
                Type::BOOL => try_get_or_null::<Option<bool>>(row, i)
                    .flatten()
                    .map(serde_json::Value::Bool)
                    .unwrap_or(serde_json::Value::Null),
                Type::INT2 => try_get_or_null::<Option<i16>>(row, i)
                    .flatten()
                    .map(|v| serde_json::Value::Number(v.into()))
                    .unwrap_or(serde_json::Value::Null),
                Type::INT4 => try_get_or_null::<Option<i32>>(row, i)
                    .flatten()
                    .map(|v| serde_json::Value::Number(v.into()))
                    .unwrap_or(serde_json::Value::Null),
                Type::INT8 => try_get_or_null::<Option<i64>>(row, i)
                    .flatten()
                    .map(|v| serde_json::Value::Number(v.into()))
                    .unwrap_or(serde_json::Value::Null),
                // `Number::from_f64` returns None for NaN/infinity, so
                // non-finite floats also end up as Null.
                Type::FLOAT4 => try_get_or_null::<Option<f32>>(row, i)
                    .flatten()
                    .and_then(|v| serde_json::Number::from_f64(v as f64))
                    .map(serde_json::Value::Number)
                    .unwrap_or(serde_json::Value::Null),
                Type::FLOAT8 => try_get_or_null::<Option<f64>>(row, i)
                    .flatten()
                    .and_then(serde_json::Number::from_f64)
                    .map(serde_json::Value::Number)
                    .unwrap_or(serde_json::Value::Null),
                Type::JSON | Type::JSONB => try_get_or_null::<Option<serde_json::Value>>(row, i)
                    .flatten()
                    .unwrap_or(serde_json::Value::Null),
                // Binary columns are surfaced as base64 text.
                Type::BYTEA => try_get_or_null::<Option<Vec<u8>>>(row, i)
                    .flatten()
                    .map(|b| serde_json::Value::String(b64(&b)))
                    .unwrap_or(serde_json::Value::Null),
                Type::TEXT | Type::VARCHAR | Type::BPCHAR | Type::NAME | Type::UNKNOWN => {
                    try_get_or_null::<Option<String>>(row, i)
                        .flatten()
                        .map(serde_json::Value::String)
                        .unwrap_or(serde_json::Value::Null)
                }
                Type::TIMESTAMPTZ => {
                    // Decode via chrono::DateTime<Utc> (postgres's
                    // `with-chrono-0_4` feature provides FromSql) and
                    // re-format as ISO 8601 — the shape pylon's clients
                    // expect (matches `pylon_kernel::util::now_iso`,
                    // so timestamps round-trip with the same surface
                    // across SQLite + PG). The format string has whole-second
                    // precision, so fractional seconds are dropped.
                    try_get_or_null::<Option<chrono::DateTime<chrono::Utc>>>(row, i)
                        .flatten()
                        .map(|dt| {
                            serde_json::Value::String(dt.format("%Y-%m-%dT%H:%M:%SZ").to_string())
                        })
                        .unwrap_or(serde_json::Value::Null)
                }
                // NOTE(review): a naive TIMESTAMP carries no zone, yet it is
                // rendered with a trailing `Z` — presumably stored values are
                // UTC; confirm.
                Type::TIMESTAMP => try_get_or_null::<Option<chrono::NaiveDateTime>>(row, i)
                    .flatten()
                    .map(|dt| {
                        serde_json::Value::String(dt.format("%Y-%m-%dT%H:%M:%SZ").to_string())
                    })
                    .unwrap_or(serde_json::Value::Null),
                Type::DATE => try_get_or_null::<Option<chrono::NaiveDate>>(row, i)
                    .flatten()
                    .map(|d| serde_json::Value::String(d.format("%Y-%m-%d").to_string()))
                    .unwrap_or(serde_json::Value::Null),
                _ => {
                    // Last resort for any other type: try decoding as
                    // `String`, then as raw bytes (`Vec<u8>`) converted with
                    // lossy UTF-8. If both attempts fail the value becomes
                    // Null. Unlike the typed arms above, this path does not
                    // log a warning.
                    match row.try_get::<_, Option<String>>(i) {
                        Ok(Some(s)) => serde_json::Value::String(s),
                        Ok(None) => serde_json::Value::Null,
                        Err(_) => match row.try_get::<_, Option<Vec<u8>>>(i) {
                            Ok(Some(bytes)) => serde_json::Value::String(
                                String::from_utf8_lossy(&bytes).into_owned(),
                            ),
                            _ => serde_json::Value::Null,
                        },
                    }
                }
            };
            obj.insert(name, value);
        }
        serde_json::Value::Object(obj)
    }
1832
1833    fn try_get_or_null<'a, T>(row: &'a postgres::Row, i: usize) -> Option<T>
1834    where
1835        T: postgres::types::FromSql<'a>,
1836    {
1837        match row.try_get::<_, T>(i) {
1838            Ok(v) => Some(v),
1839            Err(e) => {
1840                tracing::warn!(
1841                    "[postgres] decode failed for column {} ({}): {e}",
1842                    i,
1843                    row.columns()[i].name()
1844                );
1845                None
1846            }
1847        }
1848    }
1849
1850    /// Minimal base64 encoder so we don't need another dependency just for
1851    /// the BYTEA column edge case.
1852    fn b64(bytes: &[u8]) -> String {
1853        const TABLE: &[u8; 64] =
1854            b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
1855        let mut out = String::with_capacity((bytes.len() + 2) / 3 * 4);
1856        let chunks = bytes.chunks(3);
1857        for chunk in chunks {
1858            let b = [
1859                chunk.first().copied().unwrap_or(0),
1860                chunk.get(1).copied().unwrap_or(0),
1861                chunk.get(2).copied().unwrap_or(0),
1862            ];
1863            out.push(TABLE[(b[0] >> 2) as usize] as char);
1864            out.push(TABLE[((b[0] & 0x03) << 4 | b[1] >> 4) as usize] as char);
1865            if chunk.len() > 1 {
1866                out.push(TABLE[((b[1] & 0x0F) << 2 | b[2] >> 6) as usize] as char);
1867            } else {
1868                out.push('=');
1869            }
1870            if chunk.len() > 2 {
1871                out.push(TABLE[(b[2] & 0x3F) as usize] as char);
1872            } else {
1873                out.push('=');
1874            }
1875        }
1876        out
1877    }
1878
1879    fn pg_err(e: postgres::Error) -> StorageError {
1880        // postgres::Error's Display is intentionally short ("db error",
1881        // "connection error" etc.) — the actual SQLSTATE / detail lives
1882        // on the source chain. Walk the chain so the final message has
1883        // enough signal to debug a failed insert/update without
1884        // attaching a debugger.
1885        use std::error::Error;
1886        let mut detail = format!("{e}");
1887        let mut src: Option<&dyn Error> = e.source();
1888        while let Some(s) = src {
1889            detail.push_str(": ");
1890            detail.push_str(&format!("{s}"));
1891            src = s.source();
1892        }
1893        StorageError {
1894            code: "PG_QUERY_FAILED".into(),
1895            message: format!("Postgres query failed: {detail}"),
1896        }
1897    }
1898}
1899
1900// ---------------------------------------------------------------------------
1901// Tests
1902// ---------------------------------------------------------------------------
1903
1904#[cfg(test)]
1905mod tests {
1906    use super::*;
1907
1908    /// Hand-rolled fixture that matches the snapshots in the tests
1909    /// below. Decoupled from any example's `pylon.manifest.json` so
1910    /// changing an example schema doesn't bleed into adapter tests.
1911    fn test_manifest() -> AppManifest {
1912        use pylon_kernel::{ManifestEntity, ManifestField, ManifestIndex};
1913        let f = |name: &str, ty: &str, opt: bool, uniq: bool| ManifestField {
1914            name: name.into(),
1915            field_type: ty.into(),
1916            optional: opt,
1917            unique: uniq,
1918            crdt: None,
1919        };
1920        AppManifest {
1921            manifest_version: 1,
1922            name: "test".into(),
1923            version: "0.0.0".into(),
1924            entities: vec![
1925                ManifestEntity {
1926                    name: "User".into(),
1927                    fields: vec![
1928                        f("email", "string", false, true),
1929                        f("displayName", "string", false, false),
1930                        f("createdAt", "datetime", false, false),
1931                    ],
1932                    indexes: vec![],
1933                    relations: vec![],
1934                    search: None,
1935                    crdt: true,
1936                },
1937                ManifestEntity {
1938                    name: "Todo".into(),
1939                    fields: vec![
1940                        f("title", "string", false, false),
1941                        f("done", "bool", false, false),
1942                        f("userId", "id(User)", false, false),
1943                        f("createdAt", "datetime", false, false),
1944                    ],
1945                    indexes: vec![ManifestIndex {
1946                        name: "by_user".into(),
1947                        fields: vec!["userId".into()],
1948                        unique: false,
1949                    }],
1950                    relations: vec![],
1951                    search: None,
1952                    crdt: true,
1953                },
1954            ],
1955            queries: vec![],
1956            actions: vec![],
1957            policies: vec![],
1958            routes: vec![],
1959            auth: Default::default(),
1960        }
1961    }
1962
1963    #[test]
1964    fn pg_type_mapping() {
1965        assert_eq!(pg_column_type("string"), "TEXT");
1966        assert_eq!(pg_column_type("int"), "INTEGER");
1967        assert_eq!(pg_column_type("float"), "DOUBLE PRECISION");
1968        assert_eq!(pg_column_type("bool"), "BOOLEAN");
1969        assert_eq!(pg_column_type("datetime"), "TIMESTAMPTZ");
1970        assert_eq!(pg_column_type("richtext"), "TEXT");
1971        assert_eq!(pg_column_type("id(User)"), "TEXT");
1972    }
1973
1974    #[test]
1975    fn quote_ident_simple() {
1976        assert_eq!(quote_ident("User"), "\"User\"");
1977        assert_eq!(quote_ident("email"), "\"email\"");
1978    }
1979
1980    #[test]
1981    fn quote_ident_escapes_embedded_double_quotes() {
1982        assert_eq!(quote_ident("col\"name"), "\"col\"\"name\"");
1983        assert_eq!(quote_ident("a\"b\"c"), "\"a\"\"b\"\"c\"");
1984    }
1985
1986    #[test]
1987    fn create_table_sql_basic() {
1988        let fields = vec![
1989            FieldSpec {
1990                name: "email".into(),
1991                field_type: "string".into(),
1992                optional: false,
1993                unique: true,
1994            },
1995            FieldSpec {
1996                name: "age".into(),
1997                field_type: "int".into(),
1998                optional: true,
1999                unique: false,
2000            },
2001        ];
2002        let sql = create_table_sql("User", &fields);
2003        assert_eq!(
2004            sql,
2005            "CREATE TABLE IF NOT EXISTS \"User\" (id TEXT PRIMARY KEY NOT NULL, \"email\" TEXT NOT NULL UNIQUE, \"age\" INTEGER)"
2006        );
2007    }
2008
2009    #[test]
2010    fn create_table_sql_escapes_identifiers() {
2011        let fields = vec![FieldSpec {
2012            name: "col\"x".into(),
2013            field_type: "string".into(),
2014            optional: false,
2015            unique: false,
2016        }];
2017        let sql = create_table_sql("my\"table", &fields);
2018        assert!(sql.contains("\"my\"\"table\""));
2019        assert!(sql.contains("\"col\"\"x\""));
2020    }
2021
2022    #[test]
2023    fn create_index_sql_unique() {
2024        let sql = create_index_sql("User", "by_email", &["email".into()], true);
2025        assert_eq!(
2026            sql,
2027            "CREATE UNIQUE INDEX IF NOT EXISTS \"User_by_email\" ON \"User\" (\"email\")"
2028        );
2029    }
2030
2031    #[test]
2032    fn create_index_sql_non_unique() {
2033        let sql = create_index_sql("Todo", "by_user", &["userId".into()], false);
2034        assert_eq!(
2035            sql,
2036            "CREATE INDEX IF NOT EXISTS \"Todo_by_user\" ON \"Todo\" (\"userId\")"
2037        );
2038    }
2039
2040    #[test]
2041    fn add_column_sql_basic() {
2042        let field = FieldSpec {
2043            name: "bio".into(),
2044            field_type: "string".into(),
2045            optional: true,
2046            unique: false,
2047        };
2048        let sql = add_column_sql("User", &field);
2049        assert_eq!(sql, "ALTER TABLE \"User\" ADD COLUMN \"bio\" TEXT");
2050    }
2051
/// Planning straight from the test manifest must create the `User` and `Todo`
/// entities and add the `by_user` index on `Todo`.
#[test]
fn plan_from_manifest() {
    let adapter = PostgresAdapter;
    let manifest = test_manifest();
    let plan = adapter.plan_schema(&manifest).unwrap();
    let ops = &plan.operations;

    // True when the plan contains a CreateEntity operation for `wanted`.
    let creates_entity = |wanted: &str| {
        ops.iter().any(|op| {
            matches!(op, SchemaOperation::CreateEntity { name, .. } if name == wanted)
        })
    };
    let adds_todo_by_user = ops.iter().any(|op| {
        matches!(
            op,
            SchemaOperation::AddIndex { entity, name, .. } if entity == "Todo" && name == "by_user"
        )
    });

    assert!(creates_entity("User"));
    assert!(creates_entity("Todo"));
    assert!(adds_todo_by_user);
}
2072
/// Rendering the manifest plan yields exactly two `CREATE TABLE` statements,
/// at least one index statement, and puts the tables first.
#[test]
fn plan_to_sql_produces_statements() {
    let adapter = PostgresAdapter;
    let manifest = test_manifest();
    let plan = adapter.plan_schema(&manifest).unwrap();
    let stmts = plan_to_sql(&plan).unwrap();

    // Two tables are expected; only a lower bound is asserted for indexes
    // (unique or not).
    let table_count = stmts
        .iter()
        .filter(|stmt| stmt.starts_with("CREATE TABLE"))
        .count();
    let index_count = stmts
        .iter()
        .filter(|stmt| {
            stmt.starts_with("CREATE INDEX") || stmt.starts_with("CREATE UNIQUE INDEX")
        })
        .count();
    assert_eq!(table_count, 2);
    assert!(index_count >= 1);

    // Both table statements must lead the output.
    for leading in &stmts[..2] {
        assert!(leading.starts_with("CREATE TABLE"));
    }
}
2097
/// A plan containing `RemoveEntity` cannot be rendered and fails with
/// `PG_OP_UNSUPPORTED`.
#[test]
fn plan_to_sql_rejects_unsupported() {
    let drop_user = SchemaOperation::RemoveEntity {
        name: "User".into(),
    };
    let plan = SchemaPlan {
        operations: vec![drop_user],
    };
    let outcome = plan_to_sql(&plan);
    assert!(outcome.is_err());
    let err = outcome.unwrap_err();
    assert_eq!(err.code, "PG_OP_UNSUPPORTED");
}
2109
/// `PostgresAdapter::apply_schema` reports `APPLY_NOT_IMPLEMENTED`, even for a
/// plan that holds only a `Noop`.
#[test]
fn apply_not_implemented() {
    let noop_plan = SchemaPlan {
        operations: vec![SchemaOperation::Noop],
    };
    let adapter = PostgresAdapter;
    let outcome = adapter.apply_schema(&noop_plan);
    assert!(outcome.is_err());
    let err = outcome.unwrap_err();
    assert_eq!(err.code, "APPLY_NOT_IMPLEMENTED");
}
2120
/// Mixed-case table and column names are emitted double-quoted, and a
/// `datetime` field maps to `TIMESTAMPTZ`.
#[test]
fn sql_uses_quoted_identifiers() {
    let created_at = FieldSpec {
        name: "createdAt".into(),
        field_type: "datetime".into(),
        optional: false,
        unique: false,
    };
    let sql = create_table_sql("User", &vec![created_at]);
    // Postgres identifiers should be quoted for case-sensitivity.
    for fragment in ["\"User\"", "\"createdAt\"", "TIMESTAMPTZ"] {
        assert!(sql.contains(fragment));
    }
}
2135
2136    // -- Introspection SQL tests --
2137
/// Smoke test for the introspection SQL constants: each must contain the
/// fragments checked below.
#[test]
fn introspect_sql_constants_are_valid() {
    // The table listing reads information_schema and mentions `_pylon_`.
    assert!(INTROSPECT_TABLES_SQL.contains("information_schema.tables"));
    assert!(INTROSPECT_TABLES_SQL.contains("_pylon_"));

    // The per-table queries take a `$1` bind parameter.
    assert!(INTROSPECT_COLUMNS_SQL.contains("$1"));
    assert!(INTROSPECT_INDEXES_SQL.contains("$1"));
}
2146
2147    // -- Plan from snapshot tests --
2148
/// Against an empty snapshot, the plan must create the `User` and `Todo`
/// entities and add the `by_user` index on `Todo`.
#[test]
fn plan_from_empty_snapshot_creates_all() {
    let empty = crate::SchemaSnapshot { tables: vec![] };
    let manifest = test_manifest();
    let plan = plan_from_snapshot(&empty, &manifest);
    let ops = &plan.operations;

    // True when the plan contains a CreateEntity operation for `wanted`.
    let creates_entity = |wanted: &str| {
        ops.iter().any(|op| {
            matches!(op, SchemaOperation::CreateEntity { name, .. } if name == wanted)
        })
    };
    let adds_todo_by_user = ops.iter().any(|op| {
        matches!(
            op,
            SchemaOperation::AddIndex { entity, name, .. } if entity == "Todo" && name == "by_user"
        )
    });

    assert!(creates_entity("User"));
    assert!(creates_entity("Todo"));
    assert!(adds_todo_by_user);
}
2168
/// When the snapshot already holds the `User` and `Todo` tables with all the
/// columns listed below and the `Todo_by_user` index, the plan is empty.
#[test]
fn plan_from_full_snapshot_is_noop() {
    // Every column in this fixture is NOT NULL; only the name, the type and
    // the primary-key flag vary.
    let column = |name: &str, column_type: &str, primary_key: bool| crate::ColumnSnapshot {
        name: name.into(),
        column_type: column_type.into(),
        notnull: true,
        primary_key,
    };

    let user = crate::TableSnapshot {
        name: "User".into(),
        columns: vec![
            column("id", "TEXT", true),
            column("email", "TEXT", false),
            column("displayName", "TEXT", false),
            column("createdAt", "TIMESTAMPTZ", false),
        ],
        indexes: vec![],
    };
    let todo = crate::TableSnapshot {
        name: "Todo".into(),
        columns: vec![
            column("id", "TEXT", true),
            column("title", "TEXT", false),
            column("done", "BOOLEAN", false),
            column("userId", "TEXT", false),
            column("createdAt", "TIMESTAMPTZ", false),
        ],
        indexes: vec![crate::IndexSnapshot {
            name: "Todo_by_user".into(),
            columns: vec!["userId".into()],
            unique: false,
        }],
    };
    let snapshot = crate::SchemaSnapshot {
        tables: vec![user, todo],
    };

    let manifest = test_manifest();
    let plan = plan_from_snapshot(&snapshot, &manifest);
    assert!(plan.is_empty());
}
2249
/// A `User` table lacking `displayName` and `createdAt` yields exactly two
/// `AddField` operations; the `Todo` table in the snapshot is complete.
#[test]
fn plan_detects_missing_column_in_snapshot() {
    // Every column in this fixture is NOT NULL; only the name, the type and
    // the primary-key flag vary.
    let column = |name: &str, column_type: &str, primary_key: bool| crate::ColumnSnapshot {
        name: name.into(),
        column_type: column_type.into(),
        notnull: true,
        primary_key,
    };

    let partial_user = crate::TableSnapshot {
        name: "User".into(),
        columns: vec![
            column("id", "TEXT", true),
            column("email", "TEXT", false),
            // missing displayName and createdAt
        ],
        indexes: vec![],
    };
    let todo = crate::TableSnapshot {
        name: "Todo".into(),
        columns: vec![
            column("id", "TEXT", true),
            column("title", "TEXT", false),
            column("done", "BOOLEAN", false),
            column("userId", "TEXT", false),
            column("createdAt", "TIMESTAMPTZ", false),
        ],
        indexes: vec![crate::IndexSnapshot {
            name: "Todo_by_user".into(),
            columns: vec!["userId".into()],
            unique: false,
        }],
    };
    let snapshot = crate::SchemaSnapshot {
        tables: vec![partial_user, todo],
    };

    let manifest = test_manifest();
    let plan = plan_from_snapshot(&snapshot, &manifest);

    let added_fields = plan
        .operations
        .iter()
        .filter(|op| matches!(op, SchemaOperation::AddField { .. }))
        .count();
    assert_eq!(added_fields, 2); // displayName + createdAt
}
2325
2326    // -- CRUD helper tests (no live database required) --
2327
/// `json_value_to_string` yields the raw text for strings, the literal form
/// for numbers and booleans, an empty string for null, and compact JSON for
/// arrays and objects.
#[test]
fn json_value_to_string_handles_all_types() {
    let cases = [
        (serde_json::Value::String("hello".into()), "hello"),
        (serde_json::json!(42), "42"),
        (serde_json::json!(1.5), "1.5"),
        (serde_json::Value::Bool(true), "true"),
        (serde_json::Value::Bool(false), "false"),
        (serde_json::Value::Null, ""),
        // Arrays and objects get their JSON representation.
        (serde_json::json!([1, 2, 3]), "[1,2,3]"),
        (serde_json::json!({"a": 1}), "{\"a\":1}"),
    ];
    for (value, expected) in &cases {
        assert_eq!(json_value_to_string(value), *expected);
    }
}
2352
/// A generated id is non-empty and made up solely of ASCII hex digits.
#[test]
fn generate_id_returns_hex_string() {
    let generated = generate_id();
    assert!(!generated.is_empty());
    // Must be valid hex characters.
    let only_hex = generated.chars().all(|ch| ch.is_ascii_hexdigit());
    assert!(only_hex);
}
2360
/// Two consecutive calls to `generate_id` must not return the same value.
#[test]
fn generate_id_is_unique_across_calls() {
    let first = generate_id();
    let second = generate_id();
    assert_ne!(first, second);
}
2367
/// 1000 ids generated back-to-back must already be in lexicographic order,
/// share one width, and contain no duplicates.
#[test]
fn generate_id_is_lex_sortable() {
    // Generation order must equal sorted order. This is what makes cursor
    // pagination correct.
    let mut generated: Vec<String> = std::iter::repeat_with(generate_id).take(1000).collect();
    let mut expected_order = generated.clone();
    expected_order.sort();
    assert_eq!(generated, expected_order, "generate_id must be lex-monotonic");

    // Every id must be the same width (otherwise lex comparison is wrong at
    // width boundaries).
    let width = generated[0].len();
    assert!(generated.iter().all(|id| id.len() == width));

    // The list is sorted, so removing adjacent duplicates exposes collisions.
    generated.dedup();
    assert_eq!(generated.len(), 1000, "no collisions in a tight loop");
}
2386
/// Inserting a two-field object targets the quoted table and binds three
/// parameters: a generated text id followed by the two fields.
#[test]
fn build_insert_sql_simple() {
    let payload = serde_json::json!({
        "email": "alice@example.com",
        "displayName": "Alice"
    });
    let (sql, values) = build_insert_sql("User", &payload).unwrap();

    assert!(sql.starts_with("INSERT INTO \"User\""));
    assert!(sql.contains("id"));
    for placeholder in ["$1", "$2", "$3"] {
        assert!(sql.contains(placeholder));
    }

    // First value is the generated ID — JsonParam::Text variant.
    match &values[0] {
        JsonParam::Text(id) => assert!(!id.is_empty()),
        other => panic!("expected Text id param, got {other:?}"),
    }
    assert_eq!(values.len(), 3); // id + 2 fields
}
2407
/// Each JSON value is bound as the matching `JsonParam` variant rather than
/// being flattened to text.
#[test]
fn build_insert_sql_preserves_json_types() {
    let payload = serde_json::json!({
        "n": 42,
        "f": 1.5,
        "b": true,
        "s": "hi",
        "z": null,
    });
    let (_sql, values) = build_insert_sql("T", &payload).unwrap();

    // values[0] is the id; remaining are in BTreeMap order ("b","f","n","s","z").
    let fields = &values[1..];
    assert!(matches!(&fields[0], JsonParam::Bool(true)));
    assert!(matches!(&fields[1], JsonParam::Float(_)));
    assert!(matches!(&fields[2], JsonParam::Int(42)));
    assert!(matches!(&fields[3], JsonParam::Text(_)));
    assert!(matches!(&fields[4], JsonParam::Null));
}
2426
/// The table name and a mixed-case column name both appear double-quoted in
/// the generated INSERT.
#[test]
fn build_insert_sql_quotes_column_names() {
    let payload = serde_json::json!({"createdAt": "2026-01-01"});
    let (sql, _values) = build_insert_sql("Todo", &payload).unwrap();
    for quoted in ["\"createdAt\"", "\"Todo\""] {
        assert!(sql.contains(quoted));
    }
}
2434
/// A JSON string payload (not an object) is rejected with `PG_INVALID_DATA`.
#[test]
fn build_insert_sql_rejects_non_object() {
    let payload = serde_json::json!("not an object");
    let outcome = build_insert_sql("User", &payload);
    assert!(outcome.is_err());
    let err = outcome.unwrap_err();
    assert_eq!(err.code, "PG_INVALID_DATA");
}
2442
/// Updating two fields filters on `id = $1` and binds three parameters: the
/// row id as text, followed by the two fields.
#[test]
fn build_update_sql_simple() {
    let changes = serde_json::json!({
        "displayName": "Bob",
        "email": "bob@example.com"
    });
    let (sql, values) = build_update_sql("User", "abc123", &changes).unwrap();

    assert!(sql.starts_with("UPDATE \"User\" SET"));
    assert!(sql.contains("WHERE id = $1"));
    for placeholder in ["$2", "$3"] {
        assert!(sql.contains(placeholder));
    }

    // The row id is bound first, as a Text parameter.
    match &values[0] {
        JsonParam::Text(id) => assert_eq!(id, "abc123"),
        other => panic!("expected Text id param, got {other:?}"),
    }
    assert_eq!(values.len(), 3); // id + 2 fields
}
2461
/// The updated column is double-quoted and bound to `$2`.
#[test]
fn build_update_sql_quotes_column_names() {
    let changes = serde_json::json!({"displayName": "Carol"});
    let (sql, _values) = build_update_sql("User", "id1", &changes).unwrap();
    let assignment = "\"displayName\" = $2";
    assert!(sql.contains(assignment));
}
2468
/// A JSON number payload (not an object) is rejected with `PG_INVALID_DATA`.
#[test]
fn build_update_sql_rejects_non_object() {
    let payload = serde_json::json!(42);
    let outcome = build_update_sql("User", "id1", &payload);
    assert!(outcome.is_err());
    let err = outcome.unwrap_err();
    assert_eq!(err.code, "PG_INVALID_DATA");
}
2476
/// An empty object is rejected with `PG_INVALID_DATA`, and the message asks
/// for at least one field.
#[test]
fn build_update_sql_rejects_empty_object() {
    let no_changes = serde_json::json!({});
    let failure = build_update_sql("User", "id1", &no_changes).unwrap_err();
    assert_eq!(failure.code, "PG_INVALID_DATA");
    assert!(failure.message.contains("at least one field"));
}
2484}